这个类非常强大,我们可以对他做很多设置,对于消费者的配置项,这个类都可以满足
监听队列(多个队列)、自动启动、自动声明功能
可以设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
可以设置消费者数量、最大最小数量、批量消费
设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的转换器、消息转换器等
很多基于RabbitMQ的自制定化后端管控台在进行动态配置的时候,也是根据这一特性去实现的。
注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态
修改其消费者数量的大小、接收消息的模式等

package com.dwz.spring;

import java.util.UUID;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.Channel;

@Configuration
@ComponentScan("com.dwz.spring.*")
public class RabbitMQConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setVirtualHost("/vhost_dwz");
        connectionFactory.setUsername("root_dwz");
        connectionFactory.setPassword("123456");
        return connectionFactory;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        System.err.println("RabbitAdmin启动了。。。");
        //设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置)
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
    
    /**
     *     针对消费者的配置
     * 1.设置交换机的类型
     * 2.将队列绑定到交换机
     * FanoutExchange:将消息分发到所有绑定的队列,无routingkey的概念
     * TopicExchange:多关键字匹配
     * HeadersExchange:通过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列
     */
    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("topic001", true, false);
    }
    
    @Bean
    public Queue queue001() {
        return new Queue("queue001", true);//队列持久化
    }
    
    @Bean
    public Binding binding001() {
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }
    
    @Bean
    public TopicExchange exchange002() {
        return new TopicExchange("topic002", true, false);
    }
    
    @Bean
    public Queue queue002() {
        return new Queue("queue002", true);//队列持久化
    }
    
    @Bean
    public Binding binding002() {
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
    }
    
    @Bean
    public TopicExchange exchange003() {
        return new TopicExchange("topic003", true, false);
    }
    
    @Bean
    public Queue queue003() {
        return new Queue("queue003", true);//队列持久化
    }
    
    @Bean
    public Binding binding003() {
        return BindingBuilder.bind(queue003()).to(exchange003()).with("mq.*");
    }
    
    @Bean
    public Queue queue_image() {
        return new Queue("image_queue", true);//队列持久化
    }
    
    @Bean
    public Queue queue_pdf() {
        return new Queue("pdf_queue", true);//队列持久化
    }
    
    /*
     *     简单消息监听容器
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //同时监听多个队列
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //设置当前的消费者数量
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        //设置是否重回队列
        container.setDefaultRequeueRejected(false);
        //设置自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置监听外露
        container.setExposeListenerChannel(true);
        //设置消费端标签策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        //设置消息监听
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody(), "utf-8");
                System.out.println("-----------消费者:" + msg);
            }
        });
        return container;
    }
}

@Test
public void testSendMessage2() throws Exception {
   //1 创建消息
   MessageProperties messageProperties = new MessageProperties();
   messageProperties.setContentType("text/plain");
   Message message = new Message("mq 消息1234".getBytes(), messageProperties);
   
   rabbitTemplate.send("topic001", "spring.abc", message);
   
   rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
   rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}

 

 

 

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐