原因:消费端由于没有确认消息,导致队列阻塞,这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。

       RabbitMQ提供了一种QOS(服务质量保证)功能,即在开启手动确认消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。

  举例说明:可以理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。

// 最小并发数
container.setConcurrentConsumers(1);
// 最大并发数
container.setMaxConcurrentConsumers(5);
// 削峰限流:消费端限流,一次处理一条数据。
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
//一次处理的消息数量
container.setPrefetchCount(2);

//参数为0,轮询发送处理的消息,一次一条 
container.setPrefetchCount(0);
			

判断队列的阻塞风险

当ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。

channlCount就是由concurrency,max-concurrency决定的。

min = concurrency * prefetch * 节点数量
max = max-concurrency * prefetch * 节点数量
由此可以的出结论
unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
unacked_msg_count >= min 可能会出现堵塞。
unacked_msg_count >= max 队列一定阻塞。


 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐