目的:spring-amqp与SpringBoot整合,实现direct模式的消息接收,exchange、queue、routing_key以及bind关系在spring容器启动时初始化,实现消息的自动接收和处理。

一、创建AMQP连接

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setUri(AMQP_URI);
        //配置通道缓存的大小(默认值为25)
        connectionFactory.setChannelCacheSize(5);
        return connectionFactory;
    }

connectionFactory.setChannelCacheSize(5);指定保持的channel通道数量,默认值是25。查看CachingConnectionFactory源码如下:

@ManagedResource
public class CachingConnectionFactory extends AbstractConnectionFactory
		implements InitializingBean, ShutdownListener, PublisherCallbackChannelConnectionFactory {

	private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;

二、声明AmqpAdmin

使用AmqpAdmin初始化队列、交换器以及路由键。查看AmqpAdmin类源码介绍

@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
   return new RabbitAdmin(connectionFactory);
}

spring容器启动时,会根据@Bean注解自动初始化一个AmqpAdmin对象

三、声明一个Direct类型的交换器

    @Bean
    DirectExchange exchange(AmqpAdmin amqpAdmin) {
        DirectExchange exchange = new DirectExchange("direct_exchange", true, false);
        amqpAdmin.declareExchange(exchange);
        LOGGER.info("declare rabbit exchange [{}];", exchange);
        return exchange;
    }

Exchange类所在的包目录中,包含FanoutExchange、DirectExchange等其他的交换器类型,查看接口的实现类可以看出,声明是 Interface for all exchanges.

四、初始化消息处理类

对接收的消息做业务处理的类,需要实现ChannelAwareMessageListener接口的onMessage(Message message, Channel channel)方法。AbstractAdaptableMessageListener类介绍

@Bean
    MessageReceive weyneEventListener() {
        return new MessageReceive();
    }

五、创建队列

    @Bean
    Queue listenerQueueBinding(
            AmqpAdmin amqpAdmin) {
        Queue queue = new Queue("direct_queue");
        amqpAdmin.declareQueue(queue);
        LOGGER.info("declare rabbit queue [{}];", queue);
        return queue;
    }

六、创建一个队列与exchange的绑定关系

    @Bean
    Binding initBindings(AmqpAdmin amqpAdmin, Queue queue, DirectExchange exchange){
        Binding binding = BindingBuilder.bind(queue).to(exchange).with("direct_routing_key");
        amqpAdmin.declareBinding(binding);
        return binding;
    }

创建Binding关系的另一种方法

Binding binding = new Binding(queueName,DestinationType,exchangeName,routing_key,arguments);

eg:

Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), "direct_routing_key", null);

Binding与BindingBuilder源码解读

最后

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            ConnectionFactory connectionFactory,
            MessageReceive listener,
            Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(listener);
        return container;
    }

初始化一个MessageListener,用于监听指定的消息队列,当有消息来时,通过container.setMessageListener(listener);指定自己定义的消息处理类进行消息处理。

完整代码如下:

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
public class WeyneEventAutoConfiguration1 {

    private static Logger LOGGER = LoggerFactory.getLogger(WeyneEventAutoConfiguration1.class);
    private static final String AMQP_URI = "amqp://guest:guest@localhost:5672";

    /**
     * @ConditionalOnMissingBean 仅仅在当前上下文中不存在某个对象时,才会实例化
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setUri(AMQP_URI);
        //配置通道缓存的大小(默认值为25)
        connectionFactory.setChannelCacheSize(5);
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    DirectExchange exchange(AmqpAdmin amqpAdmin) {
        DirectExchange exchange = new DirectExchange("direct_exchange", true, false);
        amqpAdmin.declareExchange(exchange);
        LOGGER.info("declare rabbit exchange [{}];", exchange);
        return exchange;
    }

    @Bean
    MessageReceive weyneEventListener() {
        return new MessageReceive();
    }

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            ConnectionFactory connectionFactory,
            MessageReceive listener,
            Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(listener);
        return container;
    }

    @Bean
    Queue listenerQueueBinding(
            AmqpAdmin amqpAdmin) {
        Queue queue = new Queue("direct_queue");
        amqpAdmin.declareQueue(queue);
        LOGGER.info("declare rabbit queue [{}];", queue);
        return queue;
    }

    @Bean
    Binding initBindings(AmqpAdmin amqpAdmin, Queue queue, DirectExchange exchange){
        Binding binding = BindingBuilder.bind(queue).to(exchange).with("direct_routing_key");
        amqpAdmin.declareBinding(binding);
        return binding;
    }
}

程序测试建议:先启动以上配置类的项目,初始化队列、交换器,并生成bind关系,然后使用MQ发送类发送消息。

创建一个SpringBoot的Maven项目,引入spring-rabbit包

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.0.RELEASE</version>
</dependency>

目录结构

MQ发送类

public class DirectSender {
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String []args) throws IOException, TimeoutException {
        //create rabbit connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //crate a connect to rabbit server
        Connection connection = factory.newConnection();
        //create channel
        Channel channel = connection.createChannel();
        String message = "this is a direct message";
        //send message to rabbit exchange
        channel.basicPublish(EXCHANGE_NAME, "direct_routing_key", null, ("message2:"+message).getBytes());
        channel.close();
        factory.clone();
    }
}

控制台显示

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐