前言

上一篇文章介绍了springboot如何整合RabbitMQ:springboot集成RabbitMQ_龙池小生的博客-CSDN博客

这里介绍一下RabbitMQ重复消费的场景,以及如何解决消息重复消费的问题。

注:本文只做粗略逻辑实现借鉴,实际业务场景需根据实际情况再做细化处理。

目录

消息重复消费:

MQ的一条消息被消费者消费了多次:

重复消费场景重现测试:

如何解决消息重复消费的问题:

编码:

解决消息重复消费测试:


消息重复消费:

什么是消息重复消费?首先我们来看一下消息的传输流程。消息生产者--》MQ--》消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在两个阶段1、生产者多发送了消息给MQ;2、MQ的一条消息被消费者消费了多次。第一种场景很好控制,只要保证消息生成者不重复发送消息给MQ即可。我们着重来看一下第二个场景。

MQ的一条消息被消费者消费了多次

在保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,
为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

重复消费场景重现测试

1、消息发送者发送1万条消息给MQ:

    @GetMapping("/rabbitmq/sendToClient")
    public String sendToClient() {
        String message = "server message sendToClient";
        for (int i = 0; i < 10000; i++) {
            amqpTemplate.convertAndSend("queueName3",message+": "+i);

        }
        return message;
    }

启动消息发送服务,调用接口发送消息,mq成功收到1万条消息。

2、消费者监听消费消息:

    @RabbitListener(queues = "queueName3")//发送的队列名称     @RabbitListener注解到类和方法都可以
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("接收者2--接收到queueName3队列的消息为:"+message);
    }

启动消费者服务,然后中断消费服务,此时消费到了第7913个消息:

此时查看MQ的消息,现在MQ队列中应该还有2087个消息,但还有2088个消息,说明最后一个消息被消费了没有被MQ服务确认。

再次启动消费者服务,消息从第7913个消息开始消费,而不是第7914个消息

如何解决消息重复消费的问题:

为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下

1、消费者监听到消息后获取id,先去查询这个id是否存中

2、如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)

3、如果存在则丢弃此消息

编码:

消息生产者服务

    /**
     * @Description:  发送消息 模拟消息重复消费
     *      消息重复消费情景:消息生产者已把消息发送到mq,消息消费者在消息消费的过程中突然因为网络原因或者其他原因导致消息消费中断
     *      消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,
     *      为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息
     * @param:
     * @return: java.lang.String
     * @Author: chenping
     * @Date: 2021/3/5 17:25
     */
    @GetMapping("/rabbitmq/sendMsgNoRepeat")
    public String sendMsgNoRepeat() {
        String message = "server message sendMsgNoRepeat";
        for (int i = 0; i <10000 ; i++) {
            Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
            amqpTemplate.convertAndSend("queueName4",msg);
        }
        return message;
    }

消息消费者服务

方案1:将id存入string中(单消费者场景):

这样一个队列,redis数据只有一条,每次消息过来都覆盖之前的消息,但是消费者多的情况不适用,可能会存在问题--一个消息被多个消费者消费

    @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
    @RabbitHandler
    public void receiveMessage(Message message) throws UnsupportedEncodingException {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"utf-8");

        String messageRedisValue = redisUtil.get("queueName4","");
        if (messageRedisValue.equals(messageId)) {
            return;
        }
        System.out.println("消息:"+msg+", id:"+messageId);

        redisUtil.set("queueName4",messageId);//以队列为key,id为value
    }

方案2:将id存入list中(多消费者场景)

这个方案可以解决多消费者的问题,但是随着mq的消息增加,redis数据越来越多,需要去清除redis数据。

    @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
    @RabbitHandler
    public void receiveMessage1(Message message) throws UnsupportedEncodingException {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"utf-8");

        List<String> messageRedisValue = redisUtil.lrange("queueName4");
        if (messageRedisValue.contains(messageId)) {
            return;
        }
        System.out.println("消息:"+msg+", id:"+messageId);

        redisUtil.lpush("queueName4",messageId);//存入list
    }

方案3:将id以key值增量存入string中并设置过期时间:

以消息id为key,消息内容为value存入string中,设置过期时间(可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)

    @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
    @RabbitHandler
    public void receiveMessage2(Message message) throws UnsupportedEncodingException {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"utf-8");

        String messageRedisValue = redisUtil.get(messageId,"");
        if (msg.equals(messageRedisValue)) {
            return;
        }
        System.out.println("消息:"+msg+", id:"+messageId);

        redisUtil.set(messageId,msg,10L);//以id为key,消息内容为value,过期时间10分钟
    }

 

解决消息重复消费测试:

首先,启动消息生成服务,发送一万条消息

启动消息消费服务,然后中断服务,消费了1934条消息

查看未被消费的消息条数为8067条,多了一条(10000-1934=8066 )

再次启动消费者服务,消费者舍弃了已被消费的第1934条消息

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐