监听器并发、专属消费者、监听容器队列、调试

监听器并发

    在默认的情况下,监听容器将启动一个消费者从队列中接收消息。
    在前一章节的表格中,你看到一部份属性用于控制并发。其中最简单的一个是concurrentConsumers,它简单的创建固定数目的消费者,并发的处理消息。
    在1.3.0版本之前,这是唯一的一个设置,并且必须停止容器来改变这一设置。
    自从1.3.0版本以来,你可以动态的设置concurrentConsumers这个属性。如果在改变的时候容器正在运行,消费者会被添加到或者移除。
    另外添加了一个新的属性:maxConcurrentConsumers。容器会根据负载来动态的调整并发数目。这项工作需要另外的四个属性来协同完成: consecutiveActiveTrigger,  startConsumerMinInterval, consecutiveIdleTrigger, stopConsumerMinInterval。
     根据默认设置增加消费者工作如下:
如果 maxConcurrentConsumers数目没有达到,并且目前已经活动的消费者已经连续运行了10个循环并且距离最后一个消费者启动已经超过10秒钟,在这种情况下,一个新的消费者将被启动。一个消费者在txSize*receiveTimeout毫秒内至少接收到一条消息,就被认为是活跃的。
   根据默认设置减少消费者工作如下:
如果多于 concurrentConsumers个消费者在运行,如果一个消费者连续10次被探测空闲,并且最后上一个消费者已经停止了60秒,那么一个消费者将被停掉。超时时间依赖于receiveTimeout和txSize属性。一个消费者被定位空闲,如果他在txSize*receiveTimeout毫秒内没有接收到任何消息。根据默认参数计算,如果一个消费者空闲1*4*10=40秒,那么将停止消费者。
    注意:
实践中,如果如果整个容器空闲了一段时间,消费者才会被停止。因为消费者要在所有的消费者中分担工作。

专属消费者

    自从1.3版本开始,监听容器可以配置一个专属消费者。这样将阻止其他的容器从队列中消费,直到当前的消费者被取消。这个容器的concurrency参数必须设置为1。
    当使用专属消费者的时候,其他容器视图根据recoveryInterval间隔定期的尝试从队列中消费,如果尝试失败,将记录警告信息。

监听容器队列

    自从1.3版本以来,有很大改进在一个监听容器中处理多个队列。
    这个容器必须被配置至少监听一个队列,这是先前的情况。这个时候,队列可以在运行时进行添加和移除。当预定数目的消息被处理之后,容器会取消并重新创建消费者。参见方法 addQueues, addQueueNames,  removeQueuesand ,removeQueueNames。当移除队列时,容器中至少要有一个队列。
    如果任何一个队列可以获得,那么消费者就会启动,而先前是容器将会停止,如果任何一个队列不可获得。而现在是如果所有队列都不可获得,容器将停止。如果不是所有的队列都不可获得,那么容器将每个60秒钟视图声明丢失的队列。
    同样,如果消费者从消息代理中获得取消(例如队列被删除),那么消费者试图恢复,恢复的消费者将继续处理来自于其它配置的队列。先前,队列的取消将导致整个消费者的取消,最终由于丢失队列整个容器都会停掉。
    如果你想永久的删除一个队列,你应该在删除队列之前更新容器,避免之后从这个队列中消费。

调试

    Spring AMQP提供了扩展级别的日志,尤其是在调试级别。
    如果你想监控应用和代理之间的AMQP协议,你可以使用工具,例如WireShark,他有一个插件来解码协议。或者RabbitMQ的java客户端提供了Tracer类。当你运行main函数时,它默认监听5673端口,并且连接到本地的5672端口。运行它,并且配置一个本地连接端口为5673.它将在本地控制台上显示解码的协议。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐