点击上方“Java基基”,选择“设为星标”

做积极的人,而不是积极废人!

每天 14:00 更新文章,每天掉亿点点头发...

源码精品专栏

 

来源:blog.csdn.net/chenping1993/

article/details/114580954

8b547ecc9ce267446499d88c23ae6691.jpeg


这里介绍一下RabbitMQ重复消费的场景,以及如何解决消息重复消费的问题。

注:本文只做粗略逻辑实现借鉴,实际业务场景需根据实际情况再做细化处理。

消息重复消费

什么是消息重复消费?

首先我们来看一下消息的传输流程。消息生产者-->MQ-->消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在两个阶段:

1、生产者多发送了消息给MQ;

2、MQ的一条消息被消费者消费了多次。

第一种场景很好控制,只要保证消息生成者不重复发送消息给MQ即可。

我们着重来看一下第二个场景。

MQ的一条消息被消费者消费了多次

在保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

重复消费场景重现测试

1、消息发送者发送1万条消息给MQ

@GetMapping("/rabbitmq/sendToClient")
public String sendToClient() {
    String message = "server message sendToClient";
    for (int i = 0; i < 10000; i++) {
        amqpTemplate.convertAndSend("queueName3",message+": "+i);

    }
    return message;
}

启动消息发送服务,调用接口发送消息,mq成功收到1万条消息。

2、消费者监听消费消息

@RabbitListener(queues = "queueName3")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(String message) {
    System.out.println("接收者2--接收到queueName3队列的消息为:"+message);
}

启动消费者服务,然后中断消费服务,此时消费到了第7913个消息:

22c3cfc9f82fb7df634c93dd89b8f6ac.png

此时查看MQ的消息,现在MQ队列中应该还有2087个消息,但还有2088个消息,说明最后一个消息被消费了没有被MQ服务确认。

1d594587b39e1a3b23ea0a71c9df86a9.png

再次启动消费者服务,消息从第7913个消息开始消费,而不是第7914个消息

34a46ec59b9311ab20ec133405c9a3bc.png

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro

  • 视频教程:https://doc.iocoder.cn/video/

如何解决消息重复消费的问题

为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下:

  • 消费者监听到消息后获取id,先去查询这个id是否存中

  • 如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)

  • 如果存在则丢弃此消息

编码

消息生产者服务:

/**
 * @Description:  发送消息 模拟消息重复消费
 *      消息重复消费情景:消息生产者已把消息发送到mq,消息消费者在消息消费的过程中突然因为网络原因或者其他原因导致消息消费中断
 *      消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,
 *      为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息
 * @param:
 * @return: java.lang.String
 * @Author: chenping
 */
@GetMapping("/rabbitmq/sendMsgNoRepeat")
public String sendMsgNoRepeat() {
    String message = "server message sendMsgNoRepeat";
    for (int i = 0; i <10000 ; i++) {
        Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
        amqpTemplate.convertAndSend("queueName4",msg);
    }
    return message;
}

消息消费者服务:

方案1:将id存入string中(单消费者场景):

这样一个队列,redis数据只有一条,每次消息过来都覆盖之前的消息,但是消费者多的情况不适用,可能会存在问题--一个消息被多个消费者消费

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");

    String messageRedisValue = redisUtil.get("queueName4","");
    if (messageRedisValue.equals(messageId)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);

    redisUtil.set("queueName4",messageId);//以队列为key,id为value
}

方案2:将id存入list中(多消费者场景)

这个方案可以解决多消费者的问题,但是随着mq的消息增加,redis数据越来越多,需要去清除redis数据。

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");

    List<String> messageRedisValue = redisUtil.lrange("queueName4");
    if (messageRedisValue.contains(messageId)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);

    redisUtil.lpush("queueName4",messageId);//存入list
}

方案3:将id以key值增量存入string中并设置过期时间:

以消息id为key,消息内容为value存入string中,设置过期时间(可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");

    String messageRedisValue = redisUtil.get(messageId,"");
    if (msg.equals(messageRedisValue)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);

    redisUtil.set(messageId,msg,10L);//以id为key,消息内容为value,过期时间10分钟
}

解决消息重复消费测试:

首先,启动消息生成服务,发送一万条消息:

7dbe53d10f71accc6d487a67c013f0a3.png

启动消息消费服务,然后中断服务,消费了1934条消息:

0938a87844d7101ce54cd7a155f4387a.png

查看未被消费的消息条数为8067条,多了一条(10000-1934=8066 ):

909b7bcd61c5817f4dbae3fdd8a9b68e.png

再次启动消费者服务,消费者舍弃了已被消费的第1934条消息

8eaadebdc1106d2d0bf133d10fd4e92b.png

欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

72ffca7692b61c0624677e59734e8c42.png

已在知识星球更新源码解析如下:

712189832c4dd5174f39b04e9aa52a17.jpeg

a9513b7cc188f9cb38763141a577ecba.jpeg

05482dcfc5a2181c8c3c9f224cd9e9dc.jpeg

ddca5450adc9b9afbc2ce92d43847edf.jpeg

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 6W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

文章有帮助的话,在看,转发吧。
谢谢支持哟 (*^__^*)
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐