RabbitMQ实现(并发)多线程处理消息
@RabbitListener默认是单线程监听队列(@RabbitListener(queues = {MqConstants.OA_TAB_REFRESH}, concurrency = "10") 也能实现十个并发)缺点 : 单线程处理消息, 当消息队列有多个任务时消费端监听队列每次只消费一个消息 , 容易引起消息堆积 , 处理效率慢…解决方法 : 多线程处理消息 , 配置mq容器工厂参数 ,
·
- @RabbitListener默认是单线程监听队列(@RabbitListener(queues = {MqConstants.OA_TAB_REFRESH}, concurrency = "10") 也能实现十个并发)
- 缺点 : 单线程处理消息, 当消息队列有多个任务时消费端监听队列每次只消费一个消息 , 容易引起消息堆积 , 处理效率慢…
- 解决方法 : 多线程处理消息 , 配置mq容器工厂参数 , 增加并发处理量 .
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
// 声明交换机
// 声明队列
// 队列绑定交换机,指定routingKey
...
//并发数量
public static final int DEFAULT_CONCURRENT = 10;
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
configurer.configure(factory, connectionFactory);
return factory;
}
}
@Component
public class ReceiveHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveHandler.class);
// 在@RabbitListener注解中指定容器工厂
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL},
containerFactory = "customContainerFactory")
//@RabbitListener(queues = {MqConstants.OA_TAB_REFRESH}, concurrency = "10") 也能实现十个并发
public void send_email(String msg,Message message,Channel channel){
LOGGER.info("receive message is:"+msg);
// work
...
}
}
更多推荐
已为社区贡献7条内容
所有评论(0)