WebSocket + SpringBoot + VUE实现后端实时向前端推送消息
一、目的众所周知,正常来说在前后端通信中,后端往往处于一个被动的状态,对前端的请求作出对应的响应。但有的时候我们会遇到需要后端前前端推送数据的需求,比如消息、邮件等的推送。这个时候,实现的一种方式是使用webSocket,在前后端之间建立唯一的通信连接。二、小知识1.WebSocket连接用的是ws,而不是http2.WebSocket在连接期间是一直保留的三、核心代码1.后端依赖<depe
一、目的
众所周知,正常来说在前后端通信中,后端往往处于一个被动的状态,对前端的请求作出对应的响应。但有的时候我们会遇到需要后端前前端推送数据的需求,比如消息、邮件等的推送。这个时候,实现的一种方式是使用webSocket,在前后端之间建立唯一的通信连接。
二、小知识
1.WebSocket连接用的是ws,而不是http
2.WebSocket在连接期间是一直保留的
三、核心代码
1.后端依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
<exclusions>
<exclusion>
<!-- 项目中已经引入了netty,zookeeper可以不使用netty通信 -->
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
------------------------------------------------------------------------------------------------------------------------
2.NettyConfig
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description: 在NettyConfig中定义一个channel组,管理所有的channel,再定义一个map,管理用户与channel的对应关系。
* @date 2021/8/12 3:17 下午
*/
public class NettyConfig {
/**
* 定义一个channel组,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存放用户与Chanel的对应信息,用于给指定用户发送消息
*/
private static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
private NettyConfig() {}
/**
* 获取channel组
* @return
*/
public static ChannelGroup getChannelGroup() {
return channelGroup;
}
/**
* 获取用户channel map
* @return
*/
public static ConcurrentHashMap<String, Channel> getUserChannelMap(){
return userChannelMap;
}
}
-------------------------------------------------------------------------------------------------------------------------------
3.NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
/**
* @description: 创建NettyServer,定义两个EventLoopGroup,bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作,需要说明的是,需要开启一个新的线程来执行netty server,要不然会阻塞主线程,到时候就无法调用项目的其他controller接口了
* @date 2021/8/12 3:18 下午
*/
@Slf4j
@Component
public class NettyServer {
/**
* webSocket协议名
*/
private static final String WEBSOCKET_PROTOCOL = "WebSocket";
/**
* 端口号
*/
@Value("${webSocket.netty.port:58081}")
private int port;
/**
* webSocket路径
*/
@Value("${webSocket.netty.path:/webSocket}")
private String webSocketPath;
@Autowired
private WebSocketHandler webSocketHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
/**
* 启动
*
* @throws InterruptedException
*/
private void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// bossGroup负责客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
bootstrap.group(bossGroup, workGroup);
// 设置NIO类型的channel
bootstrap.channel(NioServerSocketChannel.class);
// 设置监听端口
bootstrap.localAddress(new InetSocketAddress(port));
// 连接到达时会创建一个通道
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 流水线管理通道中的处理程序(Handler),用来处理业务
// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ObjectEncoder());
// 以块的方式来写的处理器
ch.pipeline().addLast(new ChunkedWriteHandler());
/* 说明: 1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合 2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求 */
ch.pipeline().addLast(new HttpObjectAggregator(8192));
/* 说明: 1、对应webSocket,它的数据是以帧(frame)的形式传递 2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri 3、核心功能是将http协议升级为ws协议,保持长连接 */
ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
// 自定义的handler,处理业务逻辑
ch.pipeline().addLast(webSocketHandler);
}
});
// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}
/**
* 释放资源
* @throws InterruptedException
*/
@PreDestroy
public void destroy() throws InterruptedException {
if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}
if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}
}
@PostConstruct()
public void init() {
//需要开启一个新的线程来执行netty server 服务器
new Thread(() -> {
try {
start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
-----------------------------------------------------------------------------------------------------------------------------
4.WebSocketHandler
import com.example.demo.netty.config.NettyConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @title: WebSocketHandler
* @description: TextWebSocketFrame类型, 表示一个文本帧
* @date 2021/8/12 3:34 下午
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 一旦连接,第一个被执行
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//log.info("handlerAdded 被调用:{}", ctx.channel().id().asLongText());
// 添加到channelGroup 通道组
NettyConfig.getChannelGroup().add(ctx.channel());
}
/**
* 读取数据
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
log.info("服务器收到消息:{}", msg.text());
// JSONObject jsonObject = JSON.parseObject(msg.text());
// String staffId = jsonObject.getString("staffId");
// NettyConfig.getUserChannelMap().put(staffId, ctx.channel());
// // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
// AttributeKey<String> key = AttributeKey.valueOf("userId");
// ctx.channel().attr(key).setIfAbsent(staffId);
//
// if (jsonObject.containsKey("type") && jsonObject.containsValue("ping")) {
// // websocket 心跳连接不回复消息
// } else {
// // 回复消息
// ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器连接成功!"));
// }
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//log.info("handlerRemoved 被调用:{}", ctx.channel().id().asLongText());
// 删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("异常:{}", cause.getMessage());
// 删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
ctx.close();
}
/**
* 删除用户与channel的对应关系
* @param ctx
*/
private void removeUserId(ChannelHandlerContext ctx) {
AttributeKey<String> key = AttributeKey.valueOf("userId");
String userId = ctx.channel().attr(key).get();
NettyConfig.getUserChannelMap().remove(userId);
}
}
-----------------------------------------------------------------------------------------------------------------------
4.前端
<template>
<div class="test"></div>
</template>
<script>
export default {
name: "test",
data() {
return {
websock: null,
};
},
created() {
this.initWebSocket();
},
destroyed() {
this.websock.close(); //离开路由之后断开websocket连接
},
methods: {
initWebSocket() {
//初始化weosocket
if (window.WebSocket) {
const wsuri = "ws://10.11.30.31:58080/webSocket";
this.websock = new WebSocket(wsuri);
this.websock.onmessage = this.websocketonmessage;
this.websock.onopen = this.websocketonopen;
this.websock.onerror = this.websocketonerror;
this.websock.onclose = this.websocketclose;
} else {
alert("你的浏览器不支持WebSocket。请不要使用低版本的IE浏览器。");
}
},
websocketonopen() {
//连接建立之后执行send方法发送数据
let actions = {
userId: "workOrderSendId",
provinceCode: "01",
cityCode: "02",
areaCode: "03",
gridCode: "04",
roleIds: "01,02,03"
};
this.websocketsend(JSON.stringify(actions));
},
websocketonerror() {
},
websocketonmessage(e) {
//数据接收
console.log(e.data);
},
websocketsend(Data) {
//数据发送
this.websock.send(Data);
},
websocketclose(e) {
//关闭
console.log("断开连接", e);
},
},
};
</script>
<style lang='less'>
</style>
--------------------------------------------------------------------------------------------------------------------------------
更多推荐
所有评论(0)