spring-amqp与SpringBoot整合
目的:spring-amqp与SpringBoot整合,实现direct模式的消息接收,exchange、queue、routing_key以及bind关系在spring容器启动时初始化,实现消息的自动接收和处理。一、创建AMQP连接@Beanpublic ConnectionFactory connectionFactory() {CachingConnect...
目的:spring-amqp与SpringBoot整合,实现direct模式的消息接收,exchange、queue、routing_key以及bind关系在spring容器启动时初始化,实现消息的自动接收和处理。
一、创建AMQP连接
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUri(AMQP_URI);
//配置通道缓存的大小(默认值为25)
connectionFactory.setChannelCacheSize(5);
return connectionFactory;
}
connectionFactory.setChannelCacheSize(5);指定保持的channel通道数量,默认值是25。查看CachingConnectionFactory源码如下:
@ManagedResource
public class CachingConnectionFactory extends AbstractConnectionFactory
implements InitializingBean, ShutdownListener, PublisherCallbackChannelConnectionFactory {
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
二、声明AmqpAdmin
使用AmqpAdmin初始化队列、交换器以及路由键。查看AmqpAdmin类源码介绍
@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
spring容器启动时,会根据@Bean注解自动初始化一个AmqpAdmin对象
三、声明一个Direct类型的交换器
@Bean
DirectExchange exchange(AmqpAdmin amqpAdmin) {
DirectExchange exchange = new DirectExchange("direct_exchange", true, false);
amqpAdmin.declareExchange(exchange);
LOGGER.info("declare rabbit exchange [{}];", exchange);
return exchange;
}
Exchange类所在的包目录中,包含FanoutExchange、DirectExchange等其他的交换器类型,查看接口的实现类可以看出,声明是 Interface for all exchanges.
四、初始化消息处理类
对接收的消息做业务处理的类,需要实现ChannelAwareMessageListener接口的onMessage(Message message, Channel channel)方法。AbstractAdaptableMessageListener类介绍
@Bean
MessageReceive weyneEventListener() {
return new MessageReceive();
}
五、创建队列
@Bean
Queue listenerQueueBinding(
AmqpAdmin amqpAdmin) {
Queue queue = new Queue("direct_queue");
amqpAdmin.declareQueue(queue);
LOGGER.info("declare rabbit queue [{}];", queue);
return queue;
}
六、创建一个队列与exchange的绑定关系
@Bean
Binding initBindings(AmqpAdmin amqpAdmin, Queue queue, DirectExchange exchange){
Binding binding = BindingBuilder.bind(queue).to(exchange).with("direct_routing_key");
amqpAdmin.declareBinding(binding);
return binding;
}
创建Binding关系的另一种方法
Binding binding = new Binding(queueName,DestinationType,exchangeName,routing_key,arguments);
eg:
Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), "direct_routing_key", null);
最后
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
ConnectionFactory connectionFactory,
MessageReceive listener,
Queue queue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(queue);
container.setMessageListener(listener);
return container;
}
初始化一个MessageListener,用于监听指定的消息队列,当有消息来时,通过container.setMessageListener(listener);指定自己定义的消息处理类进行消息处理。
完整代码如下:
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
public class WeyneEventAutoConfiguration1 {
private static Logger LOGGER = LoggerFactory.getLogger(WeyneEventAutoConfiguration1.class);
private static final String AMQP_URI = "amqp://guest:guest@localhost:5672";
/**
* @ConditionalOnMissingBean 仅仅在当前上下文中不存在某个对象时,才会实例化
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUri(AMQP_URI);
//配置通道缓存的大小(默认值为25)
connectionFactory.setChannelCacheSize(5);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
DirectExchange exchange(AmqpAdmin amqpAdmin) {
DirectExchange exchange = new DirectExchange("direct_exchange", true, false);
amqpAdmin.declareExchange(exchange);
LOGGER.info("declare rabbit exchange [{}];", exchange);
return exchange;
}
@Bean
MessageReceive weyneEventListener() {
return new MessageReceive();
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
ConnectionFactory connectionFactory,
MessageReceive listener,
Queue queue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(queue);
container.setMessageListener(listener);
return container;
}
@Bean
Queue listenerQueueBinding(
AmqpAdmin amqpAdmin) {
Queue queue = new Queue("direct_queue");
amqpAdmin.declareQueue(queue);
LOGGER.info("declare rabbit queue [{}];", queue);
return queue;
}
@Bean
Binding initBindings(AmqpAdmin amqpAdmin, Queue queue, DirectExchange exchange){
Binding binding = BindingBuilder.bind(queue).to(exchange).with("direct_routing_key");
amqpAdmin.declareBinding(binding);
return binding;
}
}
程序测试建议:先启动以上配置类的项目,初始化队列、交换器,并生成bind关系,然后使用MQ发送类发送消息。
创建一个SpringBoot的Maven项目,引入spring-rabbit包
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
目录结构
MQ发送类
public class DirectSender {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String []args) throws IOException, TimeoutException {
//create rabbit connection factory
ConnectionFactory factory = new ConnectionFactory();
//crate a connect to rabbit server
Connection connection = factory.newConnection();
//create channel
Channel channel = connection.createChannel();
String message = "this is a direct message";
//send message to rabbit exchange
channel.basicPublish(EXCHANGE_NAME, "direct_routing_key", null, ("message2:"+message).getBytes());
channel.close();
factory.clone();
}
}
控制台显示
更多推荐
所有评论(0)