vue+springboot+sockJs+stomp实现websocket消息推送
上次写过一篇原生Websocket实现的消息推送的文章传送门。今天讲一下使用sockJS+stomp实现上述功能首先还是从服务端说起1、引入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</art
·
上次写过一篇原生Websocket实现的消息推送的文章传送门。
今天讲一下使用sockJS+stomp实现上述功能
首先还是从服务端说起
1、 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、 websocket的基本配置,包括前端接收通知的地址和前端连接时的地址("/ricky-websocket"),
并使用@Bean注解实例化新增连接监听、断开连接监听以及websocketSession注册类(下面会详细说)
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// /users 默认通知
config.enableSimpleBroker("/topic", "/client");
config.setApplicationDestinationPrefixes("/app");
//设置前缀 默认是user 可以修改 点对点时使用
//config.setUserDestinationPrefix("/websocket/");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ricky-websocket").setAllowedOrigins("*") .withSockJS();
}
@Bean
public SocketSessionRegistry SocketSessionRegistry() {
//websocketSession注册
return new SocketSessionRegistry();
}
@Bean
public STOMPConnectEventListener STOMPConnectEventListener() {
//新增连接
return new STOMPConnectEventListener();
}
@Bean
public STOMPDisconnectEventListener STOMPDisconnectEventListener(){
//断开连接
return new STOMPDisconnectEventListener();
}
3、 新增连接的处理
/**
 * Handles new STOMP connections by recording the connecting user in the
 * {@link SocketSessionRegistry}.
 *
 * <p>Only connections whose principal is an {@code OAuth2Authentication} are
 * registered; every other connection is ignored.
 */
public class STOMPConnectEventListener implements ApplicationListener<SessionConnectEvent> {

    @Autowired
    SocketSessionRegistry webAgentSessionRegistry;

    /**
     * Registers the connecting user under its company id and user id.
     *
     * @param event the connect event carrying the STOMP CONNECT message
     */
    @Override
    public void onApplicationEvent(SessionConnectEvent event) {
        StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
        Principal principal = sha.getUser();
        // instanceof is false for null, so no separate null check is needed.
        if (!(principal instanceof OAuth2Authentication)) {
            return;
        }

        String companyId = SecurityUtils.getTenantId((OAuth2Authentication) principal);

        // "userId" is a native STOMP header sent by the front end on connect.
        // getFirstNativeHeader returns null when the header is absent, whereas
        // getNativeHeader(...).get(0) would throw a NullPointerException.
        String userId = sha.getFirstNativeHeader("userId");
        if (userId == null) {
            // Without a user id there is no registry key to store; skip.
            return;
        }

        webAgentSessionRegistry.registerSessionId(companyId, userId, principal.getName());
    }
}
public class SocketSessionRegistry {
//this map save every session
//这个集合存储session
private final ConcurrentMap<String, String> companyUserSessionIds = new ConcurrentHashMap<>();
private final Object lock = new Object();
public SocketSessionRegistry() {
}
/**
* 获取所有session
*
* @return t
*/
public ConcurrentMap<String, String> getAllSessionIds() {
return this.companyUserSessionIds;
}
//将新加入的连接添加进map
public void registerSessionId(String companyId, String userId, String userName) {
Assert.notNull(userName, "userName must not be null");
Assert.notNull(companyId, "Company ID must not be null");
Assert.notNull(userId, "User ID must not be null");
synchronized (this.lock) {
companyUserSessionIds.put(companyId + "-" + userId, userName);
}
}
//断开连接时从map中移除
public void unregisterSessionId(String userName) {
Assert.notNull(userName, "userName must not be null");
synchronized (this.lock) {
if(companyUserSessionIds.size()>0){
String key = companyUserSessionIds.entrySet().stream().filter(t -> StringUtils.equals(t.getValue(), userName)).findFirst().get().getKey();
if(StringUtils.isNotBlank(key)){
companyUserSessionIds.remove(key);
}
}
// System.out.println("Disconnect: " + map);
}
}
4、断开连接的处理类
/**
 * Handles STOMP disconnects by removing the disconnecting user from the
 * {@link SocketSessionRegistry}.
 */
public class STOMPDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {

    @Autowired
    SocketSessionRegistry webAgentSessionRegistry;

    /**
     * Unregisters the user by principal name when the principal is an
     * {@code OAuth2Authentication}; other disconnects are ignored.
     *
     * @param event the disconnect event carrying the STOMP message
     */
    @Override
    public void onApplicationEvent(SessionDisconnectEvent event) {
        Principal user = StompHeaderAccessor.wrap(event.getMessage()).getUser();
        // instanceof already evaluates to false for a null principal.
        if (!(user instanceof OAuth2Authentication)) {
            return;
        }
        webAgentSessionRegistry.unregisterSessionId(user.getName());
    }
}
5、向前端推送消息的示例
// Push a message to every contact that is currently connected.
// This is the live registry map: key "companyId-userId", value user name.
ConcurrentMap<String, String> sessionIds = webAgentSessionRegistry.getAllSessionIds();
if (sessionIds.isEmpty()) {
    // Nobody is connected, so there is nothing to push.
    return true;
}
if (contactIds != null && contactIds.size() > 0) {
    // NOTE(review): each id is presumably a "companyId-userId" registry key — confirm with the caller.
    contactIds.forEach(id -> {
        // Look the user up once: a second get() could return null if the
        // user disconnects between the check and the send.
        String userName = sessionIds.get(id);
        if (userName != null) {
            // This call is the one that actually delivers the message to the user.
            template.convertAndSendToUser(userName, "/client", "消息内容或消息对象的JSON字符串");
        }
    });
}
下面开始说前端的一些配置和处理
1、 引入stomp和sockjs
import SockJS from 'sockjs-client'
import Stomp from 'stompjs'
vue 安装sockJS和stomp:
npm install sockjs-client --save
npm install stompjs --save
2、创建websocket连接
// Module-level connection state.
let stompClient = null
let websocket = null
// Flag meant to prevent duplicate WebSocket reconnects.
let lockReconnect = false
// Number of reconnect attempts made so far.
let reconnect_times = 0
createStompWebSocket({ state, commit }, payload) {
if (stompClient && stompClient.ws.readyState === WebSocket.OPEN) {
console.log('stomp still opening')
return
}
var companyId = sessionStorage.getItem('companyId')
var userId = sessionStorage.getItem('userId')
if (!companyId || !userId || companyId === 'undefined') { return }
const url = `/api/notification/ricky-websocket?access_token=${getToken()}`//创建连接的url(携带token)
var socket = new SockJS(url)
stompClient = Stomp.over(socket)
stompClient.connect({ userId: userId }, connectCallback, errorCallback)
function connectCallback(frame) {
reconnect_times = 0
stompClient.subscribe('/user/client', function(greeting) {
var obj = JSON.parse(greeting.body)
//TODO
//接收到消息的处理
})
}
function errorCallback(frame) {
console.log('error connect' + frame)
payload.componentRef.reconnect()
}
window.onbeforeunload = function() {
if (stompClient.ws.readyState === 1) {
stompClient.disconnect(function() {
stompClient.ws.close()
console.log('close websocket')
})
}
}
},
3、 发生异常断开时触发重连
reconnect({ state, commit }, payload) {
if (lockReconnect) return
if (reconnect_times <= 5) {
if (!getToken()) { return }
setTimeout(() => {
// 如果断开,每隔一分钟重连一次
console.log('reconnect_times==' + reconnect_times)
lockReconnect = true
payload.componentRef.createStompWebSocket()
reconnect_times++
lockReconnect = false
}, 60000)
}
},
差不多就是这样了,有任何建议可以评论留言
更多推荐
已为社区贡献2条内容
所有评论(0)