RabbitMQ 简单消息监听容器--SimpleMessageListenerContainer 笔记
这个类非常强大,我们可以对他做很多设置,对于消费者的配置项,这个类都可以满足监听队列(多个队列)、自动启动、自动声明功能可以设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等可以设置消费者数量、最大最小数量、批量消费设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数设置消费者标签生成策略、是否独占模式、消费者属性等设置具体的转换器、消息转换...
·
这个类非常强大,我们可以对他做很多设置,对于消费者的配置项,这个类都可以满足
监听队列(多个队列)、自动启动、自动声明功能
可以设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
可以设置消费者数量、最大最小数量、批量消费
设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的转换器、消息转换器等
很多基于RabbitMQ的自制定化后端管控台在进行动态配置的时候,也是根据这一特性去实现的。
注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态
修改其消费者数量的大小、接收消息的模式等
package com.dwz.spring;
import java.util.UUID;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import com.rabbitmq.client.Channel;
@Configuration
@ComponentScan("com.dwz.spring.*")
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setVirtualHost("/vhost_dwz");
connectionFactory.setUsername("root_dwz");
connectionFactory.setPassword("123456");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
System.err.println("RabbitAdmin启动了。。。");
//设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置)
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
/**
* 针对消费者的配置
* 1.设置交换机的类型
* 2.将队列绑定到交换机
* FanoutExchange:将消息分发到所有绑定的队列,无routingkey的概念
* TopicExchange:多关键字匹配
* HeadersExchange:通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
*/
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true);//队列持久化
}
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
@Bean
public TopicExchange exchange002() {
return new TopicExchange("topic002", true, false);
}
@Bean
public Queue queue002() {
return new Queue("queue002", true);//队列持久化
}
@Bean
public Binding binding002() {
return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
}
@Bean
public TopicExchange exchange003() {
return new TopicExchange("topic003", true, false);
}
@Bean
public Queue queue003() {
return new Queue("queue003", true);//队列持久化
}
@Bean
public Binding binding003() {
return BindingBuilder.bind(queue003()).to(exchange003()).with("mq.*");
}
@Bean
public Queue queue_image() {
return new Queue("image_queue", true);//队列持久化
}
@Bean
public Queue queue_pdf() {
return new Queue("pdf_queue", true);//队列持久化
}
/*
* 简单消息监听容器
*/
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//同时监听多个队列
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//设置当前的消费者数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
//设置是否重回队列
container.setDefaultRequeueRejected(false);
//设置自动签收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置监听外露
container.setExposeListenerChannel(true);
//设置消费端标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//设置消息监听
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody(), "utf-8");
System.out.println("-----------消费者:" + msg);
}
});
return container;
}
}
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
更多推荐
已为社区贡献1条内容
所有评论(0)