消息通知系统改进

改进后模型

rabbitMQ中间件

一.rabbitMQ基本配置

(1)docker中启动

docker run -id --name=tensquare_rabbit -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15672:15672 -p 25672:25672 rabbitmq:management

(2)application.yml

rabbitmq:
 host: 192.168.200.128

(3)pom.xml依赖

<dependency> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-amqp</artifactId> 
</dependency>

二.rabbitMQ应用

1.rabbitMQ建立队列与之操作,一般用于订阅取消订阅等

         //1 创建rabbit管理器
        RabbitAdmin rabbitAdmin=new RabbitAdmin(rabbitTemplate.getConnectionFactory());
        //2 声明Direct交换机,处理新增文章,参数内容其实不会显示在页面,只做日后访问标识
        DirectExchange directExchange=new DirectExchange("article_subscribe");
        rabbitAdmin.declareExchange(directExchange);
        //3 声明队列,每个用户都有自己的队列,通过用户id区分
        org.springframework.amqp.core.Queue queue=new   org.springframework.amqp.core.Queue("article_subscribe"+userId,true);

        //4 声明交换机和用户的绑定关系,确保队列只收到对应作者的新增文章
         //通过路由键进行绑定作者
        //第一个是队列,第二个是交换机,第三个是!!路由键作者id!!路由键才是真正队列绑定的关键标识,上边的queue后参数只是队列名字
    
        Binding binding= BindingBuilder.bind(queue).to(directExchange).with(authorId);
        //声明队列和绑定队列 
        rabbitAdmin.declareQueue(queue); 
        rabbitAdmin.declareBinding(binding);
        //解绑队列
        rabbitAdmin.removeBinding(binding);
  

2.rabbitMQ给队列发送消息

           //交換机名,路由鍵,新消息提醒(路由键与之匹配的队列会接受消息,id就是队列里最终存储的内容)
            rabbitTemplate.convertAndSend("article_subscribe",userId,id);

NIO编程

一.IO与NIO

传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环。

 public class IOServer { 
 public static void main(String[] args) throws Exception {
  ServerSocket serverSocket = new ServerSocket(8000); 
  while (true) { // (1) 阻塞方法获取新的连接 
  Socket socket = serverSocket.accept(); 
  new Thread() { @Override public void run() { 
  String name = Thread.currentThread().getName(); 
  try {// (2) 每一个新的连接都创建一个线程,负责读取数 据
   byte[] data = new byte[1024]; InputStream inputStream = socket.getInputStream();
   while (true) { int len;
    // (3) 按字节流方式读取数据
     while ((len = inputStream.read(data)) != -1) {
System.out.println("线程" + name + ":" + new String(data, 0, len)); } } } 
catch (Exception e) { } 
} 
}.start();
 } } }

NIO与IO模式对比

在NIO模型中,可以把这么多的while死循环变成一个死循环,这个死循环由一个线程控制。这就是NIO模型中选择器(Selector)的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到选择器上,通过检查这个选择器,就可以批量监测出有数据可读的连接,进而读取数据。

※NIO相对于IO的优势:

  1. IO是面向流的,每次都是从操作系统底层一个字节一个字节地读取数据,并且数据只能从一端读取到另一端,不能前后移动流中的数据。NIO则是面向缓冲区的,每次可以从这个缓冲区里面读取一块的数据,并且可以在需要时在缓冲区中前后移动。
  2. IO是阻塞的,这意味着,当一个线程读取数据或写数据时,该线程被阻塞,直到有一些数据被读取,或数据完全写入,在此期间该线程不能干其他任何事情。而NIO是非阻塞的,不需要一直等待操作完成才能干其他事情,而是在等待的过程中可以同时去做别的事情,所以能最大限度地使用服务器的资源。
  3. NIO引入了IO多路复用器selector。selector是一个提供channel注册服务的线程,可以同时对接多个Channel,并在线程池中为channel适配、选择合适的线程来处理channel。由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高。
二.Netty

1.Netty是什么?
Netty是一个异步事件驱动的网络应用框架
※2.为什么使用Netty?
①使用JDK自带的NIO需要了解太多的概念,编程复杂
②Netty底层IO模型随意切换,而这一切只需要做微小的改动,就可以直接从NIO模型变身为IO模型
③Netty自带的拆包解包,异常检测等机制,可以从NIO的繁重细节中脱离出来,只需要关心业务逻辑
④Netty解决了JDK的很多包括空轮询在内的bug
⑤Netty底层对线程,selector做了很多细小的优化,精心设计的线程模型做到非常高效的并发处理
⑥自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
⑦Netty社区活跃,遇到问题随时邮件列表或者issue
⑧Netty已经历各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,
健壮性无比强大

3.Netty与事件驱动
3.1为什么使用事件驱动?
①程序中的任务可以并行执行;
②任务之间高度独立,彼此之间不需要互相等待;
③在等待的事件到来之前,任务不会阻塞

4.Netty框架包含如下的组件:
ServerBootstrap :用于接受客户端的连接以及为已接受的连接创建子通道,一
般用于服务端。
Bootstrap:不接受新的连接,并且是在父通道类完成一些操作,一般用于客户
端的。
Channel:对网络套接字的I/O操作,例如读、写、连接、绑定等操作进行适配
和封装的组件。
EventLoop:处理所有注册其上的channel的I/O操作。通常情况一个
EventLoop可为多个channel提供服务。
EventLoopGroup:包含有多个EventLoop的实例,用来管理 event Loop 的组
件,可以理解为一个线程池,内部维护了一组线程。
ChannelHandler和ChannelPipeline:例如一个流水线车间,当组件从流水线
头部进入,穿越流水线,流水线上的工人按顺序对组件进行加工,到达流水线
尾部时商品组装完成。流水线相当于 ChannelPipeline ,流水线工人相当于
ChannelHandler ,源头的组件当做event。
ChannelInitializer:用于对刚创建的channel进行初始化,将ChannelHandler
添加到channel的ChannelPipeline处理链路中。
ChannelFuture:与jdk中线程的Future接口类似,即实现并行处理的效果。可
以在操作执行成功或失败时自动触发监听器中的事件处理方法。

服务端代码

public class NettyServer { public static void main(String[] args) { 
// 用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端。 
ServerBootstrap serverBootstrap = new ServerBootstrap(); 
// EventLoopGroup包含有多个EventLoop的实例,用来管理event Loop的组件
// 接受新连接线程 
NioEventLoopGroup boos = new NioEventLoopGroup(); 
// 读取数据的线程 
NioEventLoopGroup worker = new NioEventLoopGroup(); 
//服务端执行 
serverBootstrap .group(boos, worker)
 // Channel对网络套接字的I/O操作, // 例如读、写、连接、绑定等操作进行适配和封装的组件。 
 .channel(NioServerSocketChannel.class) 
 // ChannelInitializer用于对刚创建的channel进行初始化
  // 将ChannelHandler添加到channel的ChannelPipeline处 理链路中。 
  .childHandler(new ChannelInitializer<NioSocketChannel>() {
   protected void initChannel(NioSocketChannel ch) { 
  // 组件从流水线头部进入,流水线上的工人按顺序对组 件进行加工 
  // 流水线相当于ChannelPipeline 
  // 流水线工人相当于ChannelHandler 
  ch.pipeline().addLast(new StringDecoder());
   ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { //这个工人有点麻烦,需要我们告诉他干啥事 
   @Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } })
.bind(8000); } }

客户端类似,用ChannelFuture future = channel.writeAndFlush("测试数据");发送数据

三.Netty与WebSocket整合

1.配置
pom.xml

<dependency> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-amqp</artifactId> 
</dependency> 
<dependency>
 <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
   <version>4.1.5.Final</version>
    </dependency>   

application.yml

rabbitmq: 
 host: 192.168.200.128

2.整合所需添加类
在这里插入图片描述

  1. 编写 NettyServer ,启动Netty服务。
public class NettyServer {

    public void start(int port) {
        System.out.println("准备启动Netty。。。");
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //用来处理新连接的
        EventLoopGroup boos = new NioEventLoopGroup();
        //用来处理业务逻辑的,读写。。。
        EventLoopGroup worker = new NioEventLoopGroup();

        serverBootstrap.group(boos, worker)
                // .localAddress(port)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        //请求消息解码器
                        ch.pipeline().addLast(new HttpServerCodec());
                        // 将多个消息转换为单一的request或者response对象
                        ch.pipeline().addLast(new HttpObjectAggregator(65536));
                        //处理WebSocket的消息事件,基本都是以/ws开头
                        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));

                        //创建自己的webSocket处理器,就是用来编写业务逻辑的
                        MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
                        ch.pipeline().addLast(myWebSocketHandler);
                    }
                }).bind(port);
    }
}
  1. 使用配置Bean创建Netty服务。编写 NettyConfig 。(主要是需要start一下,其余与其他config类大致相同)
@Configuration
public class NettyConfig
{
    @Bean
    public NettyServer createNettyServer(){

    NettyServer nettyServer=new NettyServer();
    new Thread(){
        public void run(){
            nettyServer.start(1234);
        }
    }.start();
    return nettyServer;
    }
}

※ 3. 编写和WebSocket进行通讯处理类 MyWebSocketHandler ,进行MQ和 WebSocket的消息处理。此类为业务代码重点

public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static ObjectMapper MAPPER = new ObjectMapper();

    // 送Spring容器中获取消息监听器容器,处理订阅消息sysNotice
    SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext()
            .getBean("sysNoticeContainer");
    // 送Spring容器中获取消息监听器容器,处理点赞消息userNotice
    SimpleMessageListenerContainer userNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext()
            .getBean("userNoticeContainer");

    //从Spring容器中获取RabbitTemplate
    RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext()
            .getBean(RabbitTemplate.class);

    //存放WebSocket连接Map,根据用户id存放
    public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap();

    //用户请求WebSocket服务端,执行的方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //约定用户第一次请求携带的数据:{"userId":"1"}
        //获取用户请求数据并解析
        String json = msg.text();
        //解析json数据,获取用户id
        String userId = MAPPER.readTree(json).get("userId").asText();


        //第一次请求的时候,需要建立WebSocket连接

        Channel channel = userChannelMap.get(userId);
        if (channel == null) {
            //获取WebSocket的连接
            channel = ctx.channel();

            //把连接放到容器中
            userChannelMap.put(userId, channel);
        }

        //只用完成新消息的提醒即可,只需要获取消息的数量
        //获取RabbitMQ的消息内容,并发送给用户
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        //拼接获取队列名称
        String queueName = "article_subscribe_" + userId;
        //获取Rabbit的Properties容器
        Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);

        //获取消息数量
        int noticeCount = 0;
        //判断Properties是否不为空
        if (queueProperties != null) {
            // 如果不为空,获取消息的数量
            noticeCount = (int) queueProperties.get("QUEUE_MESSAGE_COUNT");
        }

        //---------------以上获取订阅类消息,以下获取点赞类消息---------------------
        //拼接获取队列名称
        String userQueueName = "article_thumbup_" + userId;
        //获取Rabbit的Properties容器
        Properties userQueueProperties = rabbitAdmin.getQueueProperties(userQueueName);

        //获取消息数量
        int userNoticeCount = 0;
        //判断Properties是否不为空
        if (userQueueProperties != null) {
            // 如果不为空,获取消息的数量
            userNoticeCount = (int) userQueueProperties.get("QUEUE_MESSAGE_COUNT");
        }


        //封装返回的数据
        HashMap countMap = new HashMap();
        //订阅类消息数量
        countMap.put("sysNoticeCount", noticeCount);
        //点赞类消息数量
        countMap.put("userNoticeCount", userNoticeCount);
        Result result = new Result(true, StatusCode.OK, "查询成功", countMap);

        //把数据发送给用户
        channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));

        //把消息从队列里面清空,否则MQ消息监听器会再次消费一次
        if (noticeCount > 0) {
            rabbitAdmin.purgeQueue(queueName, true);
        }
        if (noticeCount > 0) {
            rabbitAdmin.purgeQueue(userQueueName, true);
        }

        //为用户的消息通知队列注册监听器,便于用户在线的时候,
        //一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据
        sysNoticeContainer.addQueueNames(queueName);
        userNoticeContainer.addQueueNames(userQueueName);
    }
}
  1. 使用配置Bean创建Rabbit监听器容器,使用监听器。编写 RabbitConfig 。
@Configuration 
public class RabbitConfig { 
@Bean("sysNoticeContainer") 
public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//使用Channel 
container.setExposeListenerChannel(true); 
//设置自己编写的监听器 
container.setMessageListener(new SysNoticeListener());
 return container; } }
  1. 编写Rabbit监听器 SysNoticeListener ,用来获取MQ消息并进行处理
    public class SysNoticeListener implements ChannelAwareMessageListener {
    private static ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //获取用户id,可以通过队列名称获取
        String queueName = message.getMessageProperties().getConsumerQueue();
        String userId = queueName.substring(queueName.lastIndexOf("_") + 1);

        io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);

        //判断用户是否在线
        if (wsChannel != null) {
            //如果连接不为空,表示用户在线
            //封装返回数据
            HashMap countMap = new HashMap();
            countMap.put("sysNoticeCount", 1);
            Result result = new Result(true, StatusCode.OK, "查询成功", countMap);

            // 把数据通过WebSocket连接主动推送用户
            wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
        }
    }
}
public class UserNoticeListener implements ChannelAwareMessageListener {

    private static ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //获取用户id,可以通过队列名称获取
        String queueName = message.getMessageProperties().getConsumerQueue();
        String userId = queueName.substring(queueName.lastIndexOf("_") + 1);

        io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);

        //判断用户是否在线
        if (wsChannel != null) {
            //如果连接不为空,表示用户在线
            //封装返回数据
            HashMap countMap = new HashMap();
            countMap.put("userNoticeCount", 1);
            Result result = new Result(true, StatusCode.OK, "查询成功", countMap);

            // 把数据通过WebSocket连接主动推送用户
            wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
        }
    }
}

3.修改启动类,添加netty服务启动

public static void main(String[] args) {
 SpringApplication.run(NoticeApplication.class, args);
  NettyServer server = ApplicationContextProvider.getApplicationContext().getBean(NettyServer.class); 
  try {server.start(12345); } 
catch (Exception e) {
 e.printStackTrace(); } }

最终MyWebSocketHandler修改为

public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 private static ObjectMapper MAPPER = new ObjectMapper(); 
 // 送Spring容器中获取消息监听器容器,处理订阅消息sysNotice
  SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext() .getBean("sysNoticeContainer"); 
  // 送Spring容器中获取消息监听器容器,处理点赞消息userNotice 
  SimpleMessageListenerContainer userNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext() .getBean("userNoticeContainer"); 
  //从Spring容器中获取RabbitTemplate
   RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext() .getBean(RabbitTemplate.class); 
   //存放WebSocket连接Map,根据用户id存放 
   public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap();
    //用户请求WebSocket服务端,执行的方法
     @Override 
     protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//约定用户第一次请求携带的数据:{"userId":"1"}
 //获取用户请求数据并解析 
 String json = msg.text(); 
 //解析json数据,获取用户id 
 String userId = MAPPER.readTree(json).get("userId").asText(); 
 //第一次请求的时候,需要建立WebSocket连接 Channel 
 channel = userChannelMap.get(userId); 
 if (channel == null) { 
 //获取WebSocket的连接 
 channel = ctx.channel(); 
 //把连接放到容器中
  userChannelMap.put(userId, channel); }
  //只用完成新消息的提醒即可,只需要获取消息的数量
   //获取RabbitMQ的消息内容,并发送给用户 
   RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate); 
   //拼接获取队列名称 
   String queueName = "article_subscribe_" + userId; 
   //获取Rabbit的Properties容器 
   Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
    //获取消息数量 
    int noticeCount = 0; 
    //判断Properties是否不为空
     if (queueProperties != null) { 
     // 如果不为空,获取消息的数量
      noticeCount = (int) queueProperties.get("QUEUE_MESSAGE_COUNT"); }
      //----------------------------------------------------- 
      //拼接获取队列名称 
      String userQueueName = "article_thumbup_" + userId; 
      //获取Rabbit的Properties容器
       Properties userQueueProperties = rabbitAdmin.getQueueProperties(userQueueName); 
       //获取消息数量
int userNoticeCount = 0; 
//判断Properties是否不为空 
if (userQueueProperties != null) { 
// 如果不为空,获取消息的数量 
userNoticeCount = (int) userQueueProperties.get("QUEUE_MESSAGE_COUNT"); }
//封装返回的数据
 HashMap countMap = new HashMap(); 
 countMap.put("sysNoticeCount", noticeCount);
  countMap.put("userNoticeCount", userNoticeCount);
   Result result = new Result(true, StatusCode.OK, "查询成功", countMap); 
   //把数据发送给用户 
   channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
    //把消息从队列里面清空,否则MQ消息监听器会再次消费一次 
    if (noticeCount > 0) { rabbitAdmin.purgeQueue(queueName, true); }
    if (userNoticeCount > 0) { rabbitAdmin.purgeQueue(userQueueName, true); }
    //为用户的消息通知队列注册监听器,便于用户在线的时候,
     //一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据 
     sysNoticeContainer.addQueueNames(queueName); 
     userNoticeContainer.addQueueNames(userQueueName); } }
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐