在Java中使用Rabbitmq

使用SpringBoot组件Spring AMQP

添加依赖

<dependency>
    <!-- Spring AMQP-->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置消息转换器

Spring AMQP 默认使用 JDK 序列化方式,推荐替换为 JSON 序列化。在生产者和消费者启动类中配置:

@Bean
public MessageConverter jacksonToJsonMessageConverter() {
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    messageConverter.setCreateMessageIds(true); // 自动生成消息ID,保证消息唯一性
    return messageConverter;
}

配置后,RabbitTemplate 发送对象时会自动序列化为 JSON,消费者接收时自动反序列化。

配置文件

生产者配置文件

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: rabbitmqDemo
    password: rabbitmqDemo
    virtual-host: rabbitmqDemo
    connection-timeout: 1s # 设置MQ的链接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后初始等待时间
        multiplier: 1 # 失败后下次等待时间倍长,下次等待时间 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
    publisher-confirm-type: correlated # 开启publish confirm机制,并设置confirm类型 none: 关闭confirm机制; simple: 同步阻塞等待MQ的回执消息; correlated: MQ异步回调方式返回回执消息
    publisher-returns: true # 开启publish return机制
    

消费者配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: rabbitmqDemo
    username: rabbitmqDemo
    password: rabbitmqDemo
    listener:
      simple:
        prefetch: 1  # 预取消息数量,即最多推送多少条未确认消息给消费者,确认后才能获取下一条
        acknowledge-mode: auto # 消息确认机制 none: 关闭 auto: 自动 manual: 手动
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始失败等待时长
          multiplier: 1 # 失败等待时长倍数,下次等待时长 = 初始时长 * 倍数
          max-attempts: 3 # 最大失败重试次数
          stateless: true # true: 无状态; false: 有状态 如果业务中包含事务,改为false
        

定义交换机队列

两种方式,一种是通过配置Bean定义交换机队列,一种是通过注解定义交换机队列

配置类

@Configuration
public class MqConfig {

    // --- 队列 ---
    @Bean
    public Queue test1Queue() {
        return QueueBuilder.durable("test1.queue").build();
    }

    @Bean
    public Queue test2Queue() {
        return new Queue("test2.queue");
    }

    // --- 交换机 ---
    @Bean
    public DirectExchange test1DirectExchange() {
        return new DirectExchange("test1.direct.exchange", true, false);
    }

    @Bean
    public TopicExchange test1TopicExchange() {
        return new TopicExchange("test1.topic.exchange", true, false);
    }

    @Bean
    public FanoutExchange test1FanoutExchange() {
        return new FanoutExchange("test1.fanout.exchange", true, false);
    }

    // --- Binding ---
    // Direct Exchange:精确匹配 routing key
    @Bean
    public Binding test1DirectBinding(Queue test1Queue, DirectExchange test1DirectExchange) {
        return BindingBuilder.bind(test1Queue).to(test1DirectExchange).with("test1.key");
    }

    // Topic Exchange:通配符匹配(# 匹配0或多个单词,* 匹配1个单词)
    @Bean
    public Binding test1TopicBinding(Queue test1Queue, TopicExchange test1TopicExchange) {
        return BindingBuilder.bind(test1Queue).to(test1TopicExchange).with("test1.#");
    }

    // Fanout Exchange:广播,不需要 routing key
    @Bean
    public Binding test1FanoutBinding(Queue test1Queue, FanoutExchange test1FanoutExchange) {
        return BindingBuilder.bind(test1Queue).to(test1FanoutExchange);
    }
}

Fanout 广播模式示例(一对多绑定):

@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("demo.fanout2");
    }

    @Bean
    public Queue fanoutQueue3() {
        return new Queue("fanout.queue3");
    }

    @Bean
    public Queue fanoutQueue4() {
        return new Queue("fanout.queue4");
    }

    @Bean
    public Binding fanoutBinding(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue4, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue4).to(fanoutExchange);
    }
}

注解定义

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue4",declare = "true"),
    exchange = @Exchange(name = "annotation.direct",type = ExchangeTypes.DIRECT),
    key = {"key2","key3"}
))
public void listenToDirectQueue4CreateByAnnotation(String message){
    log.info("listenToDirectQueue4CreateByAnnotation 接收到消息: {}", message);
}

发送消息

发送字符串

@Test
public void testSendMessage() {
    String exchangeName = "test1.direct.exchange";
    String message = "这是一条测试消息";
    rabbitTemplate.convertAndSend(exchangeName, "test1.key", message);
}

发送对象(Map)

配置了 Jackson2JsonMessageConverter 后,可以直接发送对象,会自动序列化为 JSON:

@Test
public void testSendMapMessage() {
    Map<String, Object> map = new HashMap<>();
    map.put("name", "张三");
    map.put("age", 18);
    rabbitTemplate.convertAndSend("test1.direct.exchange", "test1.key", map);
}

Fanout 广播发送

Fanout 交换机会忽略 routing key,发送到所有绑定的队列:

@Test
public void testSendToFanoutExchange() {
    rabbitTemplate.convertAndSend("demo.fanout2", null, "广播消息");
}

Topic 通配符发送

@Test
public void testSendToTopicExchange() {
    // routing key 为 "1.1.b",会匹配 "test1.#" 但不会匹配 "test1.*.info"
    rabbitTemplate.convertAndSend("test1.topic.exchange", "1.1.b", "Topic消息");
}

如何保证Rabbitmq消息的可靠性

一共三个方面:生产者可靠;mq可靠;消费者可靠

生产者可靠

生产者重试机制 + 生产者确认机制(Publisher Confirm)+ 消息回退机制(Publisher Return)

生产者 Confirm 机制

配置 publisher-confirm-type: correlated 后,MQ 收到消息会异步回调通知生产者。通过 CorrelationData 绑定消息与回调:

@Test
public void testConfirmCallBack() {
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("回调失败: {}", ex.getMessage());
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            if (result.isAck()) {
                log.debug("消息发送成功,id: {}", cd.getId());
            } else {
                log.error("消息发送失败,id: {}, 原因: {}", cd.getId(), result.getReason());
            }
        }
    });
    rabbitTemplate.convertAndSend("demo.direct1", "key", "消息内容", cd);
}

生产者 Return 机制

配置 publisher-returns: true 后,如果消息从交换机无法路由到队列(routing key 不匹配),MQ 会回调通知生产者:

@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(returnedMessage -> log.debug(
            "消息丢失: exchange:{}, route:{}, replyCode:{}, replyText:{}, message:{}",
            returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
            returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
            returnedMessage.getMessage()));
    }
}

mq可靠

交换机持久化 + 队列持久化 + 消息持久化

消费者可靠

消费者确认机制

手动ACK示例

消费者配置 acknowledge-mode: manual,然后在消费方法中手动确认:

@RabbitListener(queues = "test1.queue")
public void listenManualAck(String message, Channel channel, Message msg) throws IOException {
    try {
        log.info("接收到消息: {}", message);
        // 处理业务逻辑...
        // 手动确认,第二个参数 false 表示不批量确认
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        log.error("消息处理失败", e);
        // 第三个参数 true 表示消息重新入队
        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
    }
}

消费者失败重试 + 错误队列

开启消费者重试后,如果重试次数耗尽消息仍然消费失败,默认会丢弃消息。可以通过配置 MessageRecoverer 将失败消息转发到错误队列,避免消息丢失:

@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {

    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.routing.key");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        // 重试耗尽后,将消息重新发布到错误交换机
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error.routing.key");
    }
}

Rabbitmq的延迟消息

两种实现方式,第一种是通过死信队列,设置消息过期时间;第二种是通过rabbitmq插件定义延迟消息交换机。

方式一:死信队列 + TTL

消息在队列中过期后,会被转发到死信交换机(Dead Letter Exchange),再路由到死信队列,消费者监听死信队列即可实现延迟消费。

原理:给普通队列设置 TTL 和死信交换机,消息过期后自动转发到死信队列。

配置类方式

@Configuration
public class DelayConfig {

    // 普通队列:设置 TTL 和死信交换机
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("delay.queue")
                .ttl(10000) // 消息过期时间 10s
                .deadLetterExchange("DLX.direct")
                .deadLetterRoutingKey("DLX.key")
                .build();
    }

    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange("delay.direct");
    }

    @Bean
    public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key");
    }

    // 死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("DLX.direct");
    }

    // 死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue("DLX.queue");
    }

    @Bean
    public Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("DLX.key");
    }
}

也可以通过 withArgument 方式设置死信参数(效果相同):

@Bean
public Queue delayQueue() {
    return QueueBuilder.durable("delay.queue")
            .withArgument("x-dead-letter-exchange", "DLX.direct")
            .withArgument("x-dead-letter-routing-key", "DLX.key")
            .build();
}

发送和消费

// 发送消息到普通队列
rabbitTemplate.convertAndSend("delay.direct", "delay.key", "延迟消息");

// 监听死信队列,10s 后消费
@RabbitListener(queues = "DLX.queue")
public void listenDlx(String message) {
    log.info("延迟消费: {}", message);
}

单条消息设置 TTL

除了在队列级别设置 TTL,也可以对单条消息单独设置过期时间(但存在队头阻塞问题:队头消息 TTL 长,会阻塞后面短 TTL 的消息过期):

@Test
public void testDelayMessage() {
    rabbitTemplate.convertAndSend("delay.direct", "delay.key", "消息内容", message -> {
        message.getMessageProperties().setExpiration("10000"); // 单条消息 TTL 10s
        return message;
    });
}

缺点:每种延迟时间需要一个独立队列,或者存在队头阻塞问题。

方式二:延迟消息插件(推荐)

安装 rabbitmq_delayed_message_exchange 插件后,可以使用 x-delayed-message 类型的交换机,消息在交换机层面延迟投递,支持任意延迟时间。

注解方式定义

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue2"),
        exchange = @Exchange(name = "delay.direct2", type = ExchangeTypes.DIRECT, delayed = "true"),
        key = "delay.key"
))
public void listenDelayQueue2(String message) {
    log.info("listenDelayQueue2 接收到消息: {}", message);
}

注意 @Exchange 注解中 delayed = "true",标识这是一个延迟交换机。

配置类方式定义

@Configuration
public class DelayedMqConfig {

    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable("delayed.queue").build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.key");
    }
}

发送延迟消息

通过 setDelay 设置延迟时间(毫秒):

@Test
public void testSendMessageWithPlugin() {
    rabbitTemplate.convertAndSend("delay.direct2", "delay.key", "插件延迟消息", message -> {
        message.getMessageProperties().setDelay(10000); // 延迟 10s
        return message;
    });
}

两种方式对比

对比项 死信队列 + TTL 延迟消息插件
实现原理 消息在队列中过期后转发到死信队列 消息在交换机中延迟后直接投递
延迟精度 每种延迟时间需要独立队列 支持任意延迟时间
队头阻塞 单条消息 TTL 存在此问题 无此问题
依赖 无需额外插件 需安装 rabbitmq_delayed_message_exchange 插件
推荐 简单场景 复杂延迟场景

更多推荐