消息通知系统--websocket+netty+rabbitmq
消息通知系统改进rabbitMQ中间件一.rabbitMQ基本配置(1)docker中启动docker run -id --name=tensquare_rabbit -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15672:15672 -p 25672:25672 rabbitmq:management(2)application.ymlrabbitmq:h
消息通知系统改进
rabbitMQ中间件
一.rabbitMQ基本配置
(1)docker中启动
docker run -id --name=tensquare_rabbit -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15672:15672 -p 25672:25672 rabbitmq:management
(2)application.yml
rabbitmq:
host: 192.168.200.128
(3)pom.xml依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二.rabbitMQ应用
1.rabbitMQ建立队列与之操作,一般用于订阅取消订阅等
//1 创建rabbit管理器
RabbitAdmin rabbitAdmin=new RabbitAdmin(rabbitTemplate.getConnectionFactory());
//2 声明Direct交换机,处理新增文章,参数内容其实不会显示在页面,只做日后访问标识
DirectExchange directExchange=new DirectExchange("article_subscribe");
rabbitAdmin.declareExchange(directExchange);
//3 声明队列,每个用户都有自己的队列,通过用户id区分
org.springframework.amqp.core.Queue queue=new org.springframework.amqp.core.Queue("article_subscribe"+userId,true);
//4 声明交换机和用户的绑定关系,确保队列只收到对应作者的新增文章
//通过路由键进行绑定作者
//第一个是队列,第二个是交换机,第三个是!!路由键作者id!!路由键才是真正队列绑定的关键标识,上边的queue后参数只是队列名字
Binding binding= BindingBuilder.bind(queue).to(directExchange).with(authorId);
//声明队列和绑定队列
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
//解绑队列
rabbitAdmin.removeBinding(binding);
2.rabbitMQ给队列发送消息
//交換机名,路由鍵,新消息提醒(路由键与之匹配的队列会接受消息,id就是队列里最终存储的内容)
rabbitTemplate.convertAndSend("article_subscribe",userId,id);
NIO编程
一.IO与NIO
传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环。
public class IOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8000);
while (true) { // (1) 阻塞方法获取新的连接
Socket socket = serverSocket.accept();
new Thread() { @Override public void run() {
String name = Thread.currentThread().getName();
try {// (2) 每一个新的连接都创建一个线程,负责读取数 据
byte[] data = new byte[1024]; InputStream inputStream = socket.getInputStream();
while (true) { int len;
// (3) 按字节流方式读取数据
while ((len = inputStream.read(data)) != -1) {
System.out.println("线程" + name + ":" + new String(data, 0, len)); } } }
catch (Exception e) { }
}
}.start();
} } }
在NIO模型中,可以把这么多的while死循环变成一个死循环,这个死循环由一个线程控制。这就是NIO模型中选择器(Selector)的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到选择器上,通过检查这个选择器,就可以批量监测出有数据可读的连接,进而读取数据。
※NIO相对于IO的优势:
- IO是面向流的,每次都是从操作系统底层一个字节一个字节地读取数据,并且数据只能从一端读取到另一端,不能前后移动流中的数据。NIO则是面向缓冲区的,每次可以从这个缓冲区里面读取一块的数据,并且可以在需要时在缓冲区中前后移动。
- IO是阻塞的,这意味着,当一个线程读取数据或写数据时,该线程被阻塞,直到有一些数据被读取,或数据完全写入,在此期间该线程不能干其他任何事情。而NIO是非阻塞的,不需要一直等待操作完成才能干其他事情,而是在等待的过程中可以同时去做别的事情,所以能最大限度地使用服务器的资源。
- NIO引入了IO多路复用器selector。selector是一个提供channel注册服务的线程,可以同时对接多个Channel,并在线程池中为channel适配、选择合适的线程来处理channel。由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高。
二.Netty
1.Netty是什么?
Netty是一个异步事件驱动的网络应用框架
※2.为什么使用Netty?
①使用JDK自带的NIO需要了解太多的概念,编程复杂
②Netty底层IO模型随意切换,而这一切只需要做微小的改动,就可以直接从NIO模型变身为IO模型
③Netty自带的拆包解包,异常检测等机制,可以从NIO的繁重细节中脱离出来,只需要关心业务逻辑
④Netty解决了JDK的很多包括空轮询在内的bug
⑤Netty底层对线程,selector做了很多细小的优化,精心设计的线程模型做到非常高效的并发处理
⑥自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
⑦Netty社区活跃,遇到问题随时邮件列表或者issue
⑧Netty已经历各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,
健壮性无比强大
3.Netty与事件驱动
3.1为什么使用事件驱动?
①程序中的任务可以并行执行;
②任务之间高度独立,彼此之间不需要互相等待;
③在等待的事件到来之前,任务不会阻塞
4.Netty框架包含如下的组件:
ServerBootstrap :用于接受客户端的连接以及为已接受的连接创建子通道,一
般用于服务端。
Bootstrap:不接受新的连接,并且是在父通道类完成一些操作,一般用于客户
端的。
Channel:对网络套接字的I/O操作,例如读、写、连接、绑定等操作进行适配
和封装的组件。
EventLoop:处理所有注册其上的channel的I/O操作。通常情况一个
EventLoop可为多个channel提供服务。
EventLoopGroup:包含有多个EventLoop的实例,用来管理 event Loop 的组
件,可以理解为一个线程池,内部维护了一组线程。
ChannelHandler和ChannelPipeline:例如一个流水线车间,当组件从流水线
头部进入,穿越流水线,流水线上的工人按顺序对组件进行加工,到达流水线
尾部时商品组装完成。流水线相当于 ChannelPipeline ,流水线工人相当于
ChannelHandler ,源头的组件当做event。
ChannelInitializer:用于对刚创建的channel进行初始化,将ChannelHandler
添加到channel的ChannelPipeline处理链路中。
ChannelFuture:与jdk中线程的Future接口类似,即实现并行处理的效果。可
以在操作执行成功或失败时自动触发监听器中的事件处理方法。
服务端代码
public class NettyServer { public static void main(String[] args) {
// 用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端。
ServerBootstrap serverBootstrap = new ServerBootstrap();
// EventLoopGroup包含有多个EventLoop的实例,用来管理event Loop的组件
// 接受新连接线程
NioEventLoopGroup boos = new NioEventLoopGroup();
// 读取数据的线程
NioEventLoopGroup worker = new NioEventLoopGroup();
//服务端执行
serverBootstrap .group(boos, worker)
// Channel对网络套接字的I/O操作, // 例如读、写、连接、绑定等操作进行适配和封装的组件。
.channel(NioServerSocketChannel.class)
// ChannelInitializer用于对刚创建的channel进行初始化
// 将ChannelHandler添加到channel的ChannelPipeline处 理链路中。
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 组件从流水线头部进入,流水线上的工人按顺序对组 件进行加工
// 流水线相当于ChannelPipeline
// 流水线工人相当于ChannelHandler
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { //这个工人有点麻烦,需要我们告诉他干啥事
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } })
.bind(8000); } }
客户端类似,用ChannelFuture future = channel.writeAndFlush("测试数据");
发送数据
三.Netty与WebSocket整合
1.配置
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>
application.yml
rabbitmq:
host: 192.168.200.128
2.整合所需添加类
- 编写 NettyServer ,启动Netty服务。
public class NettyServer {
public void start(int port) {
System.out.println("准备启动Netty。。。");
ServerBootstrap serverBootstrap = new ServerBootstrap();
//用来处理新连接的
EventLoopGroup boos = new NioEventLoopGroup();
//用来处理业务逻辑的,读写。。。
EventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boos, worker)
// .localAddress(port)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
//请求消息解码器
ch.pipeline().addLast(new HttpServerCodec());
// 将多个消息转换为单一的request或者response对象
ch.pipeline().addLast(new HttpObjectAggregator(65536));
//处理WebSocket的消息事件,基本都是以/ws开头
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
//创建自己的webSocket处理器,就是用来编写业务逻辑的
MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
ch.pipeline().addLast(myWebSocketHandler);
}
}).bind(port);
}
}
- 使用配置Bean创建Netty服务。编写 NettyConfig 。(主要是需要start一下,其余与其他config类大致相同)
@Configuration
public class NettyConfig
{
@Bean
public NettyServer createNettyServer(){
NettyServer nettyServer=new NettyServer();
new Thread(){
public void run(){
nettyServer.start(1234);
}
}.start();
return nettyServer;
}
}
※ 3. 编写和WebSocket进行通讯处理类 MyWebSocketHandler ,进行MQ和 WebSocket的消息处理。此类为业务代码重点
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static ObjectMapper MAPPER = new ObjectMapper();
// 送Spring容器中获取消息监听器容器,处理订阅消息sysNotice
SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext()
.getBean("sysNoticeContainer");
// 送Spring容器中获取消息监听器容器,处理点赞消息userNotice
SimpleMessageListenerContainer userNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext()
.getBean("userNoticeContainer");
//从Spring容器中获取RabbitTemplate
RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext()
.getBean(RabbitTemplate.class);
//存放WebSocket连接Map,根据用户id存放
public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap();
//用户请求WebSocket服务端,执行的方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//约定用户第一次请求携带的数据:{"userId":"1"}
//获取用户请求数据并解析
String json = msg.text();
//解析json数据,获取用户id
String userId = MAPPER.readTree(json).get("userId").asText();
//第一次请求的时候,需要建立WebSocket连接
Channel channel = userChannelMap.get(userId);
if (channel == null) {
//获取WebSocket的连接
channel = ctx.channel();
//把连接放到容器中
userChannelMap.put(userId, channel);
}
//只用完成新消息的提醒即可,只需要获取消息的数量
//获取RabbitMQ的消息内容,并发送给用户
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
//拼接获取队列名称
String queueName = "article_subscribe_" + userId;
//获取Rabbit的Properties容器
Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
//获取消息数量
int noticeCount = 0;
//判断Properties是否不为空
if (queueProperties != null) {
// 如果不为空,获取消息的数量
noticeCount = (int) queueProperties.get("QUEUE_MESSAGE_COUNT");
}
//---------------以上获取订阅类消息,以下获取点赞类消息---------------------
//拼接获取队列名称
String userQueueName = "article_thumbup_" + userId;
//获取Rabbit的Properties容器
Properties userQueueProperties = rabbitAdmin.getQueueProperties(userQueueName);
//获取消息数量
int userNoticeCount = 0;
//判断Properties是否不为空
if (userQueueProperties != null) {
// 如果不为空,获取消息的数量
userNoticeCount = (int) userQueueProperties.get("QUEUE_MESSAGE_COUNT");
}
//封装返回的数据
HashMap countMap = new HashMap();
//订阅类消息数量
countMap.put("sysNoticeCount", noticeCount);
//点赞类消息数量
countMap.put("userNoticeCount", userNoticeCount);
Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
//把数据发送给用户
channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
//把消息从队列里面清空,否则MQ消息监听器会再次消费一次
if (noticeCount > 0) {
rabbitAdmin.purgeQueue(queueName, true);
}
if (noticeCount > 0) {
rabbitAdmin.purgeQueue(userQueueName, true);
}
//为用户的消息通知队列注册监听器,便于用户在线的时候,
//一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据
sysNoticeContainer.addQueueNames(queueName);
userNoticeContainer.addQueueNames(userQueueName);
}
}
- 使用配置Bean创建Rabbit监听器容器,使用监听器。编写 RabbitConfig 。
@Configuration
public class RabbitConfig {
@Bean("sysNoticeContainer")
public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//使用Channel
container.setExposeListenerChannel(true);
//设置自己编写的监听器
container.setMessageListener(new SysNoticeListener());
return container; } }
- 编写Rabbit监听器 SysNoticeListener ,用来获取MQ消息并进行处理
public class SysNoticeListener implements ChannelAwareMessageListener {
private static ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取用户id,可以通过队列名称获取
String queueName = message.getMessageProperties().getConsumerQueue();
String userId = queueName.substring(queueName.lastIndexOf("_") + 1);
io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
//判断用户是否在线
if (wsChannel != null) {
//如果连接不为空,表示用户在线
//封装返回数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount", 1);
Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
// 把数据通过WebSocket连接主动推送用户
wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
}
}
}
public class UserNoticeListener implements ChannelAwareMessageListener {
private static ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取用户id,可以通过队列名称获取
String queueName = message.getMessageProperties().getConsumerQueue();
String userId = queueName.substring(queueName.lastIndexOf("_") + 1);
io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
//判断用户是否在线
if (wsChannel != null) {
//如果连接不为空,表示用户在线
//封装返回数据
HashMap countMap = new HashMap();
countMap.put("userNoticeCount", 1);
Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
// 把数据通过WebSocket连接主动推送用户
wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
}
}
}
3.修改启动类,添加netty服务启动
public static void main(String[] args) {
SpringApplication.run(NoticeApplication.class, args);
NettyServer server = ApplicationContextProvider.getApplicationContext().getBean(NettyServer.class);
try {server.start(12345); }
catch (Exception e) {
e.printStackTrace(); } }
最终MyWebSocketHandler修改为
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static ObjectMapper MAPPER = new ObjectMapper();
// 送Spring容器中获取消息监听器容器,处理订阅消息sysNotice
SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext() .getBean("sysNoticeContainer");
// 送Spring容器中获取消息监听器容器,处理点赞消息userNotice
SimpleMessageListenerContainer userNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext() .getBean("userNoticeContainer");
//从Spring容器中获取RabbitTemplate
RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext() .getBean(RabbitTemplate.class);
//存放WebSocket连接Map,根据用户id存放
public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap();
//用户请求WebSocket服务端,执行的方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//约定用户第一次请求携带的数据:{"userId":"1"}
//获取用户请求数据并解析
String json = msg.text();
//解析json数据,获取用户id
String userId = MAPPER.readTree(json).get("userId").asText();
//第一次请求的时候,需要建立WebSocket连接 Channel
channel = userChannelMap.get(userId);
if (channel == null) {
//获取WebSocket的连接
channel = ctx.channel();
//把连接放到容器中
userChannelMap.put(userId, channel); }
//只用完成新消息的提醒即可,只需要获取消息的数量
//获取RabbitMQ的消息内容,并发送给用户
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
//拼接获取队列名称
String queueName = "article_subscribe_" + userId;
//获取Rabbit的Properties容器
Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
//获取消息数量
int noticeCount = 0;
//判断Properties是否不为空
if (queueProperties != null) {
// 如果不为空,获取消息的数量
noticeCount = (int) queueProperties.get("QUEUE_MESSAGE_COUNT"); }
//-----------------------------------------------------
//拼接获取队列名称
String userQueueName = "article_thumbup_" + userId;
//获取Rabbit的Properties容器
Properties userQueueProperties = rabbitAdmin.getQueueProperties(userQueueName);
//获取消息数量
int userNoticeCount = 0;
//判断Properties是否不为空
if (userQueueProperties != null) {
// 如果不为空,获取消息的数量
userNoticeCount = (int) userQueueProperties.get("QUEUE_MESSAGE_COUNT"); }
//封装返回的数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount", noticeCount);
countMap.put("userNoticeCount", userNoticeCount);
Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
//把数据发送给用户
channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
//把消息从队列里面清空,否则MQ消息监听器会再次消费一次
if (noticeCount > 0) { rabbitAdmin.purgeQueue(queueName, true); }
if (userNoticeCount > 0) { rabbitAdmin.purgeQueue(userQueueName, true); }
//为用户的消息通知队列注册监听器,便于用户在线的时候,
//一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据
sysNoticeContainer.addQueueNames(queueName);
userNoticeContainer.addQueueNames(userQueueName); } }
更多推荐
所有评论(0)