上次写过一篇用原生WebSocket实现消息推送的文章(传送门)
今天讲一下使用sockJS+stomp实现上述功能
首先还是从服务端说起
1、 引入依赖

		 <dependency>
		    <groupId>org.springframework.boot</groupId>
		    <artifactId>spring-boot-starter-websocket</artifactId>
		 </dependency>

2、 websocket的基本配置,包括前端接收通知的地址和前端连接时的地址(“/ricky-websocket”),
并使用@Bean注解实例化新增连接、断开连接的监听类以及websocketSession注册类(下面会详细说)

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // /users 默认通知
        config.enableSimpleBroker("/topic", "/client");
        config.setApplicationDestinationPrefixes("/app");
        //设置前缀  默认是user 可以修改  点对点时使用
        //config.setUserDestinationPrefix("/websocket/");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ricky-websocket").setAllowedOrigins("*") .withSockJS();
	}

 	@Bean
    public SocketSessionRegistry SocketSessionRegistry() {
    	//websocketSession注册
        return new SocketSessionRegistry();
    }

    @Bean
    public STOMPConnectEventListener STOMPConnectEventListener() {
    	//新增连接
        return new STOMPConnectEventListener();
    }
    @Bean
    public STOMPDisconnectEventListener STOMPDisconnectEventListener(){
    	//断开连接
        return new STOMPDisconnectEventListener();
    }

3、 新增连接的处理

/**
 * Listens for {@link SessionConnectEvent} and records the connecting user in
 * the {@link SocketSessionRegistry}.
 *
 * <p>Only principals that are {@code OAuth2Authentication} instances are
 * registered; every other connection is ignored.
 */
public class STOMPConnectEventListener  implements ApplicationListener<SessionConnectEvent> {

    @Autowired
    SocketSessionRegistry webAgentSessionRegistry;

    /**
     * Registers the user behind the connect frame.
     *
     * @param event the connect event carrying the STOMP CONNECT message
     */
    @Override
    public void onApplicationEvent(SessionConnectEvent event) {
        StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
        Principal principal = sha.getUser();
        // instanceof is already false for null, so no separate null check is needed.
        if (!(principal instanceof OAuth2Authentication)) {
            return;
        }
        OAuth2Authentication authentication = (OAuth2Authentication) principal;
        String companyId = SecurityUtils.getTenantId(authentication);

        // userId is a native header supplied by the front end on connect.
        // getFirstNativeHeader returns null when the header is absent, whereas
        // getNativeHeader(..).get(0) would fail with a NullPointerException.
        String userId = sha.getFirstNativeHeader("userId");
        if (userId == null) {
            // Without a userId there is no registry key to build; skip registration.
            return;
        }

        String userName = principal.getName();
        webAgentSessionRegistry.registerSessionId(companyId, userId, userName);
    }
}
public class SocketSessionRegistry {
    //this map save every session
    //这个集合存储session
    private final ConcurrentMap<String, String> companyUserSessionIds = new ConcurrentHashMap<>(); 

    private final Object lock = new Object();

    public SocketSessionRegistry() {
    }
       /**
     * 获取所有session
     *
     * @return t
     */
    public ConcurrentMap<String, String> getAllSessionIds() {
        return this.companyUserSessionIds;
    }

//将新加入的连接添加进map
 public void registerSessionId(String companyId, String userId, String userName) {
        Assert.notNull(userName, "userName must not be null");
        Assert.notNull(companyId, "Company ID must not be null");
        Assert.notNull(userId, "User ID must not be null");

        synchronized (this.lock) {   
           companyUserSessionIds.put(companyId + "-" + userId, userName);
        }
    }
//断开连接时从map中移除
  public void unregisterSessionId(String userName) {
        Assert.notNull(userName, "userName must not be null");
        synchronized (this.lock) {
          	
            if(companyUserSessionIds.size()>0){
                String key = companyUserSessionIds.entrySet().stream().filter(t -> StringUtils.equals(t.getValue(), userName)).findFirst().get().getKey();
                if(StringUtils.isNotBlank(key)){
                    companyUserSessionIds.remove(key);
                }
            }

          //  System.out.println("Disconnect: " + map);
        }
    }

4、断开连接的处理类

/**
 * Listens for {@link SessionDisconnectEvent} and removes the disconnecting
 * user from the {@link SocketSessionRegistry}.
 *
 * <p>Only principals that are {@code OAuth2Authentication} instances are
 * handled; every other disconnect is ignored.
 */
public class STOMPDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {

    @Autowired
    SocketSessionRegistry webAgentSessionRegistry;

    /**
     * Unregisters the user behind the disconnect event by principal name.
     *
     * @param event the disconnect event carrying the STOMP message
     */
    @Override
    public void onApplicationEvent(SessionDisconnectEvent event) {
        Principal user = StompHeaderAccessor.wrap(event.getMessage()).getUser();
        // A null principal fails the instanceof test as well.
        if (!(user instanceof OAuth2Authentication)) {
            return;
        }
        webAgentSessionRegistry.unregisterSessionId(user.getName());
    }
}

5、向前端推送消息的示例

 // Fetch the registry's map of connected users; each contact id is used as a key into it.
 ConcurrentMap<String, String> sessionIds = webAgentSessionRegistry.getAllSessionIds();
        // Nobody is connected, so there is nothing to push.
        if (sessionIds.isEmpty()) {
            return true;
        }
        if (contactIds != null) {
            contactIds.forEach(id -> {
                // Look the entry up once: a second get() could return null if the
                // user disconnects between the check and the send.
                String userName = sessionIds.get(id);
                if (userName != null) {
                    // This is the call that actually pushes the message to the user.
                    template.convertAndSendToUser(userName, "/client", "消息内容或消息对象的JSON字符串");
                }
            });
        }

下面开始说前端的一些配置和处理
1、 引入stomp和sockjs

import SockJS from 'sockjs-client'
import Stomp from 'stompjs'

vue 安装sockJS和stomp:

npm install sockjs-client --save
npm install stompjs --save

2、创建websocket连接

// Current STOMP client; stays null until createStompWebSocket assigns it.
var stompClient = null
// NOTE(review): not referenced in the code shown here — confirm it is still needed.
var websocket = null
var lockReconnect = false // prevents duplicate ws reconnects
// Number of reconnect attempts so far; reset to 0 after a successful connect.
var reconnect_times = 0
createStompWebSocket({ state, commit }, payload) {
    if (stompClient && stompClient.ws.readyState === WebSocket.OPEN) {
      console.log('stomp still opening')
      return
    }
    var companyId = sessionStorage.getItem('companyId')
    var userId = sessionStorage.getItem('userId')
    if (!companyId || !userId || companyId === 'undefined') { return }
    const url = `/api/notification/ricky-websocket?access_token=${getToken()}`//创建连接的url(携带token)
    var socket = new SockJS(url)
    stompClient = Stomp.over(socket)
    stompClient.connect({ userId: userId }, connectCallback, errorCallback)
    function connectCallback(frame) { 
      reconnect_times = 0
      stompClient.subscribe('/user/client', function(greeting) {       
        var obj = JSON.parse(greeting.body)
        //TODO
        //接收到消息的处理
      })
    }
    function errorCallback(frame) {
      console.log('error connect' + frame)
      payload.componentRef.reconnect()
    }
    window.onbeforeunload = function() {
      if (stompClient.ws.readyState === 1) {
        stompClient.disconnect(function() {
          stompClient.ws.close()
          console.log('close websocket')
        })
      }
    } 
    
  },

3、 发生异常断开时触发重连

reconnect({ state, commit }, payload) {
    if (lockReconnect) return
    if (reconnect_times <= 5) {
      if (!getToken()) { return }
      setTimeout(() => {
        // 如果断开,每隔一分钟重连一次
        console.log('reconnect_times==' + reconnect_times)
        lockReconnect = true
        payload.componentRef.createStompWebSocket()
        reconnect_times++
        lockReconnect = false
      }, 60000)
    }
  },

差不多就是这样了,有任何建议可以评论留言

Logo

前往低代码交流专区

更多推荐