问题复现:

docker 下清空对应的队列信息

exec rabbitmq /bin/bash #rabbitmq是容器名,也可以指定为Id
rabbitmqctl purge_queue queue.order # queue.order是队列名

RabbitMQ控制台新建消息,等待消费者消费
http://localhost:15672
在这里插入图片描述

前置知识:

一、 @RabbitListener的使用

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

1.1 作用在方法上

指定队列名,多个方法使用@RabbitListener,可以绑定多个消费实现

  • 指定队列的消费方法
    普通用法
    @RabbitListener(queues = "queue.order")
    public void handleMsg(String msg) {
    }
    

1.2 作用在类上

指定队列名,在上配置@RabbitListener 指定队列名 。
@RabbitHandler绑定在类中的多个方法上,视为队列名的多个消费实现

  • 需要配合 @RabbitHandler

    @RabbitListener(queues = "queue.order")
    public class PayOrderUpdateListener {
    	//监听队列的消费方法之一
    	@RabbitHandler
    	public void handlerMsg(String msg) {
    	}
    }
    
  • 注解可选参数: @RabbitListener(isDefault = true),这个参数值得注意

    @RabbitHandler(queues = "queue.order", isDefault = true)
    public void handleMsg(String msg) {
    }
    

二、 消息的封装类Message ,及解析规则

@RabbitHandler(queues = "queue.order")
public void handleMsg(Message msg) {
}
  • 关键属性content_type, 用于约束body内容
  1. http://localhost:15672 进入RabbitMQ的控制台
  2. 设置消息的content_type
    在这里插入图片描述
  3. content_type 的常见取值
content_typehandler的可接收形参
text/plainString
application/json/
application/x-java-serialized-object实体类
application/octet-streambyte[]
未指定byte[]
未指定org.springframework.amqp.core.Message

未指定 content_type 的时候,可以用byte[]接收,
值得一提的是 Message 可用于接收所有可能的序列化场景,都不会报错。

异常捕获

先分析错误,再分析死循环

【异常1】:No method found for class

在这里插入图片描述
意思是消费者已经准备消费,队列里存在消息,但是找不到指定的消费实现方法来处理。

异常1问题:为什么会找不到消费实现?

  1. @RabbitListener@RabbitHandler 配置出错
    很大原因是取决于content_type 的配置和 方法的形参
    如果通过客户端放入队列中有个content_type为空的的消息,@RabbitListener只有形参为String 的Handler,是无法对应上消费实现的。
  2. @RabbitHandler 没有使用可选参数isDefault
    消费者找不到任何一个消费实现,就回去找isDefault = truehandler,类似一个兜底策略。

异常1问题:处理思路

  1. 使用Message 作为方法形参
  2. 尽量将@RabbitListener 放在类上, 使用@RabbitHandler(isDefault = true) 做兜底策略

异常1分析 :死循环分析

这是一种应用级别的死循环,消息找不到消费实现,一直重试直到找到消费实现。这种死循环原因是配置失误,要在源头避免,测试阶段就要消灭。【找到消除这种死循环的方法再来填坑】。另外一种必须处理的死循环是已经找到消费实现,但是在消费的过程中造成死循环,见异常2:


【异常2】:消费过程中抛出未捕获Exception

通常是业务逻辑导致的异常NullPointerException,无脑的做法是try-catch,处理不当依旧会造成死循环。

异常2问题:try-catch后仍然会死循环

详细分析
这里简要概括下:RabbitMQ 默认的异常策略是不断重试除非抛出了fatal类型的异常,这种异常类型如下

异常类
MessageConversionException
MessageConversionException
MethodArgumentNotValidException
MethodArgumentTypeMismatchException
NoSuchMethodException
ClassCastException

类似于NullPointerException,是不再抛弃重试的范围的,也就是不主动捕获并处理,RabbitMQ会一直尝试消费该消息,导致死循环,从而使大量CPU资源被占用。

异常2处理: 终止死循环

最简单的办法就是在catch语句块中抛出fatal 类型的异常,如

    @RabbitHandler
    public void handlerMsg(String msg) {
        try { // try 住语句块 抛出致命异常
            //1.获取字符串转成map
            Map<String, String> map = JSON.parseObject(msg, Map.class);
            String out_trade_no = map.get("out_trade_no");
            if (map != null && map.get("return_code").equalsIgnoreCase("SUCCESS")) {
            }
        } catch (Exception e) {
            e.printStackTrace();
            // todo  解决消息队列重复试错的bug 抛出一个致命异常就会抛弃消费这个消息
            throw new MessageConversionException("消息消费失败,移出消息队列,不再试错");
        }
    }

更健壮的方法:使用异常处理器 + 死信队列
参考网上的资料,比较全面

解决方案

  1. 测试阶段暴力抛出fatal异常,终止死循环
  2. 线上不暴力抛出fatal异常,先设置重试次数,详细参数
  3. 超过重试次数,把消息加入死信队列,或者持久化到db,从消息队列暂时移除
  4. 根据失败的消息做预警,微信通知API 或者其他方式触发人工干预

参考

RabbitMQ的术语与参数配置
@RabbitListener 详解
@RabbitListener 详解, @Payload 和 @headers
MessageConvert 序列化相关
Spring Boot2.0 下 RabbitMQ的异常处理

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐