Springboot 集成 WS
Springboot 集成 ws
Springboot 集成 WS
Spring Boot 2.7.5 原生 WebSocket(
spring-boot-starter-websocket),实现应用层消息路由、Redis 心跳检测、定时推送任务的完整方案。
工作中对需要利用ws实现的需求较多,因此记录一下自己集成实现过程,不算是最佳实现,仅供参考
目录
1. 消息协议
通信的第一步:约定前后端统一的消息格式。
1.1 WsMessage — 消息载体
定义 JSON 消息结构,包含 code(业务状态码)、event(事件类型)、msg(描述)、data(业务数据)。提供静态工厂方法简化标准响应的构建。
@Data
public class WsMessage<T> implements Serializable {
private static final long serialVersionUID = 1L;
private int code;
private String event;
private String msg;
private T data;
/** 序列化为 JSON 字符串,用于通过 WebSocket 文本帧发送 */
public String toText() {
return JSON.toJSONString(this);
}
/** 构建成功响应 (code=200) */
public static <T> WsMessage<T> success(String event, T data) {
WsMessage<T> message = new WsMessage<>();
message.setCode(200);
message.setMsg("success");
message.setEvent(event);
message.setData(data);
return message;
}
/** 构建服务端错误响应 (code=500) */
public static <T> WsMessage<T> error(String msg) {
WsMessage<T> message = new WsMessage<>();
message.setCode(500);
message.setMsg(msg);
message.setEvent(WsEventConstants.ERROR);
return message;
}
/** 构建"未知事件类型"响应 (code=400) */
public static WsMessage<Void> unknownEvent(String event) {
WsMessage<Void> message = new WsMessage<>();
message.setCode(400);
message.setMsg("Unknown event type: " + event);
message.setEvent(WsEventConstants.ERROR);
return message;
}
}
要点:
toText()将消息序列化为 JSON,通过 WebSocket 文本帧发送data字段使用泛型T,但运行时被擦除为Object(反序列化后是 fastjson 的JSONObject),各 Handler 需自行做二次类型转换- 三种工厂方法覆盖成功、错误、未知事件三种标准响应
1.2 WsEventConstants — 事件常量
集中管理 event 字段的可取值。常量值与 WsMessageHandler#getHandlerType() 返回值一一对应,作为消息路由的 key。
public final class WsEventConstants {
private WsEventConstants() {}
/** 通用错误事件 */
public static final String ERROR = "error";
/** 客户端心跳请求 */
public static final String PING = "ping";
/** 服务端心跳响应 */
public static final String PONG = "pong";
/** 告警查询 */
public static final String ALARM = "alarm";
/** 订阅未读消息推送 */
public static final String UN_READ_INFO = "un_read_info";
}
要点:
- 新增事件类型只需在这里加常量 + 实现一个
WsMessageHandler,路由逻辑无需修改
这里的 PING / PONG 是应用层的业务心跳(JSON 消息),与 WebSocket 协议层(RFC 6455)的 Ping/Pong 控制帧是完全不同的概念——协议层的帧由 Tomcat 自动处理,应用代码无需关心。
2. 配置入口
2.1 WebSocketConfig — 端点注册
使用 Spring 原生 WebSocket(@EnableWebSocket + WebSocketConfigurer)。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Resource
private WsEndpointHandler endpointHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(endpointHandler, "/ws").setAllowedOrigins("*");
}
}
要点:
- 客户端通过
ws://host:port/ws建立连接 setAllowedOrigins("*")允许跨域
2.2 application-dev.yaml — 心跳参数
ws:
heartbeat:
expire-seconds: 20 # 20 秒未收到 ping 即判定会话失效
expire-seconds 控制 Redis 心跳 key 的 TTL。开发环境设 20 秒方便调试,生产环境应加大(如 90 秒),避免网络抖动误杀连接。
3. 会话管理
3.1 WsSessionManager — 会话生命周期 + Redis 心跳 + 死连接清理
集中管理所有 WebSocket 会话的注册、注销、心跳刷新和死连接清理。核心思路是利用 Redis key 的 TTL 机制做心跳检测——客户端发 ping 时刷新 key 的过期时间,定时任务扫描 key 已失效的 session 并清理。
/**
* WebSocket 会话状态管理器。
* <p>
* 统一管理 sessionMap(内存)、Redis 心跳(持久化)以及定时扫描清理超时会话。
*/
@Slf4j
@Component
@ConfigurationProperties(prefix = "ws.heartbeat")
public class WsSessionManager {
private static final String HEARTBEAT_KEY_PREFIX = "ws:heartbeat:";
/** Redis 心跳 key 的过期时间(秒),超过此时间未收到 ping 即判定会话失效 */
private int expireSeconds = 90;
public void setExpireSeconds(int expireSeconds) {
this.expireSeconds = expireSeconds;
}
//会话管理map
private final ConcurrentMap<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
//自定义会话关闭前置处理器
private final List<SessionCloseListener> closeListeners = new CopyOnWriteArrayList<>();
@Resource
private RedisService redisService;
/** 注册会话关闭回调,unregister 时通知;用于关闭会话前需要进行自定义逻辑处理*/
public void addCloseListener(SessionCloseListener listener) {
closeListeners.add(listener);
}
private String heartbeatKey(WebSocketSession session) {
return redisService.generateRedisKey(HEARTBEAT_KEY_PREFIX, session.getId());
}
/** 注册会话:包装为线程安全 + 加入内存 + 写入 Redis 初始心跳 */
public void register(WebSocketSession session) {
WebSocketSession safeSession = new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64 * 1024);
String id = session.getId();
sessionMap.put(id, safeSession);
redisService.set(heartbeatKey(session), LocalDateTime.now().toString(), expireSeconds);
log.info("WebSocket session registered: {}, total: {}", id, sessionMap.size());
}
/** 注销会话:通知监听器 → 关闭连接 + 从内存移除 + 删除 Redis 心跳 key */
public void unregister(WebSocketSession session) {
String id = session.getId();
//关闭前需要进行自定义逻辑处理的会话
closeListeners.forEach(l -> l.onSessionClosed(id));
sessionMap.remove(id);
redisService.delete(heartbeatKey(session));
closeQuietly(session, CloseStatus.NORMAL);
log.info("WebSocket session unregistered: {}, total: {}", id, sessionMap.size());
}
/** 刷新心跳(客户端发 ping 时调用) */
public void refreshHeartbeat(WebSocketSession session) {
redisService.set(heartbeatKey(session), LocalDateTime.now().toString(), expireSeconds);
}
/** 获取指定会话,供主动推送时使用 */
public WebSocketSession getSession(String id) {
return sessionMap.get(id);
}
/** 判断会话心跳是否有效 */
public boolean isAlive(WebSocketSession session) {
return redisService.hasKey(heartbeatKey(session));
}
/** 当前在线会话数 */
public int size() {
return sessionMap.size();
}
@PreDestroy
public void destroy() {
List<WebSocketSession> sessions = new ArrayList<>(sessionMap.values());
sessionMap.clear();
sessions.forEach(s -> {
redisService.delete(heartbeatKey(s));
closeQuietly(s, CloseStatus.GOING_AWAY);
});
log.info("All WebSocket sessions cleaned up on shutdown, total closed: {}", sessions.size());
}
@Scheduled(fixedDelay = 30_000)
public void cleanDeadSessions() {
List<WebSocketSession> deadSessions = sessionMap.values().stream()
.filter(s -> !redisService.hasKey(heartbeatKey(s)))
.collect(Collectors.toList());
deadSessions.forEach(s -> {
String id = s.getId();
closeListeners.forEach(l -> l.onSessionClosed(id));
sessionMap.remove(id);
closeQuietly(s, CloseStatus.SESSION_NOT_RELIABLE);
});
if (!deadSessions.isEmpty()) {
log.info("Cleaned {} dead sessions, remaining: {}", deadSessions.size(), sessionMap.size());
}
}
private void closeQuietly(WebSocketSession session, CloseStatus status) {
try {
if (session.isOpen()) {
session.close(status);
}
} catch (IOException e) {
log.error("Failed to close session: {}", session.getId(), e);
}
}
}
3.1.1 SessionCloseListener — 会话关闭前置处理(无需求可以不用)
自定义的会话退出前置处理器:SessionCloseListener ,有业务在会话关闭前需要进行一些自定义的逻辑处理,只需要在对应MessageHandler中实现并在初始化是添加到List<SessionCloseListener> closeListeners中,在session关闭时使用即可closeListeners.forEach(l -> l.onSessionClosed(id));
/**
* Session 关闭监听器。
* 注册到 {@link WsSessionManager#addCloseListener} 后,session 注销时回调。
*/
@FunctionalInterface
public interface SessionCloseListener {
/** session 关闭时触发,参数为 sessionId */
void onSessionClosed(String sessionId);
}
要点:
- 心跳检测流程: 客户端发 ping →
refreshHeartbeat刷新 Redis key TTL → 定时任务cleanDeadSessions扫描 key 不存在的 session → 清理 - 线程安全:
register时用ConcurrentWebSocketSessionDecorator包装 session,内部队列保证多线程 send 时串行化,避免数据交错 - 优雅退出:
@PreDestroy批量清理 +closeQuietly静默关闭,不会因个别连接关闭失败中断整体清理 @ConfigurationProperties(prefix = "ws.heartbeat")绑定 YAML 配置,expireSeconds默认可被 application.yaml 覆盖
4. 消息路由
4.1 WsMessageHandler — 处理器接口
定义消息处理器的统一契约。每新增一种事件类型,只需实现此接口并加 @Component,Spring 自动收集并注入到 MessageHandlerFactory。
public interface WsMessageHandler {
/** 返回该处理器对应的 event 类型值 */
String getHandlerType();
/**
* 处理消息。
* data 运行时类型为 fastjson 的 JSONObject(因为 WsMessage 的泛型 T 在运行时被擦除),
* 各 Handler 需自行将 data 转换为目标领域对象。
*/
void handle(WebSocketSession session, Object data) throws IOException;
}
要点:
getHandlerType()=WsEventConstants中的常量 = 客户端 JSON 的event字段,三者保持一致data参数是JSONObject,因为泛型擦除,Handler 内部需要用JSON.parseObject(JSON.toJSONString(data), Target.class)做二次转换- 接口声明
throws IOException,由上层WsEndpointHandler统一 catch
4.2 MessageHandlerFactory — 路由工厂
Spring 自动收集容器中所有 WsMessageHandler 实现类到列表,在 @PostConstruct 阶段转换为 Map<eventType, Handler>。
@Slf4j
@Component
public class MessageHandlerFactory {
@Resource
private List<WsMessageHandler> handlers;
private Map<String, WsMessageHandler> handlerMap;
@PostConstruct
public void init() {
if (handlers == null || handlers.isEmpty()) {
log.warn("No WsMessageHandler implementations found");
handlerMap = new ConcurrentHashMap<>();
return;
}
handlerMap = handlers.stream()
.collect(Collectors.toMap(WsMessageHandler::getHandlerType, Function.identity()));
log.info("Registered {} WebSocket message handlers: {}", handlerMap.size(), handlerMap.keySet());
}
public WsMessageHandler getHandler(String type) {
WsMessageHandler handler = handlerMap.get(type);
if (handler == null) {
log.warn("No handler found for event type: {}", type);
}
return handler;
}
}
要点:
- Spring 自动收集注入
handlers列表——新增 Handler 只需加@Component,无需手动注册 @PostConstruct一次性建好路由表ConcurrentHashMap保证并发读安全
5. 入口处理器
5.1 WsEndpointHandler — 连接 & 消息入口
继承 TextWebSocketHandler,是整个 WebSocket 通信的总入口。每个消息到达后的处理流程:心跳有效性检查 → JSON 解析 → 按 event 路由 Handler → 执行业务。任一环节失败都通过 safeSend 返回错误消息,不会导致连接中断。
@Slf4j
@Component
public class WsEndpointHandler extends TextWebSocketHandler {
@Resource
private MessageHandlerFactory handlerFactory;
@Resource
private WsSessionManager sessionManager;
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessionManager.register(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
sessionManager.unregister(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
// 心跳失效的会话直接注销,不再处理消息
if (!sessionManager.isAlive(session)) {
sessionManager.unregister(session);
return;
}
String payload = message.getPayload();
// 1. 解析 JSON
WsMessage<?> wsMessage;
try {
wsMessage = JSON.parseObject(payload, WsMessage.class);
} catch (Exception e) {
log.error("Failed to parse WebSocket message: {}", payload, e);
safeSend(session, WsMessage.error("Invalid message format").toText());
return;
}
// 2. 获取线程安全的 session(由 WsSessionManager 用 ConcurrentWebSocketSessionDecorator 包装)
WebSocketSession safeSession = sessionManager.getSession(session.getId());
if (safeSession == null) {
log.warn("Session not found in manager: {}", session.getId());
return;
}
// 3. 按 event 路由到对应的 Handler
WsMessageHandler handler = handlerFactory.getHandler(wsMessage.getEvent());
if (handler == null) {
safeSend(safeSession, WsMessage.unknownEvent(wsMessage.getEvent()).toText());
return;
}
// 4. 执行业务处理
try {
handler.handle(safeSession, wsMessage.getData());
} catch (Exception e) {
log.error("Handler error for event: {}, session: {}", wsMessage.getEvent(), session.getId(), e);
safeSend(safeSession, WsMessage.error("Handler error: " + e.getMessage()).toText());
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable error) {
String id = session != null ? session.getId() : "unknown";
log.error("WebSocket transport error on session: {}", id, error);
if (session != null) {
sessionManager.unregister(session);
}
}
/** 安全发送——捕获发送异常避免连接中断 */
private void safeSend(WebSocketSession session, String text) {
try {
session.sendMessage(new TextMessage(text));
} catch (Exception e) {
log.error("Failed to send message to session: {}", session.getId(), e);
}
}
}
要点:
- 消息处理四步走: 心跳检查 → 解析 → 路由 → 执行,每步有独立的错误处理
- 安全 session: 从
sessionManager.getSession(id)获取的是ConcurrentWebSocketSessionDecorator包装后的实例,所有 send 操作使用它保证线程安全 - safeSend: 发送失败只记录日志不抛异常,避免因网络抖动导致整个连接断开
- 连接/断开: 委托给
sessionManager.register/unregister,Handler 本身不持有会话状态
6. 业务处理器
6.1 HeartbeatMessageHandler — 心跳
客户端定时发送 {"event":"ping"},服务端刷新 Redis 心跳 key 的 TTL + 回复 {"event":"pong","code":200}。
@Slf4j
@Component
public class HeartbeatMessageHandler implements WsMessageHandler {
@Resource
private WsSessionManager sessionManager;
@Override
public String getHandlerType() {
return WsEventConstants.PING;
}
@Override
public void handle(WebSocketSession session, Object data) throws IOException {
sessionManager.refreshHeartbeat(session);
session.sendMessage(new TextMessage(
WsMessage.success(WsEventConstants.PONG, "pong").toText()));
}
}
要点:
refreshHeartbeat重置 Redis key 的 TTL,若客户端停止发 ping,key 过期后cleanDeadSessions会自动清理- 回复 pong 让客户端也能检测连接是否存活(客户端发 ping 后若未收到 pong 则可认为连接已断)
6.2 AlarmMessageHandler — 请求-响应模式
客户端发送 {"event":"alarm","data":{"eventId":"xxx"}},服务端查数据库返回最新告警详情。这是典型的请求-响应交互模式。
@Slf4j
@Component
public class AlarmMessageHandler implements WsMessageHandler {
@Resource
private EventService eventService;
@Override
public String getHandlerType() {
return WsEventConstants.ALARM;
}
@Override
public void handle(WebSocketSession session, Object data) throws IOException {
// data 是 JSONObject,需经过序列化→反序列化两步转换为目标类型
Event reqEvent = JSON.parseObject(JSON.toJSONString(data), Event.class);
Event alarmLatest = eventService.getById(reqEvent.getEventId());
session.sendMessage(new TextMessage(
WsMessage.success(WsEventConstants.ALARM, alarmLatest).toText()));
}
}
要点:
- 泛型擦除的处理:
data实际类型是JSONObject,不能直接强转为Event,需要toJSONString → parseObject两步完成类型转换 - 请求-响应模式:一次请求对应一次响应,Handler 执行完就结束
6.3 UnreadInfoMessageHandler — 定时推送模式
客户端发送 {"event":"un_read_info","data":{"userId":"xxx"}} 订阅推送,服务端为该 session 启动一个定时任务,每隔 2 秒查询未读消息数并推送。任务每次执行前自检 session 心跳状态,心跳超时自动取消。
/**
* 未读消息推送处理器。
* <p>
* 客户端发送 {@code {"event":"un_read_info","data":{"userId":"xxx"}}} 订阅推送,
* 服务端每 2 秒查询未读数并推送给该 session。
* <p>
* 退出无需客户端主动发消息:任务每次执行前自检 {@code isAlive()},
* session 心跳超时后自动取消。
*/
@Slf4j
@Component
public class UnreadInfoMessageHandler implements WsMessageHandler {
//推送间隔
private static final long PUSH_INTERVAL_SECONDS = 2;
//任务执行池
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
//任务管理map
private final ConcurrentMap<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
@Resource
private WsSessionManager sessionManager;
@Resource
private EmployeeMapper employeeMapper;
//因为需要关闭任务池,所以把停止方法作为在初始化的时候加入到sessionManager
@PostConstruct
public void init() {
sessionManager.addCloseListener(this::stopPush);
}
@Override
public String getHandlerType() {
return WsEventConstants.UN_READ_INFO;
}
@Override
public void handle(WebSocketSession session, Object data) throws IOException {
String sessionId = session.getId();
EmployeeEntity employeeEntity = JSON.parseObject(JSON.toJSONString(data), EmployeeEntity.class);
//业务校验
Long employeeId = employeeEntity.getEmployeeId();
if (ObjectUtil.isNull(employeeId)){
session.sendMessage(new TextMessage(WsMessage.error("employeeId is required").toText()));
return;
}
EmployeeVO employee = employeeMapper.getEmployeeById(employeeId);
if (ObjectUtil.isNull(employee)){
String format = String.format("employeeId:{%d} not exist", employeeId);
session.sendMessage(new TextMessage(WsMessage.error(format).toText()));
return;
}
// computeIfAbsent 保证"检查 + 创建 + 入 map"原子执行
tasks.computeIfAbsent(sessionId, id -> scheduler
.scheduleAtFixedRate(
() -> pushUnreadMessage(sessionId, session, employeeId, employee),
0,
PUSH_INTERVAL_SECONDS,
TimeUnit.SECONDS));
}
private void pushUnreadMessage(String sessionId, WebSocketSession session, Long employeeId, EmployeeVO employee) {
//检查session
if (!sessionManager.isAlive(session)) {
stopPush(sessionId);
return;
}
//业务执行
try {
int count = queryUnreadCount(employeeId);
String message = String.format("employee:name{%s} has {%d} UnReadMessage", employee.getActualName(), count);
session.sendMessage(new TextMessage(WsMessage.success(WsEventConstants.UN_READ_INFO, message).toText()));
} catch (IOException e) {
log.error("Failed to push unread count, stopping push for session: {}", sessionId, e);
stopPush(sessionId);
}
}
/**
* 会话关闭前,移除任务map 关闭推送任务
*/
private void stopPush(String sessionId) {
ScheduledFuture<?> future = tasks.remove(sessionId);
if (ObjectUtil.isNotNull(future)){
future.cancel(false);
log.info("Unread push stopped for session: {}", sessionId);
}
}
/**
* 查询用户未读消息数。TODO: 替换为实际查询逻辑。
*/
private int queryUnreadCount(Long employeeId) {
return RandomUtil.randomInt(0,100);
}
@PreDestroy
public void destroy() {
tasks.values().forEach(f -> f.cancel(true));
scheduler.shutdown();
}
}
要点:
- 重复订阅保护:
tasksMap 记录已有的推送任务,同一 session 重复发送事件时直接跳过 - 参数校验: 先校验 employeeId 非空,再校验员工是否存在,失败时返回明确的错误消息
- 优雅退出(三重保障): 1) 每次执行前
isAlive()自检,心跳超时自动stopPush;2)@PreDestroy保证应用关闭时取消所有任务并关闭线程池 ;3)通过SessionCloseListener在会话关闭时,执行该传入函数stopPush - 错误隔离: 某次推送发送失败只停止当前 session 的任务,不影响其他 session
- 使用
ScheduledExecutorService而非@Scheduled,因为任务是按 session 动态创建/销毁的,@Scheduled无法满足
7. 总结
7.1 协议层
先定通信契约,前后端、handler 之间的统一语言。
WsEventConstants— event 类型常量(ping/pong/alarm/un_read_info)WsMessage<T>— 消息载体,包含code+event+msg+data,序列化为 JSON 文本帧传输
7.2 处理器接口
定义所有业务 handler 的契约,依赖协议层。
WsMessageHandler—getHandlerType()返回 event 常量 +handle(session, data)执行业务逻辑
7.3 路由层
依赖接口,不依赖具体实现。启动时自动收集,消息到达时分发。
MessageHandlerFactory—Map<event, handler>
核心骨架完成:消息能路由,handler 可插拔。
7.4 会话管理
独立模块,只管会话生命周期。此时还没有 handler 注册进来,但回调接口已备好。
SessionCloseListener— 函数式接口,定义会话关闭的回调契约WsSessionManagerregister()— 包装ConcurrentWebSocketSessionDecorator+ 加入内存 + 写 Redis 心跳unregister()— 通知监听器 → 移除内存 → 删 Redis key → 关闭连接refreshHeartbeat()— 客户端 ping 时续期 Redis keycleanDeadSessions()— 定时扫描心跳过期的 session 并清理addCloseListener(SessionCloseListener)— 注册关闭回调
7.5 入口层
把路由和会话串起来。
WsEndpointHandlerafterConnectionEstablished→sessionManager.register()afterConnectionClosed→sessionManager.unregister()handleTextMessage→ JSON 解析 → 路由到 handler → handler.handle()handleTransportError→ 兜底清理
7.6 配置
挂载到 Spring 容器,此时服务可启动、消息可分发,只是还没有具体业务 handler。
WebSocketConfig—@EnableWebSocket+ 注册/ws端点
7.7 业务 handler(最后,逐个迭代)
每加一个 handler 加 @Component 注解,MessageHandlerFactory 自动收录,零耦合。
HeartbeatMessageHandler— ping → pong,刷新 Redis 心跳AlarmMessageHandler— 查询告警详情并返回UnreadInfoMessageHandler— 订阅推送,注册SessionCloseListener做主动清理
8. 架构总览
客户端
│ ws://host:port/ws
▼
WebSocketConfig ← 注册端点 /ws
│
▼
WsEndpointHandler ← 连接建立 / 关闭 / 消息入口
├── WsSessionManager ← 会话生命周期 + Redis 心跳 + 死连接清理
└── MessageHandlerFactory ← event → Handler 路由(O(1) 查找)
│
├── HeartbeatMessageHandler ← 心跳 ping/pong,刷新 Redis TTL
├── AlarmMessageHandler ← 请求-响应模式:查询告警
└── UnreadInfoMessageHandler ← 推送模式:定时推送未读数
包结构
com.java.ws
├── WsMessage.java ← 消息协议载体
├── WsEventConstants.java ← 事件常量
├── config
│ ├── WebSocketConfig.java ← 端点注册
│ ├── WsSessionManager.java ← 会话管理器
│ └── MessageHandlerFactory.java ← 消息路由工厂
└── handler
├── SessionCloseListener.java ←自定义会话关闭前置处理
├── WsMessageHandler.java ← 处理器接口
├── WsEndpointHandler.java ← 入口处理器
└── message
├── HeartbeatMessageHandler.java ← 心跳处理器
├── AlarmMessageHandler.java ← 告警查询处理器
└── UnreadInfoMessageHandler.java ← 定时推送处理器
更多推荐



所有评论(0)