1.pom.xml

 <!--websocket-->
 <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-websocket</artifactId>
   </dependency>

2.登录方法中登录成功后将用户账号存入session

HttpSession session = request.getSession();
ServletContext application = session.getServletContext();
Integer userId = ShiroUtils.getUserId();
session.setAttribute("userId", userId);
Set<Integer> onlineUserSet = (Set)application.getAttribute("onlineUserSet");
onlineUserSet.add(userId);
application.setAttribute("onlineUserSet", onlineUserSet);
//System.out.println("当前在线用户:"+onlineUserSet.toString());

3.监听session变化判断登录用户人数和用户id

@WebListener
@Component
public class UserStatisticsListener implements HttpSessionListener,ServletContextListener {

    private ServletContext application = null;
    public static HttpSession httpSession = null;

    @Override
    public void contextDestroyed(ServletContextEvent s) {
        System.out.println("context Destroyed");
    }

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        application = sce.getServletContext();
        Set<Integer> onlineUserSet = new HashSet<>();
        application.setAttribute("onlineUserSet", onlineUserSet);
        System.out.println("onlineUserSet2==>"+onlineUserSet);
    }

    /**
     * 创建session
     * @param se
     */
    @Override
    public void sessionCreated(HttpSessionEvent se) {
        httpSession = se.getSession();
        System.out.println("sessionId==>"+se.getSession().getId());
    }

    /**
     * 销毁session
     * @param se
     */
    @Override
    public void sessionDestroyed(HttpSessionEvent se) {
        System.out.println("销毁session");

        HttpSession session = se.getSession();
        Set<Integer> onlineUserSet = (Set<Integer>)application.getAttribute("onlineUserSet");
        Integer userId = (Integer)session.getAttribute("userId");
        onlineUserSet.remove(userId);
        application.setAttribute("onlineUserSet",onlineUserSet);

        onlineUserSet = (Set<Integer>)application.getAttribute("onlineUserSet");
        System.out.println("当前在线用户:"+onlineUserSet.toString());
        System.out.println(userId + "-->退出");
        //推送所有在线用户现有的在线用户id
        if(!CollectionUtils.isEmpty(onlineUserSet)){
            WebSocketConfiguration.sendInfo(onlineUserSet.toString(),onlineUserSet);
        }

    }
}

4.建立websocket连接

  1. 需要自定义HttpSessionConfigurator将Session
  2. 转化成HttpSession,否则websocket的sessionId和登录后存储的sessionId不一致,无法监听退出后的操作
    注意: 在使用@ServerEndpoint时如果项目引入aop则会出现启动异常,可
    在WebLogAspect 类中加入不扫描的包或类,如下:

    !execution(public * com.demo.WebSocketConfiguration…*(…))
@Aspect
@Component
public class WebLogAspect { /**
     /** 
     * 扫描的包,类
     */
    @Pointcut("execution(public * com.demo.*.*..*(..)) && !execution(public * com.demo.WebSocketConfiguration..*(..)) ")
    public void webLog(){}
    ......
    }
@Component
@ServerEndpoint(value = "/websocket/{userId}",configurator = HttpSessionConfigurator.class)
@Slf4j
public class WebSocketConfiguration {
   /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     * <p>
     * Map的key为userId,List用于存放一个userId的多个终端,比如同一个userId,同时在手机端和PC登陆
     */
    private static ConcurrentHashMap<Integer, List<WebSocketConfiguration>> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private Integer userId = null;

    private HttpSession httpSession = null;

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,EndpointConfig config, @PathParam("userId") Integer userId) {
        if(null != userId){
            this.session = session;
            this.userId = userId;
            //httpSession赋值和登录使用同一个session
            this.httpSession = (HttpSession)config.getUserProperties().get(HttpSession.class.getName());
            List<WebSocketConfiguration> servers;
            List<WebSocketConfiguration> webSocketServers = new ArrayList<>();
            //将接入的客户端信息添加到内存
            if (webSocketMap.containsKey(userId)) {
                //查询当前userId以及当前的session是否已经存在,如果存在,先移除再新增,如果不存在,直接新增
                webSocketServers = webSocketMap.get(userId).stream().filter(o -> o.session.getId().equals(session.getId())).collect(Collectors.toList());
            }
            if (webSocketMap.containsKey(userId) && webSocketServers.size() > 0) {
                webSocketServers = webSocketMap.get(userId);
                webSocketServers.removeIf(webSocketServer -> webSocketServer.session.getId().equals(session.getId()));
                servers = webSocketServers;
                servers.add(this);
                webSocketMap.put(userId, servers);
            } else {
                servers = null == webSocketMap.get(userId) ? new ArrayList<>() : webSocketMap.get(userId);
                servers.add(this);
                webSocketMap.put(userId, servers);
            }
            log.info("用户【" + userId + "】sessionId:[" + session.getId() + "]连接成功");
            //推送给所有在线用户现在已上线的用户id
            if(null != this.httpSession){
                Set<Integer> onlineUserSet = (Set)this.httpSession.getServletContext().getAttribute("onlineUserSet");
                sendInfo(onlineUserSet.toString(),onlineUserSet);
            }else {
                sendInfo(userId);
            }
        }
    }


    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        List<WebSocketConfiguration> webSocketServers = new ArrayList<>();
        if (webSocketMap.containsKey(userId)) {
            webSocketServers = webSocketMap.get(userId).stream().filter(o -> o.session.getId().equals(session.getId())).collect(Collectors.toList());
        }
        if (webSocketMap.containsKey(userId) && webSocketServers.size() > 0) {
            webSocketServers = webSocketMap.get(userId);
            Iterator<WebSocketConfiguration> iterator = webSocketServers.iterator();
            while (iterator.hasNext()) {
                if (iterator.next().session.getId().equals(session.getId())) {
                    iterator.remove();
                }
            }
            webSocketMap.put(userId, webSocketServers);
            log.info("用户【" + userId + "】sessionId:[" + session.getId() + "]断开连接" );
        }
        //调用销毁session
        if(null != userId && null != this.httpSession){
           // this.httpSession.invalidate();
            //发送断开消息
            sendInfo(userId);
        }
        HttpSession httpSession1 = this.httpSession;

		//todo 待优化,暂时用Timer判定销毁session,优点:可以和前端交互判定浏览器关闭后的session销毁及及时推送给其他用户;缺点:前端自动退出登录后会执行两次销毁程序,则此时会有空指针异常抛出,但不影响程序使用
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("隔1分钟执行销毁session....");
                if(webSocketMap.containsKey(userId)){
                    List<WebSocketConfiguration> webSocketServers = webSocketMap.get(userId);
                    if(CollectionUtils.isEmpty(webSocketServers)){
                        if(null != httpSession1){
                            System.out.println("销毁...");
                            httpSession1.invalidate();
                        }
                    }
                }
            }
        }, 1000 * 60);

    }


    /**
     * 收到客户端消息后调用的方法
     * 前端发送心跳包判断后台程序是否关闭
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户【" + userId + "】sessionId:[" + session.getId() + "]发送消息给服务端:" + message);
        try {
            //判断userId是否存在
            if(webSocketMap.containsKey(userId)){
                sendMessage("ok");
            }else {
            //退送断线消息MessageUtils.Offline自定义用于前端判断,如:String Offline = "404session失效"
                sendMessage(MessageUtils.Offline);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * @param error
     */
    @OnError
    public void onError(Throwable error) {
        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 发送自定义消息
     */
    public static void sendInfo(String message, @PathParam("userIds") Set<Integer> userIds){
        log.info(JSON.toJSONString("所有客户端:" + webSocketMap));
        if (!CollectionUtils.isEmpty(userIds)) {
            userIds.forEach(i->{
                if(webSocketMap.containsKey(i)){
                    for (WebSocketConfiguration webSocketServer : webSocketMap.get(i)) {
                        try {
	                        //哪个终端有消息,就推送哪个
							//	if (sessionId.equals(webSocketServer.session.getId())) {
			                //    	webSocketServer.sendMessage(message);
			                //	}
			                //对该用户的所有终端都推送无需判断sessionId
                            webSocketServer.sendMessage(message);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    }


    /**
     * 发送自定义消息
     */
    public static void sendInfo(@PathParam("userId") Integer userId){
        if (userId != null && webSocketMap.containsKey(userId)) {
            for (WebSocketConfiguration webSocketServer : webSocketMap.get(userId)) {
                try {webSocketServer.sendMessage(MessageUtils.Offline);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else {
            log.error("用户" + userId + ",不在线!");
        }
    }

}

5.创建HttpSessionConfigurator.class

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;

public class HttpSessionConfigurator extends ServerEndpointConfig.Configurator{

    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        if(null != httpSession){
            sec.getUserProperties().put(HttpSession.class.getName(), httpSession);
        }
    }
}

6.创建WebSocketConfig

自动注册使用@ServerEndpoint注解声明的websocket endpoint

@Configuration
public class WebSocketConfig {
    /**
     * ServerEndpointExporter 作用
     * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

7.vue部分websocket.js

import {Notification} from 'element-ui'

var url =  'ws://' + location.host + '/socket/websocket/'
var ws;
var tt;
var lockReconnect = false;//避免重复连接
var clientId = localStorage.getItem("clientId")//缓存中取出客户端id
let count = 0;
//连接失败3次后会自动跳转到登录页面
var websocket = {
  Init: function(clientId) {
    this.clientId = localStorage.getItem("clientId")
    if ("WebSocket" in window) {
      ws = new WebSocket(url + clientId);
    } else if ("MozWebSocket" in window) {
      ws = new MozWebSocket(url + clientId);
    } else {
      console.log("您的浏览器不支持 WebSocket!");
      return;
    }

    ws.onmessage = function(e) {
      messageHandle(e.data)
      heartCheck.start()
    }

    ws.onclose = function() {
      console.log("连接已关闭")
      localStorage.clear()
      if(count <= 3){
        reconnect(clientId);
      }else {
        Notification({
          title: '错误',
          message: '连接已关闭',
          type: 'error',
        });
        window.location.reload();
      }
    }

    ws.onopen = function(e) {
      console.log("连接成功")
      messageHandle(e.data)
    }

    ws.onerror = function(e) {
      console.log("数据传输发生错误,后台服务关闭");
      if(count <= 3){
        reconnect(clientId);
      }else {
        Notification({
          title: '错误',
          message: '数据传输发生错误,服务关闭或网络不通!',
          type: 'error',
        });
        localStorage.clear()
        window.location.reload();
      }
      messageHandle(undefined)
    }
  },
  Send:function(sender,reception,body,flag){
    let data = {
      sender:sender,
      reception:reception,
      body:body,
      flag:flag
    }
    let msg= JSON.stringify(data)
    ws.send(msg)
  },
  getWebSocket(){
    return ws;
  },
  getStatus() {
    if (ws.readyState == 0) {
      return "未连接";
    } else if (ws.readyState == 1) {
      return "已连接";
    } else if (ws.readyState == 2) {
      return "连接正在关闭";
    } else if (ws.readyState == 3) {
      return "连接已关闭";
    }
  }
}

export default websocket;

//根据消息标识做不同的处理
function messageHandle(message) {
  console.log(message,"msg")
  switch (message) {
    case '404session失效':
      //关闭连接跳转登录
      ws.close();
      break;
    case undefined :
      //连接失败
      count++;
      break;
    case 'ok' :
      //心跳消息成功
      count = 0;
      break;
    default:
      let msg = JSON.parse(message)
      //list通知其他用户现在在线的人的id,存储到onlineUser
      localStorage.setItem("onlineUser",msg)
      count = 0
      heartCheck.start();
  }
}

function reconnect(sname) {
  if(lockReconnect) {
    return;
  };
  lockReconnect = true;
  //没连接上会一直重连,设置延迟避免请求过多
  tt && clearTimeout(tt);
  tt = setTimeout(function () {
    console.log("执行断线重连...")
    websocket.Init(sname);
    lockReconnect = false;
  }, 5000);
}

//心跳检测
var heartCheck = {
  timeout: 1000 * 10 * 3, //3 = 30s
  timeoutObj: null,
  serverTimeoutObj: null,
  start: function(){
    var self = this;
    this.timeoutObj && clearTimeout(this.timeoutObj);
    this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
    this.timeoutObj = setTimeout(function(){
      //这里发送一个心跳,后端收到后,返回一个心跳消息,
      //onmessage拿到返回的心跳就说明连接正常
      console.log('心跳检测...');
      ws.send("发送心跳检测:"+ clientId );
      self.serverTimeoutObj = setTimeout(function() {
        if(ws.readyState!==1){
          ws.close();
        }
      }, self.timeout);

    }, this.timeout)
  }
}


8.App.vue页面全局引入websocket

import websocket from '@/config/websocket'
 created: function () {
      //从localStorage中获取用户信息,是登陆状态则能够进行webSocket重连
      let onlineUser = localStorage.getItem('onlineUser');
      if(onlineUser){
        let id = localStorage.getItem("clientId")
        console.log(id,"info")
        if(null != id){
          this.initWebsocket(id)
        }
      }
    },
    methods: {
   	//调用方法
      initWebsocket(id){
        websocket.Init(id)
      }
    }

9.登录页面直接调用父组件Vue的 initWebsocket方法

login.vue 在登录成功存储及调用

localStorage.setItem("clientId",token.userInfo.id)
this.$parent.initWebsocket(token.userInfo.id)

10.页面刷新Websocket会断开,全局监听重连Websocket

main.js监听localStorage值的变化

var orignalSetItem = localStorage.setItem;
localStorage.setItem = function(key,newValue){
  var setItemEvent = new Event("setItemEvent");
  setItemEvent.key = key
  setItemEvent.newValue = newValue;
  window.dispatchEvent(setItemEvent);
  orignalSetItem.apply(this,arguments);
}

11.页面应用

在需要应用的页面比如user.vue使用

 <el-table-column  label="用户在线状态" width="100">
   <template slot-scope="scope">
      <span>{{isOnline(scope.row.id)}}</span>
    </template>
  </el-table-column>

 created() {
      let onlineUser = localStorage.getItem('onlineUser')
      if(onlineUser instanceof Array){
        this.online = onlineUser
      }else {
        if(onlineUser.indexOf(",") !== -1){
          let arr = onlineUser.split(",")
          this.online = arr
        }else {
          let arr = []
          arr.push(onlineUser)
          this.online = arr
        }
      }
    },
    computed: {
      isOnline(){
        return function (val) {
          return this.online.findIndex(item => item == val) !== -1
        }
      }
    },
    mounted() {
      //监听在线用户人数变化id
      window.addEventListener('setItemEvent', (e)=> {
       // let newVal = localStorage.getItem('onlineUser');
        if(e.key==="onlineUser"){
          this.online = e.newValue
        }
      })
    },

最终的最终提示

退出登录时一定要清除localStorage

  • localStorage.clear()

socket代理配置

  • dev: {
    ‘/socket’: {
    //webSocket代理
    target: ‘ws://127.0.0.1:8888’, // 内网ip
    ws:true,//开启ws, 如果是http代理此处可以不用设置
    changeOrigin: true,
    pathRewrite: {
    ‘^/socket’: ‘/’
    }
    }
    }
Logo

前往低代码交流专区

更多推荐