Springboot 集成 ws

Springboot 集成 WS

Spring Boot 2.7.5 原生 WebSocket(spring-boot-starter-websocket),实现应用层消息路由、Redis 心跳检测、定时推送任务的完整方案。


工作中对需要利用ws实现的需求较多,因此记录一下自己集成实现过程,不算是最佳实现,仅供参考

目录

  1. 消息协议
  2. 配置入口
  3. 会话管理
  4. 消息路由
  5. 入口处理器
  6. 业务处理器
  7. 架构总览

1. 消息协议

通信的第一步:约定前后端统一的消息格式。

1.1 WsMessage — 消息载体

定义 JSON 消息结构,包含 code(业务状态码)、event(事件类型)、msg(描述)、data(业务数据)。提供静态工厂方法简化标准响应的构建。

@Data
public class WsMessage<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    private int code;
    private String event;
    private String msg;
    private T data;

    /** 序列化为 JSON 字符串,用于通过 WebSocket 文本帧发送 */
    public String toText() {
        return JSON.toJSONString(this);
    }

    /** 构建成功响应 (code=200) */
    public static <T> WsMessage<T> success(String event, T data) {
        WsMessage<T> message = new WsMessage<>();
        message.setCode(200);
        message.setMsg("success");
        message.setEvent(event);
        message.setData(data);
        return message;
    }

    /** 构建服务端错误响应 (code=500) */
    public static <T> WsMessage<T> error(String msg) {
        WsMessage<T> message = new WsMessage<>();
        message.setCode(500);
        message.setMsg(msg);
        message.setEvent(WsEventConstants.ERROR);
        return message;
    }

    /** 构建"未知事件类型"响应 (code=400) */
    public static WsMessage<Void> unknownEvent(String event) {
        WsMessage<Void> message = new WsMessage<>();
        message.setCode(400);
        message.setMsg("Unknown event type: " + event);
        message.setEvent(WsEventConstants.ERROR);
        return message;
    }
}

要点:

  • toText() 将消息序列化为 JSON,通过 WebSocket 文本帧发送
  • data 字段使用泛型 T,但运行时被擦除为 Object(反序列化后是 fastjson 的 JSONObject),各 Handler 需自行做二次类型转换
  • 三种工厂方法覆盖成功、错误、未知事件三种标准响应

1.2 WsEventConstants — 事件常量

集中管理 event 字段的可取值。常量值与 WsMessageHandler#getHandlerType() 返回值一一对应,作为消息路由的 key。

public final class WsEventConstants {

    private WsEventConstants() {}

    /** 通用错误事件 */
    public static final String ERROR = "error";
    /** 客户端心跳请求 */
    public static final String PING = "ping";
    /** 服务端心跳响应 */
    public static final String PONG = "pong";
    /** 告警查询 */
    public static final String ALARM = "alarm";
    /** 订阅未读消息推送 */
    public static final String UN_READ_INFO = "un_read_info";
}

要点:

  • 新增事件类型只需在这里加常量 + 实现一个 WsMessageHandler,路由逻辑无需修改

这里的 PING / PONG 是应用层的业务心跳(JSON 消息),与 WebSocket 协议层(RFC 6455)的 Ping/Pong 控制帧是完全不同的概念——协议层的帧由 Tomcat 自动处理,应用代码无需关心。


2. 配置入口

2.1 WebSocketConfig — 端点注册

使用 Spring 原生 WebSocket(@EnableWebSocket + WebSocketConfigurer)。


@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Resource
    private WsEndpointHandler endpointHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(endpointHandler, "/ws").setAllowedOrigins("*");
    }
}

要点:

  • 客户端通过 ws://host:port/ws 建立连接
  • setAllowedOrigins("*") 允许跨域

2.2 application-dev.yaml — 心跳参数

ws:
  heartbeat:
    expire-seconds: 20   # 20 秒未收到 ping 即判定会话失效

expire-seconds 控制 Redis 心跳 key 的 TTL。开发环境设 20 秒方便调试,生产环境应加大(如 90 秒),避免网络抖动误杀连接。


3. 会话管理

3.1 WsSessionManager — 会话生命周期 + Redis 心跳 + 死连接清理

集中管理所有 WebSocket 会话的注册、注销、心跳刷新和死连接清理。核心思路是利用 Redis key 的 TTL 机制做心跳检测——客户端发 ping 时刷新 key 的过期时间,定时任务扫描 key 已失效的 session 并清理。

/**
 * WebSocket 会话状态管理器。
 * <p>
 * 统一管理 sessionMap(内存)、Redis 心跳(持久化)以及定时扫描清理超时会话。
 */
@Slf4j
@Component
@ConfigurationProperties(prefix = "ws.heartbeat")
public class WsSessionManager {

    private static final String HEARTBEAT_KEY_PREFIX = "ws:heartbeat:";

    /** Redis 心跳 key 的过期时间(秒),超过此时间未收到 ping 即判定会话失效 */
    private int expireSeconds = 90;

    public void setExpireSeconds(int expireSeconds) {
        this.expireSeconds = expireSeconds;
    }

    //会话管理map
    private final ConcurrentMap<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();

    //自定义会话关闭前置处理器
    private final List<SessionCloseListener> closeListeners = new CopyOnWriteArrayList<>();

    @Resource
    private RedisService redisService;

    /** 注册会话关闭回调,unregister 时通知;用于关闭会话前需要进行自定义逻辑处理*/
    public void addCloseListener(SessionCloseListener listener) {
        closeListeners.add(listener);
    }

    private String heartbeatKey(WebSocketSession session) {
        return redisService.generateRedisKey(HEARTBEAT_KEY_PREFIX, session.getId());
    }

    /** 注册会话:包装为线程安全 + 加入内存 + 写入 Redis 初始心跳 */
    public void register(WebSocketSession session) {
        WebSocketSession safeSession = new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64 * 1024);
        String id = session.getId();
        sessionMap.put(id, safeSession);
        redisService.set(heartbeatKey(session), LocalDateTime.now().toString(), expireSeconds);
        log.info("WebSocket session registered: {}, total: {}", id, sessionMap.size());
    }

    /** 注销会话:通知监听器 → 关闭连接 + 从内存移除 + 删除 Redis 心跳 key */
    public void unregister(WebSocketSession session) {
        String id = session.getId();
        //关闭前需要进行自定义逻辑处理的会话
        closeListeners.forEach(l -> l.onSessionClosed(id));
        sessionMap.remove(id);
        redisService.delete(heartbeatKey(session));
        closeQuietly(session, CloseStatus.NORMAL);
        log.info("WebSocket session unregistered: {}, total: {}", id, sessionMap.size());
    }

    /** 刷新心跳(客户端发 ping 时调用) */
    public void refreshHeartbeat(WebSocketSession session) {
        redisService.set(heartbeatKey(session), LocalDateTime.now().toString(), expireSeconds);
    }

    /** 获取指定会话,供主动推送时使用 */
    public WebSocketSession getSession(String id) {
        return sessionMap.get(id);
    }

    /** 判断会话心跳是否有效 */
    public boolean isAlive(WebSocketSession session) {
        return redisService.hasKey(heartbeatKey(session));
    }

    /** 当前在线会话数 */
    public int size() {
        return sessionMap.size();
    }

    @PreDestroy
    public void destroy() {
        List<WebSocketSession> sessions = new ArrayList<>(sessionMap.values());
        sessionMap.clear();
        sessions.forEach(s -> {
            redisService.delete(heartbeatKey(s));
            closeQuietly(s, CloseStatus.GOING_AWAY);
        });
        log.info("All WebSocket sessions cleaned up on shutdown, total closed: {}", sessions.size());
    }

    @Scheduled(fixedDelay = 30_000)
    public void cleanDeadSessions() {
        List<WebSocketSession> deadSessions = sessionMap.values().stream()
                .filter(s -> !redisService.hasKey(heartbeatKey(s)))
                .collect(Collectors.toList());

        deadSessions.forEach(s -> {
            String id = s.getId();
            closeListeners.forEach(l -> l.onSessionClosed(id));
            sessionMap.remove(id);
            closeQuietly(s, CloseStatus.SESSION_NOT_RELIABLE);
        });

        if (!deadSessions.isEmpty()) {
            log.info("Cleaned {} dead sessions, remaining: {}", deadSessions.size(), sessionMap.size());
        }
    }

    private void closeQuietly(WebSocketSession session, CloseStatus status) {
        try {
            if (session.isOpen()) {
                session.close(status);
            }
        } catch (IOException e) {
            log.error("Failed to close session: {}", session.getId(), e);
        }
    }
}
3.1.1 SessionCloseListener — 会话关闭前置处理(无需求可以不用)

自定义的会话退出前置处理器:SessionCloseListener ,有业务在会话关闭前需要进行一些自定义的逻辑处理,只需要在对应MessageHandler中实现并在初始化是添加到List<SessionCloseListener> closeListeners中,在session关闭时使用即可closeListeners.forEach(l -> l.onSessionClosed(id));

/**
 * Session 关闭监听器。
 * 注册到 {@link WsSessionManager#addCloseListener} 后,session 注销时回调。
 */
@FunctionalInterface
public interface SessionCloseListener {

    /** session 关闭时触发,参数为 sessionId */
    void onSessionClosed(String sessionId);
}

要点:

  • 心跳检测流程: 客户端发 ping → refreshHeartbeat 刷新 Redis key TTL → 定时任务 cleanDeadSessions 扫描 key 不存在的 session → 清理
  • 线程安全: register 时用 ConcurrentWebSocketSessionDecorator 包装 session,内部队列保证多线程 send 时串行化,避免数据交错
  • 优雅退出: @PreDestroy 批量清理 + closeQuietly 静默关闭,不会因个别连接关闭失败中断整体清理
  • @ConfigurationProperties(prefix = "ws.heartbeat") 绑定 YAML 配置,expireSeconds 默认可被 application.yaml 覆盖

4. 消息路由

4.1 WsMessageHandler — 处理器接口

定义消息处理器的统一契约。每新增一种事件类型,只需实现此接口并加 @Component,Spring 自动收集并注入到 MessageHandlerFactory

public interface WsMessageHandler {

    /** 返回该处理器对应的 event 类型值 */
    String getHandlerType();

    /**
     * 处理消息。
     * data 运行时类型为 fastjson 的 JSONObject(因为 WsMessage 的泛型 T 在运行时被擦除),
     * 各 Handler 需自行将 data 转换为目标领域对象。
     */
    void handle(WebSocketSession session, Object data) throws IOException;
}

要点:

  • getHandlerType() = WsEventConstants 中的常量 = 客户端 JSON 的 event 字段,三者保持一致
  • data 参数是 JSONObject,因为泛型擦除,Handler 内部需要用 JSON.parseObject(JSON.toJSONString(data), Target.class) 做二次转换
  • 接口声明 throws IOException,由上层 WsEndpointHandler 统一 catch

4.2 MessageHandlerFactory — 路由工厂

Spring 自动收集容器中所有 WsMessageHandler 实现类到列表,在 @PostConstruct 阶段转换为 Map<eventType, Handler>

@Slf4j
@Component
public class MessageHandlerFactory {

    @Resource
    private List<WsMessageHandler> handlers;

    private Map<String, WsMessageHandler> handlerMap;

    @PostConstruct
    public void init() {
        if (handlers == null || handlers.isEmpty()) {
            log.warn("No WsMessageHandler implementations found");
            handlerMap = new ConcurrentHashMap<>();
            return;
        }
        handlerMap = handlers.stream()
                .collect(Collectors.toMap(WsMessageHandler::getHandlerType, Function.identity()));
        log.info("Registered {} WebSocket message handlers: {}", handlerMap.size(), handlerMap.keySet());
    }

    public WsMessageHandler getHandler(String type) {
        WsMessageHandler handler = handlerMap.get(type);
        if (handler == null) {
            log.warn("No handler found for event type: {}", type);
        }
        return handler;
    }
}

要点:

  • Spring 自动收集注入 handlers 列表——新增 Handler 只需加 @Component,无需手动注册
  • @PostConstruct 一次性建好路由表
  • ConcurrentHashMap 保证并发读安全

5. 入口处理器

5.1 WsEndpointHandler — 连接 & 消息入口

继承 TextWebSocketHandler,是整个 WebSocket 通信的总入口。每个消息到达后的处理流程:心跳有效性检查 → JSON 解析 → 按 event 路由 Handler → 执行业务。任一环节失败都通过 safeSend 返回错误消息,不会导致连接中断。

@Slf4j
@Component
public class WsEndpointHandler extends TextWebSocketHandler {

    @Resource
    private MessageHandlerFactory handlerFactory;

    @Resource
    private WsSessionManager sessionManager;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessionManager.register(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessionManager.unregister(session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        // 心跳失效的会话直接注销,不再处理消息
        if (!sessionManager.isAlive(session)) {
            sessionManager.unregister(session);
            return;
        }

        String payload = message.getPayload();

        // 1. 解析 JSON
        WsMessage<?> wsMessage;
        try {
            wsMessage = JSON.parseObject(payload, WsMessage.class);
        } catch (Exception e) {
            log.error("Failed to parse WebSocket message: {}", payload, e);
            safeSend(session, WsMessage.error("Invalid message format").toText());
            return;
        }

        // 2. 获取线程安全的 session(由 WsSessionManager 用 ConcurrentWebSocketSessionDecorator 包装)
        WebSocketSession safeSession = sessionManager.getSession(session.getId());
        if (safeSession == null) {
            log.warn("Session not found in manager: {}", session.getId());
            return;
        }

        // 3. 按 event 路由到对应的 Handler
        WsMessageHandler handler = handlerFactory.getHandler(wsMessage.getEvent());
        if (handler == null) {
            safeSend(safeSession, WsMessage.unknownEvent(wsMessage.getEvent()).toText());
            return;
        }

        // 4. 执行业务处理
        try {
            handler.handle(safeSession, wsMessage.getData());
        } catch (Exception e) {
            log.error("Handler error for event: {}, session: {}", wsMessage.getEvent(), session.getId(), e);
            safeSend(safeSession, WsMessage.error("Handler error: " + e.getMessage()).toText());
        }
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable error) {
        String id = session != null ? session.getId() : "unknown";
        log.error("WebSocket transport error on session: {}", id, error);
        if (session != null) {
            sessionManager.unregister(session);
        }
    }

    /** 安全发送——捕获发送异常避免连接中断 */
    private void safeSend(WebSocketSession session, String text) {
        try {
            session.sendMessage(new TextMessage(text));
        } catch (Exception e) {
            log.error("Failed to send message to session: {}", session.getId(), e);
        }
    }
}

要点:

  • 消息处理四步走: 心跳检查 → 解析 → 路由 → 执行,每步有独立的错误处理
  • 安全 session:sessionManager.getSession(id) 获取的是 ConcurrentWebSocketSessionDecorator 包装后的实例,所有 send 操作使用它保证线程安全
  • safeSend: 发送失败只记录日志不抛异常,避免因网络抖动导致整个连接断开
  • 连接/断开: 委托给 sessionManager.register/unregister,Handler 本身不持有会话状态

6. 业务处理器

6.1 HeartbeatMessageHandler — 心跳

客户端定时发送 {"event":"ping"},服务端刷新 Redis 心跳 key 的 TTL + 回复 {"event":"pong","code":200}

@Slf4j
@Component
public class HeartbeatMessageHandler implements WsMessageHandler {

    @Resource
    private WsSessionManager sessionManager;

    @Override
    public String getHandlerType() {
        return WsEventConstants.PING;
    }

    @Override
    public void handle(WebSocketSession session, Object data) throws IOException {
        sessionManager.refreshHeartbeat(session);
        session.sendMessage(new TextMessage(
                WsMessage.success(WsEventConstants.PONG, "pong").toText()));
    }
}

要点:

  • refreshHeartbeat 重置 Redis key 的 TTL,若客户端停止发 ping,key 过期后 cleanDeadSessions 会自动清理
  • 回复 pong 让客户端也能检测连接是否存活(客户端发 ping 后若未收到 pong 则可认为连接已断)

6.2 AlarmMessageHandler — 请求-响应模式

客户端发送 {"event":"alarm","data":{"eventId":"xxx"}},服务端查数据库返回最新告警详情。这是典型的请求-响应交互模式。

@Slf4j
@Component
public class AlarmMessageHandler implements WsMessageHandler {

    @Resource
    private EventService eventService;

    @Override
    public String getHandlerType() {
        return WsEventConstants.ALARM;
    }

    @Override
    public void handle(WebSocketSession session, Object data) throws IOException {
        // data 是 JSONObject,需经过序列化→反序列化两步转换为目标类型
        Event reqEvent = JSON.parseObject(JSON.toJSONString(data), Event.class);
        Event alarmLatest = eventService.getById(reqEvent.getEventId());
        session.sendMessage(new TextMessage(
                WsMessage.success(WsEventConstants.ALARM, alarmLatest).toText()));
    }
}

要点:

  • 泛型擦除的处理: data 实际类型是 JSONObject,不能直接强转为 Event,需要 toJSONString → parseObject 两步完成类型转换
  • 请求-响应模式:一次请求对应一次响应,Handler 执行完就结束

6.3 UnreadInfoMessageHandler — 定时推送模式

客户端发送 {"event":"un_read_info","data":{"userId":"xxx"}} 订阅推送,服务端为该 session 启动一个定时任务,每隔 2 秒查询未读消息数并推送。任务每次执行前自检 session 心跳状态,心跳超时自动取消。

/**
 * 未读消息推送处理器。
 * <p>
 * 客户端发送 {@code {"event":"un_read_info","data":{"userId":"xxx"}}} 订阅推送,
 * 服务端每 2 秒查询未读数并推送给该 session。
 * <p>
 * 退出无需客户端主动发消息:任务每次执行前自检 {@code isAlive()},
 * session 心跳超时后自动取消。
 */
@Slf4j
@Component
public class UnreadInfoMessageHandler implements WsMessageHandler {

    //推送间隔
    private static final long PUSH_INTERVAL_SECONDS = 2;

    //任务执行池
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    //任务管理map
    private final ConcurrentMap<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    @Resource
    private WsSessionManager sessionManager;

    @Resource
    private EmployeeMapper employeeMapper;

    //因为需要关闭任务池,所以把停止方法作为在初始化的时候加入到sessionManager
    @PostConstruct
    public void init() {
        sessionManager.addCloseListener(this::stopPush);
    }

    @Override
    public String getHandlerType() {
        return WsEventConstants.UN_READ_INFO;
    }

    @Override
    public void handle(WebSocketSession session, Object data) throws IOException {
        String sessionId = session.getId();

        EmployeeEntity employeeEntity = JSON.parseObject(JSON.toJSONString(data), EmployeeEntity.class);
        //业务校验
        Long employeeId = employeeEntity.getEmployeeId();
        if (ObjectUtil.isNull(employeeId)){
            session.sendMessage(new TextMessage(WsMessage.error("employeeId is required").toText()));
            return;
        }
        EmployeeVO employee = employeeMapper.getEmployeeById(employeeId);
        if (ObjectUtil.isNull(employee)){
            String format = String.format("employeeId:{%d} not exist", employeeId);
            session.sendMessage(new TextMessage(WsMessage.error(format).toText()));
            return;
        }

        // computeIfAbsent 保证"检查 + 创建 + 入 map"原子执行
        tasks.computeIfAbsent(sessionId, id -> scheduler
                .scheduleAtFixedRate(
                        () -> pushUnreadMessage(sessionId, session, employeeId, employee),
                        0,
                        PUSH_INTERVAL_SECONDS,
                        TimeUnit.SECONDS));
    }

    private void pushUnreadMessage(String sessionId, WebSocketSession session, Long employeeId, EmployeeVO employee) {
        //检查session
        if (!sessionManager.isAlive(session)) {
            stopPush(sessionId);
            return;
        }
        //业务执行
        try {
            int count = queryUnreadCount(employeeId);
            String message = String.format("employee:name{%s} has {%d} UnReadMessage", employee.getActualName(), count);
            session.sendMessage(new TextMessage(WsMessage.success(WsEventConstants.UN_READ_INFO, message).toText()));
        } catch (IOException e) {
            log.error("Failed to push unread count, stopping push for session: {}", sessionId, e);
            stopPush(sessionId);
        }
    }

    /**
     * 会话关闭前,移除任务map 关闭推送任务
     */
    private void stopPush(String sessionId) {
        ScheduledFuture<?> future = tasks.remove(sessionId);
        if (ObjectUtil.isNotNull(future)){
            future.cancel(false);
            log.info("Unread push stopped for session: {}", sessionId);
        }
    }

    /**
     * 查询用户未读消息数。TODO: 替换为实际查询逻辑。
     */
    private int queryUnreadCount(Long employeeId) {
        return RandomUtil.randomInt(0,100);
    }

    @PreDestroy
    public void destroy() {
        tasks.values().forEach(f -> f.cancel(true));
        scheduler.shutdown();
    }
}

要点:

  • 重复订阅保护: tasks Map 记录已有的推送任务,同一 session 重复发送事件时直接跳过
  • 参数校验: 先校验 employeeId 非空,再校验员工是否存在,失败时返回明确的错误消息
  • 优雅退出(三重保障): 1) 每次执行前 isAlive() 自检,心跳超时自动 stopPush;2) @PreDestroy 保证应用关闭时取消所有任务并关闭线程池 ;3)通过SessionCloseListener 在会话关闭时,执行该传入函数 stopPush
  • 错误隔离: 某次推送发送失败只停止当前 session 的任务,不影响其他 session
  • 使用 ScheduledExecutorService 而非 @Scheduled,因为任务是按 session 动态创建/销毁的,@Scheduled 无法满足

7. 总结

7.1 协议层

先定通信契约,前后端、handler 之间的统一语言。

  • WsEventConstants — event 类型常量(ping/pong/alarm/un_read_info
  • WsMessage<T> — 消息载体,包含 code + event + msg + data,序列化为 JSON 文本帧传输

7.2 处理器接口

定义所有业务 handler 的契约,依赖协议层。

  • WsMessageHandlergetHandlerType() 返回 event 常量 + handle(session, data) 执行业务逻辑

7.3 路由层

依赖接口,不依赖具体实现。启动时自动收集,消息到达时分发。

  • MessageHandlerFactoryMap<event, handler>
    核心骨架完成:消息能路由,handler 可插拔。

7.4 会话管理

独立模块,只管会话生命周期。此时还没有 handler 注册进来,但回调接口已备好。

  • SessionCloseListener — 函数式接口,定义会话关闭的回调契约
  • WsSessionManager
    • register() — 包装 ConcurrentWebSocketSessionDecorator + 加入内存 + 写 Redis 心跳
    • unregister() — 通知监听器 → 移除内存 → 删 Redis key → 关闭连接
    • refreshHeartbeat() — 客户端 ping 时续期 Redis key
    • cleanDeadSessions() — 定时扫描心跳过期的 session 并清理
    • addCloseListener(SessionCloseListener) — 注册关闭回调

7.5 入口层

把路由和会话串起来。

  • WsEndpointHandler
    • afterConnectionEstablishedsessionManager.register()
    • afterConnectionClosedsessionManager.unregister()
    • handleTextMessage → JSON 解析 → 路由到 handler → handler.handle()
    • handleTransportError → 兜底清理

7.6 配置

挂载到 Spring 容器,此时服务可启动、消息可分发,只是还没有具体业务 handler。

  • WebSocketConfig@EnableWebSocket + 注册 /ws 端点

7.7 业务 handler(最后,逐个迭代)

每加一个 handler 加 @Component 注解,MessageHandlerFactory 自动收录,零耦合。

  • HeartbeatMessageHandler — ping → pong,刷新 Redis 心跳
  • AlarmMessageHandler — 查询告警详情并返回
  • UnreadInfoMessageHandler — 订阅推送,注册 SessionCloseListener 做主动清理

8. 架构总览

客户端
  │  ws://host:port/ws
  ▼
WebSocketConfig                 ← 注册端点 /ws
  │
  ▼
WsEndpointHandler               ← 连接建立 / 关闭 / 消息入口
  ├── WsSessionManager          ← 会话生命周期 + Redis 心跳 + 死连接清理
  └── MessageHandlerFactory     ← event → Handler 路由(O(1) 查找)
        │
        ├── HeartbeatMessageHandler   ← 心跳 ping/pong,刷新 Redis TTL
        ├── AlarmMessageHandler       ← 请求-响应模式:查询告警
        └── UnreadInfoMessageHandler  ← 推送模式:定时推送未读数

包结构

com.java.ws
├── WsMessage.java                  ← 消息协议载体
├── WsEventConstants.java           ← 事件常量
├── config
│   ├── WebSocketConfig.java        ← 端点注册
│   ├── WsSessionManager.java       ← 会话管理器
│   └── MessageHandlerFactory.java  ← 消息路由工厂
└── handler
    ├── SessionCloseListener.java  	←自定义会话关闭前置处理
    ├── WsMessageHandler.java       ← 处理器接口
    ├── WsEndpointHandler.java      ← 入口处理器
    └── message
        ├── HeartbeatMessageHandler.java 	← 心跳处理器
        ├── AlarmMessageHandler.java     	← 告警查询处理器
        └── UnreadInfoMessageHandler.java 	← 定时推送处理器

更多推荐