springboot-vue-websocket实现用户列表在线离线监听,用户下线实时推送
java实现类似好友列表在线离线状态实时检测,用户状态实时检测,监听浏览器关闭销毁session,websocket长链接全局登录时调用,也可应用于实时聊天并判断好友在线离线
·
1.pom.xml
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.登录方法中登录成功后将用户账号存入session
HttpSession session = request.getSession();
ServletContext application = session.getServletContext();
Integer userId = ShiroUtils.getUserId();
session.setAttribute("userId", userId);
Set<Integer> onlineUserSet = (Set)application.getAttribute("onlineUserSet");
onlineUserSet.add(userId);
application.setAttribute("onlineUserSet", onlineUserSet);
//System.out.println("当前在线用户:"+onlineUserSet.toString());
3.监听session变化判断登录用户人数和用户id
@WebListener
@Component
public class UserStatisticsListener implements HttpSessionListener,ServletContextListener {
private ServletContext application = null;
public static HttpSession httpSession = null;
@Override
public void contextDestroyed(ServletContextEvent s) {
System.out.println("context Destroyed");
}
@Override
public void contextInitialized(ServletContextEvent sce) {
application = sce.getServletContext();
Set<Integer> onlineUserSet = new HashSet<>();
application.setAttribute("onlineUserSet", onlineUserSet);
System.out.println("onlineUserSet2==>"+onlineUserSet);
}
/**
* 创建session
* @param se
*/
@Override
public void sessionCreated(HttpSessionEvent se) {
httpSession = se.getSession();
System.out.println("sessionId==>"+se.getSession().getId());
}
/**
* 销毁session
* @param se
*/
@Override
public void sessionDestroyed(HttpSessionEvent se) {
System.out.println("销毁session");
HttpSession session = se.getSession();
Set<Integer> onlineUserSet = (Set<Integer>)application.getAttribute("onlineUserSet");
Integer userId = (Integer)session.getAttribute("userId");
onlineUserSet.remove(userId);
application.setAttribute("onlineUserSet",onlineUserSet);
onlineUserSet = (Set<Integer>)application.getAttribute("onlineUserSet");
System.out.println("当前在线用户:"+onlineUserSet.toString());
System.out.println(userId + "-->退出");
//推送所有在线用户现有的在线用户id
if(!CollectionUtils.isEmpty(onlineUserSet)){
WebSocketConfiguration.sendInfo(onlineUserSet.toString(),onlineUserSet);
}
}
}
4.建立websocket连接
- 需要自定义HttpSessionConfigurator将Session
- 转化成HttpSession,否则websocket的sessionId和登录后存储的sessionId不一致,无法监听退出后的操作
注意: 在使用@ServerEndpoint时如果项目引入aop则会出现启动异常,可
在WebLogAspect 类中加入不扫描的包或类,如下:
!execution(public * com.demo.WebSocketConfiguration…*(…))
@Aspect
@Component
public class WebLogAspect { /**
/**
* 扫描的包,类
*/
@Pointcut("execution(public * com.demo.*.*..*(..)) && !execution(public * com.demo.WebSocketConfiguration..*(..)) ")
public void webLog(){}
......
}
@Component
@ServerEndpoint(value = "/websocket/{userId}",configurator = HttpSessionConfigurator.class)
@Slf4j
public class WebSocketConfiguration {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
* <p>
* Map的key为userId,List用于存放一个userId的多个终端,比如同一个userId,同时在手机端和PC登陆
*/
private static ConcurrentHashMap<Integer, List<WebSocketConfiguration>> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private Integer userId = null;
private HttpSession httpSession = null;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session,EndpointConfig config, @PathParam("userId") Integer userId) {
if(null != userId){
this.session = session;
this.userId = userId;
//httpSession赋值和登录使用同一个session
this.httpSession = (HttpSession)config.getUserProperties().get(HttpSession.class.getName());
List<WebSocketConfiguration> servers;
List<WebSocketConfiguration> webSocketServers = new ArrayList<>();
//将接入的客户端信息添加到内存
if (webSocketMap.containsKey(userId)) {
//查询当前userId以及当前的session是否已经存在,如果存在,先移除再新增,如果不存在,直接新增
webSocketServers = webSocketMap.get(userId).stream().filter(o -> o.session.getId().equals(session.getId())).collect(Collectors.toList());
}
if (webSocketMap.containsKey(userId) && webSocketServers.size() > 0) {
webSocketServers = webSocketMap.get(userId);
webSocketServers.removeIf(webSocketServer -> webSocketServer.session.getId().equals(session.getId()));
servers = webSocketServers;
servers.add(this);
webSocketMap.put(userId, servers);
} else {
servers = null == webSocketMap.get(userId) ? new ArrayList<>() : webSocketMap.get(userId);
servers.add(this);
webSocketMap.put(userId, servers);
}
log.info("用户【" + userId + "】sessionId:[" + session.getId() + "]连接成功");
//推送给所有在线用户现在已上线的用户id
if(null != this.httpSession){
Set<Integer> onlineUserSet = (Set)this.httpSession.getServletContext().getAttribute("onlineUserSet");
sendInfo(onlineUserSet.toString(),onlineUserSet);
}else {
sendInfo(userId);
}
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
List<WebSocketConfiguration> webSocketServers = new ArrayList<>();
if (webSocketMap.containsKey(userId)) {
webSocketServers = webSocketMap.get(userId).stream().filter(o -> o.session.getId().equals(session.getId())).collect(Collectors.toList());
}
if (webSocketMap.containsKey(userId) && webSocketServers.size() > 0) {
webSocketServers = webSocketMap.get(userId);
Iterator<WebSocketConfiguration> iterator = webSocketServers.iterator();
while (iterator.hasNext()) {
if (iterator.next().session.getId().equals(session.getId())) {
iterator.remove();
}
}
webSocketMap.put(userId, webSocketServers);
log.info("用户【" + userId + "】sessionId:[" + session.getId() + "]断开连接" );
}
//调用销毁session
if(null != userId && null != this.httpSession){
// this.httpSession.invalidate();
//发送断开消息
sendInfo(userId);
}
HttpSession httpSession1 = this.httpSession;
//todo 待优化,暂时用Timer判定销毁session,优点:可以和前端交互判定浏览器关闭后的session销毁及及时推送给其他用户;缺点:前端自动退出登录后会执行两次销毁程序,则此时会有空指针异常抛出,但不影响程序使用
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("隔1分钟执行销毁session....");
if(webSocketMap.containsKey(userId)){
List<WebSocketConfiguration> webSocketServers = webSocketMap.get(userId);
if(CollectionUtils.isEmpty(webSocketServers)){
if(null != httpSession1){
System.out.println("销毁...");
httpSession1.invalidate();
}
}
}
}
}, 1000 * 60);
}
/**
* 收到客户端消息后调用的方法
* 前端发送心跳包判断后台程序是否关闭
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户【" + userId + "】sessionId:[" + session.getId() + "]发送消息给服务端:" + message);
try {
//判断userId是否存在
if(webSocketMap.containsKey(userId)){
sendMessage("ok");
}else {
//退送断线消息MessageUtils.Offline自定义用于前端判断,如:String Offline = "404session失效"
sendMessage(MessageUtils.Offline);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @param error
*/
@OnError
public void onError(Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送自定义消息
*/
public static void sendInfo(String message, @PathParam("userIds") Set<Integer> userIds){
log.info(JSON.toJSONString("所有客户端:" + webSocketMap));
if (!CollectionUtils.isEmpty(userIds)) {
userIds.forEach(i->{
if(webSocketMap.containsKey(i)){
for (WebSocketConfiguration webSocketServer : webSocketMap.get(i)) {
try {
//哪个终端有消息,就推送哪个
// if (sessionId.equals(webSocketServer.session.getId())) {
// webSocketServer.sendMessage(message);
// }
//对该用户的所有终端都推送无需判断sessionId
webSocketServer.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
}
/**
* 发送自定义消息
*/
public static void sendInfo(@PathParam("userId") Integer userId){
if (userId != null && webSocketMap.containsKey(userId)) {
for (WebSocketConfiguration webSocketServer : webSocketMap.get(userId)) {
try {webSocketServer.sendMessage(MessageUtils.Offline);
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
log.error("用户" + userId + ",不在线!");
}
}
}
5.创建HttpSessionConfigurator.class
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
public class HttpSessionConfigurator extends ServerEndpointConfig.Configurator{
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
HttpSession httpSession = (HttpSession) request.getHttpSession();
if(null != httpSession){
sec.getUserProperties().put(HttpSession.class.getName(), httpSession);
}
}
}
6.创建WebSocketConfig
自动注册使用@ServerEndpoint注解声明的websocket endpoint
@Configuration
public class WebSocketConfig {
/**
* ServerEndpointExporter 作用
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
7.vue部分websocket.js
import {Notification} from 'element-ui'
var url = 'ws://' + location.host + '/socket/websocket/'
var ws;
var tt;
var lockReconnect = false;//避免重复连接
var clientId = localStorage.getItem("clientId")//缓存中取出客户端id
let count = 0;
//连接失败3次后会自动跳转到登录页面
var websocket = {
Init: function(clientId) {
this.clientId = localStorage.getItem("clientId")
if ("WebSocket" in window) {
ws = new WebSocket(url + clientId);
} else if ("MozWebSocket" in window) {
ws = new MozWebSocket(url + clientId);
} else {
console.log("您的浏览器不支持 WebSocket!");
return;
}
ws.onmessage = function(e) {
messageHandle(e.data)
heartCheck.start()
}
ws.onclose = function() {
console.log("连接已关闭")
localStorage.clear()
if(count <= 3){
reconnect(clientId);
}else {
Notification({
title: '错误',
message: '连接已关闭',
type: 'error',
});
window.location.reload();
}
}
ws.onopen = function(e) {
console.log("连接成功")
messageHandle(e.data)
}
ws.onerror = function(e) {
console.log("数据传输发生错误,后台服务关闭");
if(count <= 3){
reconnect(clientId);
}else {
Notification({
title: '错误',
message: '数据传输发生错误,服务关闭或网络不通!',
type: 'error',
});
localStorage.clear()
window.location.reload();
}
messageHandle(undefined)
}
},
Send:function(sender,reception,body,flag){
let data = {
sender:sender,
reception:reception,
body:body,
flag:flag
}
let msg= JSON.stringify(data)
ws.send(msg)
},
getWebSocket(){
return ws;
},
getStatus() {
if (ws.readyState == 0) {
return "未连接";
} else if (ws.readyState == 1) {
return "已连接";
} else if (ws.readyState == 2) {
return "连接正在关闭";
} else if (ws.readyState == 3) {
return "连接已关闭";
}
}
}
export default websocket;
//根据消息标识做不同的处理
function messageHandle(message) {
console.log(message,"msg")
switch (message) {
case '404session失效':
//关闭连接跳转登录
ws.close();
break;
case undefined :
//连接失败
count++;
break;
case 'ok' :
//心跳消息成功
count = 0;
break;
default:
let msg = JSON.parse(message)
//list通知其他用户现在在线的人的id,存储到onlineUser
localStorage.setItem("onlineUser",msg)
count = 0
heartCheck.start();
}
}
function reconnect(sname) {
if(lockReconnect) {
return;
};
lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多
tt && clearTimeout(tt);
tt = setTimeout(function () {
console.log("执行断线重连...")
websocket.Init(sname);
lockReconnect = false;
}, 5000);
}
//心跳检测
var heartCheck = {
timeout: 1000 * 10 * 3, //3 = 30s
timeoutObj: null,
serverTimeoutObj: null,
start: function(){
var self = this;
this.timeoutObj && clearTimeout(this.timeoutObj);
this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
this.timeoutObj = setTimeout(function(){
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
console.log('心跳检测...');
ws.send("发送心跳检测:"+ clientId );
self.serverTimeoutObj = setTimeout(function() {
if(ws.readyState!==1){
ws.close();
}
}, self.timeout);
}, this.timeout)
}
}
8.App.vue页面全局引入websocket
import websocket from '@/config/websocket'
created: function () {
//从localStorage中获取用户信息,是登陆状态则能够进行webSocket重连
let onlineUser = localStorage.getItem('onlineUser');
if(onlineUser){
let id = localStorage.getItem("clientId")
console.log(id,"info")
if(null != id){
this.initWebsocket(id)
}
}
},
methods: {
//调用方法
initWebsocket(id){
websocket.Init(id)
}
}
9.登录页面直接调用父组件Vue的 initWebsocket方法
login.vue 在登录成功存储及调用
localStorage.setItem("clientId",token.userInfo.id)
this.$parent.initWebsocket(token.userInfo.id)
10.页面刷新Websocket会断开,全局监听重连Websocket
main.js监听localStorage值的变化
var orignalSetItem = localStorage.setItem;
localStorage.setItem = function(key,newValue){
var setItemEvent = new Event("setItemEvent");
setItemEvent.key = key
setItemEvent.newValue = newValue;
window.dispatchEvent(setItemEvent);
orignalSetItem.apply(this,arguments);
}
11.页面应用
在需要应用的页面比如user.vue使用
<el-table-column label="用户在线状态" width="100">
<template slot-scope="scope">
<span>{{isOnline(scope.row.id)}}</span>
</template>
</el-table-column>
created() {
let onlineUser = localStorage.getItem('onlineUser')
if(onlineUser instanceof Array){
this.online = onlineUser
}else {
if(onlineUser.indexOf(",") !== -1){
let arr = onlineUser.split(",")
this.online = arr
}else {
let arr = []
arr.push(onlineUser)
this.online = arr
}
}
},
computed: {
isOnline(){
return function (val) {
return this.online.findIndex(item => item == val) !== -1
}
}
},
mounted() {
//监听在线用户人数变化id
window.addEventListener('setItemEvent', (e)=> {
// let newVal = localStorage.getItem('onlineUser');
if(e.key==="onlineUser"){
this.online = e.newValue
}
})
},
最终的最终提示
退出登录时一定要清除localStorage
- localStorage.clear()
socket代理配置
- dev: {
‘/socket’: {
//webSocket代理
target: ‘ws://127.0.0.1:8888’, // 内网ip
ws:true,//开启ws, 如果是http代理此处可以不用设置
changeOrigin: true,
pathRewrite: {
‘^/socket’: ‘/’
}
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)