之前博客写了 RabbitMQ 延迟队列的实现,是通过死信队列与转发队列配合实现延时消息的功能
本篇将利用RabbitMQ的延时队列插件,实现延时功能,下面分两部分介绍怎么实现延时功能

1.用Docker安装 RabbitMQ和延时插件

插件下载地址:https://www.rabbitmq.com/community-plugins.html
在这里插入图片描述

安装方式有两种:
  1.使用Dockerfile 做成镜像,运行镜像即可,简单方便
  2.启动rabbitmq容器,拿到容器Id,然后把插件复制进去,再启用插件

第一种方式(先下载好插件,放入Dockerfile文件所在目录):
 1.Dockerfile内容
   FROM rabbitmq:3.8.2-management
   COPY ["rabbitmq_delayed_message_exchange-3.8.0.ez" , "/plugins/"]
   RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 2.构建:docker build -t rabbitmq:3.8.2-management .
 3.运行:docker run -it -d --hostname my-rabbit --name rabbitmq -p 15672:15672 -p 5672:5672 -v /data/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin123456 rabbitmq:3.8.2-management

 
第二种方式:
  1.先运行rabbitmq: docker run -it -d --hostname rabbit-host --name rabbitmq -v /data/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=写你想要的名字-e RABBITMQ_DEFAULT_PASS=写你想要的密码 -p 15672:15672 -p 5672:5672 rabbitmq:3.8.2-management
  2.登录RabbitMq客户端  http://自己的地址:15672/
  3.把插件上传到服务器目录
  4.查看容器的ID:docker ps
  5.把插件复制到容器:
    格式:docker cp 本地文件路径 counterID全称:容器路径
    docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 982b190bf852:/plugins

  6.进入容器:docker exec -it rabbitmq /bin/bash ,查看插件是否复制进去了
  7.激活插件:cd /plugins ,rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  8.重启容器:docker restart rabbitmq
  9.进入15672后台,可以看到rabbitMq客户端就会出现消息延迟队列的类型:delayedExchange 队列

我使用第一种方式,在本地打包了一个镜像,运行该镜像后访问 http://localhost:15672,在Exchanges–下面的Add a new exchanges — type下拉框中看到 x-delayed-message 就说明插件安装、启动成功了
在这里插入图片描述

2.使用SpringBoot项目测试延时队列和普通队列
  • 1.引入依赖
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
  • 2.添加rabbitmq 地址和帐号
spring:
  rabbitmq:
    host: 192.168.68.134
    port: 5672
    username: admin
    password: admin123456
    listener:
      direct:
        acknowledge-mode: MANUAL
      simple:
        acknowledge-mode: MANUAL
  • 3.配置队列信息
@Configuration
public class RabbitConfig {

    // 支付超时延时交换机
    public static final String Delay_Exchange_Name = "delay.exchange";

    //exchange name
    public static final String Default_Exchange_Name = "test.exchange";

    //微信支付后,发送支付队列
    public static final String Order_Pay_Queue_Name = "order_pay";

    //超时订单关闭队列
    public static final String Timeout_Trade_Queue_Name = "close_trade";

    @Bean
    public Queue orderPayQueue(){
        return new Queue(RabbitConfig.Order_Pay_Queue_Name , true);
    }

    @Bean
    public Queue delayPayQueue() {
        return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true);
    }


    // 定义广播模式的延时交换机 无需绑定路由
    @Bean
    FanoutExchange delayExchange(){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args);
        topicExchange.setDelayed(true);
        return topicExchange;
    }
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(RabbitConfig.Default_Exchange_Name, true, false);
    }


    // 绑定延时队列与交换机
    @Bean
    public Binding delayPayBind() {
        return BindingBuilder.bind(delayPayQueue()).to(delayExchange());
    }

    // 绑定普通消息队列
    @Bean
    public Binding orderPayBind(){
        return BindingBuilder.bind(orderPayQueue()).to(defaultExchange()).with(RabbitConfig.Order_Pay_Queue_Name);
    }

    // 定义消息转换器
    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // 定义消息模板用于发布消息,并且设置其消息转换器
    @Bean
    RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
    @Bean
    RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

  • 4.添加生产者
@Component
public class RabbitmqPublish {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送普通消息
     * @param routingKey
     * @param content
     */
    public void sendMsg(String routingKey, String content){
        rabbitTemplate.convertAndSend(RabbitConfig.Default_Exchange_Name , routingKey , content);
    }

    /**
     * 发送延时信息
     * @param content 内容
     * @param routingKey   routingKey
     * @param delay   延时时间,秒
     */
    public void sendTimeoutMsg(String content , String routingKey, int delay){
        // 通过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列
        rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, routingKey, content, message ->{
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            message.getMessageProperties().setDelay(delay * 1000);   // 毫秒为单位,指定此消息的延时时长
            return message;
        });
    }
}
  • 5.添加消费者
@Component
public class RabbitmqConsumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    //消费延时消息
    @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name)
    public void process(String content, Message message, Channel channel) throws IOException {
        try {
            logger.info("延迟队列的内容[{}]", content);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            logger.info("超时信息处理完毕");
        } catch (Exception e) {
            logger.error("处理失败:{}", e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    //消费普通消息
    @RabbitListener(queues = RabbitConfig.Order_Pay_Queue_Name)
    public void process1(String content, Message message, Channel channel) throws IOException {
        try {
            logger.info("普通队列的内容[{}]", content);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("处理失败:{}", e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}
  • 6.添加一个控制器,发送队列消息,看控制台结果,发了3个延时消息,分别延时 40秒、20秒、60秒,看那个先被消费,再发送了两个普通消息,测试结果请看下面的截图
/**
 * @Author admin
 * @Date
 */
@RestController
public class IndexController {

    @Autowired
    RabbitmqPublish rabbitmqPublish;

    @RequestMapping("/send")
    public String send() {
        //发送多个延时消息
        rabbitmqPublish.sendTimeoutMsg("hello1" , "routingKey1" ,40);
        rabbitmqPublish.sendTimeoutMsg("hello2" , "routingKey2" ,20);
        rabbitmqPublish.sendTimeoutMsg("hello3" , "routingKey1" ,60);

        //发送普通消息
        rabbitmqPublish.sendMsg(RabbitConfig.Order_Pay_Queue_Name , "weixin");
        rabbitmqPublish.sendMsg(RabbitConfig.Order_Pay_Queue_Name , "alipay");

        return "success";
    }
}

可以看到普通消息先被消费,后面延时消息第一个被消费的是 延时20秒的 hello2,测试成功
在这里插入图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐