• 第一种方式是进入容器内部去安装

       首先下载镜像和启动

  # RabbitMQ broker service, started from the image tag that bundles the management plugin.
  rabbitmq:
    restart: always
    image: rabbitmq:management
    container_name: rabbitmq
    ports:
      # host:container mappings; 5672 is the client connection port,
      # 15672 is presumably the management web UI used later to check the version.
      - 5672:5672
      - 15672:15672
    networks:
      default:
        # Fixed address on the default network.
        # NOTE(review): assumes the default network is configured elsewhere with a matching subnet - confirm.
        ipv4_address: 172.18.0.5
    environment:
      TZ: Asia/Shanghai
    volumes:
      # Keep broker data on the host under ./rabbitmq/data.
      - ./rabbitmq/data:/var/lib/rabbitmq
      # Share the host's local time definition with the container.
      - /etc/localtime:/etc/localtime

      登录 Web 管理页面(http://服务器IP:15672)查看 RabbitMQ 的版本

      到官方网站下载与 RabbitMQ 版本对应的插件

插件下载地址:https://www.rabbitmq.com/community-plugins.html
找到rabbitmq_delayed_message_exchange下载
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

      将插件上传到服务器

     将插件复制到rabbitmq容器内,进入容器安装插件

# Copy the plugin archive into the container's /plugins directory.
# The container is addressed by its name "rabbitmq" (container_name in the compose file).
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/plugins
# Open a shell inside the container.
docker exec -it rabbitmq bash
# Make the archive readable by every user without making it world-writable.
chmod 644 /plugins/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
# Enable the delayed-message plugin.
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

验证是否成功:在 Web 管理页面的 Exchanges 页新建交换机时,Type 下拉列表中多了一个 x-delayed-message

这种方式容器删除后需要重新配置一遍,可以使用第二种方式

  • 第二种方式是使用 Dockerfile 的方式进行安装

自己做一个含有rabbitmq_delayed_message_exchange插件的rabbitmq的镜像,以后都使用这个镜像启动rabbitmq

下载插件上传到服务器,编写Dockerfile

# Base image: RabbitMQ with the management plugin. The tag is pinned to the 3.8
# series because the plugin archive below is the 3.8.9 release; keep the two in
# step when upgrading.
FROM rabbitmq:3.8-management
# Add the delayed-message plugin archive to the broker's plugin directory.
COPY ["rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez", "/plugins/"]
# Read access is sufficient for the archive; avoid a world-writable plugin file.
RUN chmod 644 /plugins/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
# --offline: enable the plugin without contacting a running node (none exists at build time).
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

构建镜像

我们取名为rabbitmq:delayed
docker build -t rabbitmq:delayed .

查看镜像:docker images

使用:将上面 docker-compose 配置中的 image 改为 rabbitmq:delayed,然后重新启动容器

上点java代码测试

spring:
  rabbitmq:
    host: xxx # RabbitMQ connection address
    port: 5672 # RabbitMQ connection port
package cc.sunni.rabbitmqdelayedmessagedemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that declares the queue, the {@code x-delayed-message}
 * exchange and the binding that connects them.
 */
@Configuration
public class DelayedMessageConfig {
    public static final String QUEUE_NAME = "delayed.live.queue";
    public static final String EXCHANGE_NAME = "delayed.live.exchange";

    /**
     * Declares the queue named {@link #QUEUE_NAME}.
     *
     * @return the queue bean
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    /**
     * Declares the exchange named {@link #EXCHANGE_NAME}.
     *
     * @return a durable, non-auto-delete custom exchange of type {@code x-delayed-message}
     */
    @Bean
    CustomExchange customExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");

        // The exchange type must be exactly "x-delayed-message".
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", durable, autoDelete, arguments);
    }

    /**
     * Binds the queue to the exchange, using {@link #QUEUE_NAME} as the routing key.
     *
     * @param queue    the queue to bind
     * @param exchange the exchange to bind the queue to
     * @return the binding bean
     */
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(QUEUE_NAME)
                .noargs();
    }
}
package cc.sunni.rabbitmqdelayedmessagedemo.component;

import cc.sunni.rabbitmqdelayedmessagedemo.config.DelayedMessageConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Publishes messages to {@link DelayedMessageConfig#EXCHANGE_NAME} with an
 * {@code x-delay} header so that delivery is postponed.
 */
@Component
public class DelayedMessageSender {
    private static final Logger log = LoggerFactory.getLogger(DelayedMessageSender.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends {@code msg} with a delay of {@code delaySeconds} seconds.
     *
     * <p>The delay is converted to milliseconds before anything is sent, so an
     * invalid value fails fast instead of surfacing while the message is being
     * published.
     *
     * @param msg          the message payload
     * @param delaySeconds the delay in seconds
     * @throws NullPointerException if {@code delaySeconds} is {@code null}
     * @throws ArithmeticException  if the delay in milliseconds does not fit in an {@code int}
     */
    public void send(String msg, Integer delaySeconds) {
        // multiplyExact rejects overflow; a plain "* 1000" would silently wrap to a
        // negative delay for values above 2,147,483 seconds.
        final int delayMillis = Math.multiplyExact(delaySeconds.intValue(), 1000);

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("发送时间:{}", sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedMessageConfig.EXCHANGE_NAME, DelayedMessageConfig.QUEUE_NAME, msg, message -> {
            // The delay is carried in the "x-delay" header, in milliseconds.
            message.getMessageProperties().setHeader("x-delay", delayMillis);
            return message;
        });
    }
}
package cc.sunni.rabbitmqdelayedmessagedemo.component;

import cc.sunni.rabbitmqdelayedmessagedemo.config.DelayedMessageConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Listener on {@link DelayedMessageConfig#QUEUE_NAME} that logs each received
 * message together with the time it was received.
 */
@Component
@RabbitListener(queues = DelayedMessageConfig.QUEUE_NAME)
public class DelayedMessageReceiver {
    public static final Logger log = LoggerFactory.getLogger(DelayedMessageReceiver.class);

    /**
     * Logs the receive time and the message content.
     *
     * @param msg the received message payload
     */
    @RabbitHandler
    public void process(String msg) {
        final String receivedAt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        log.info("接收时间:" + receivedAt);
        log.info("消息内容:" + msg);
    }
}
package cc.sunni.rabbitmqdelayedmessagedemo.controller;

import cc.sunni.rabbitmqdelayedmessagedemo.component.DelayedMessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint for triggering a delayed message through {@link DelayedMessageSender}.
 */
@RestController
public class MqController {
    public static final Logger log = LoggerFactory.getLogger(MqController.class);

    @Autowired
    private DelayedMessageSender sender;

    /**
     * Handles {@code POST /mq/{message}/{delay}}: logs the message and hands it,
     * together with the delay, to the sender.
     *
     * @param message the message text taken from the path
     * @param delay   the delay taken from the path, forwarded unchanged to the sender
     * @return the literal string {@code "success"}
     */
    @PostMapping("/mq/{message}/{delay}")
    public String messageWithMQ(@PathVariable("message") String message,
                                @PathVariable("delay") Integer delay) {
        final String logLine = "Send: " + message;
        log.info(logLine);

        sender.send(message, delay);

        return "success";
    }
}

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐