RabbitMQ 处理unacked消息
原因:消费端由于没有确认消息,导致队列阻塞,这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。RabbitMQ提供了一种QOS(服务质量保证)功能,即在开启手动确认消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。 举例说明:可以理解为在consumer前面加了一个缓冲容器,容
·
原因:消费端由于没有确认消息,导致队列阻塞,这是RabbitMQ
的一种保护机制。防止当消息激增的时候,海量的消息进入consumer
而引发consumer
宕机。
RabbitMQ提供了一种QOS(服务质量保证)功能,即在开启手动确认消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。
举例说明:可以理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。
// 最小并发数
container.setConcurrentConsumers(1);
// 最大并发数
container.setMaxConcurrentConsumers(5);
// 削峰限流:消费端限流,一次处理一条数据。
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
//一次处理的消息数量
container.setPrefetchCount(2);
//参数为0,轮询发送处理的消息,一次一条
container.setPrefetchCount(0);
判断队列的阻塞风险
当ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。
channlCount就是由concurrency,max-concurrency决定的。
min = concurrency * prefetch * 节点数量
max = max-concurrency * prefetch * 节点数量
由此可以的出结论
unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
unacked_msg_count >= min 可能会出现堵塞。
unacked_msg_count >= max 队列一定阻塞。
更多推荐
已为社区贡献1条内容
所有评论(0)