1、前言

SpringBoot整合RabbitMQ非常简单,官方提供了对应的starter,可以自动装配。本文概述了最简单的MQ消息发送和接收。

2、环境安装

首先得有一个RabbitMQ的环境,我是用docker安装的,非常简单。在RabbitMQ管理界面中点击 Queues、Exchanges 栏目,即可看到我们创建好的队列、交换机。

3、pom引入

        <!-- RabbitMQ support: Spring Boot AMQP starter.
             No <version> is given here, so it is presumably inherited from the
             Spring Boot parent/BOM of the project - confirm against the full pom. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

4、application.yml配置

spring:
  rabbitmq:  # RabbitMQ broker connection settings
    host: 47.96.000.000
    port: 5672
    # NOTE(review): a default RabbitMQ broker only lets "guest" connect from localhost;
    # with a remote host a dedicated user is usually required - confirm for your broker.
    username: guest
    password: guest
    # The three listener values below are read explicitly by RabbitMqConfig through
    # Environment#getProperty under exactly these keys, so the keys must not be renamed.
    listener:
      concurrency: 10
      max-concurrency: 20
      prefetch: 5

# Application-specific names; each one is prefixed with the environment tag,
# e.g. mq.exchange.name resolves to "local.mq.exchange".
mq:
  env: local
  exchange:
    name: ${mq.env}.mq.exchange
  routingKey:
    name: ${mq.env}.mq.routingKey
  queue:
    name: ${mq.env}.mq.queue

5、RabbitMqConfig配置,并创建基本的队列、交换机、以及相互绑定

package com.example.mybatiesplus.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * RabbitMQ configuration: listener container factories, the publishing
 * {@link RabbitTemplate}, and the exchange / queue / binding used by the basic
 * message demo.
 *
 * <p>All externally configured values are read with
 * {@code Environment#getRequiredProperty}, so a missing key fails at startup with
 * a message naming the key instead of a {@code NullPointerException} or a
 * {@code null} exchange / queue name.
 *
 * @Author lst
 * @Date 2020-05-22 10:00
 */
@Configuration
@Slf4j
public class RabbitMqConfig {

    @Autowired
    private Environment env;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * Listener container factory for a single consumer: one concurrent consumer,
     * prefetch of one, automatic acknowledgement, JSON message conversion.
     *
     * @return the factory registered as bean {@code singleListenerContainer}
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        // NOTE(review): setTxSize is deprecated in newer Spring AMQP releases in favour of
        // setBatchSize - confirm against the Spring AMQP version this project uses.
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    /**
     * Listener container factory for multiple concurrent consumers. Starts from the
     * Spring Boot defaults applied by the configurer, then overrides the converter,
     * acknowledge mode, and the concurrency / prefetch values taken from
     * {@code spring.rabbitmq.listener.*}.
     *
     * @return the factory registered as bean {@code multiListenerContainer}
     * @throws IllegalStateException if one of the listener properties is not set
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(requiredInt("spring.rabbitmq.listener.concurrency"));
        factory.setMaxConcurrentConsumers(requiredInt("spring.rabbitmq.listener.max-concurrency"));
        factory.setPrefetchCount(requiredInt("spring.rabbitmq.listener.prefetch"));
        return factory;
    }

    /**
     * Builds the template used for publishing, with publisher confirms and returns
     * switched on and both outcomes logged.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Mandatory publishing is what makes unroutable messages come back to the return callback.
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // The callback fires for both outcomes; only ack == true means the broker accepted it.
                if (ack) {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                } else {
                    log.warn("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // A returned message was not routed to any queue, so it is reported as a warning.
                log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }

    /* ------------------------- basic message model: simple messages ------------------------- */

    /**
     * Declares the durable, non-auto-delete direct exchange named by {@code mq.exchange.name}.
     *
     * @author lst
     * @date 2020-5-22 14:08
     * @return the basic direct exchange
     * @throws IllegalStateException if {@code mq.exchange.name} is not set
     */
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(env.getRequiredProperty("mq.exchange.name"), true,false);
    }

    /**
     * Declares the durable queue named by {@code mq.queue.name}.
     *
     * @author lst
     * @date 2020-5-22 14:16
     * @return the basic queue
     * @throws IllegalStateException if {@code mq.queue.name} is not set
     */
    @Bean(name = "basicQueue")
    public Queue basicQueue(){
        return new Queue(env.getRequiredProperty("mq.queue.name"), true);
    }

    /**
     * Binds the basic queue to the basic exchange with the routing key named by
     * {@code mq.routingKey.name}.
     *
     * @author lst
     * @date 2020-5-22 14:16
     * @return the binding between the basic queue and the basic exchange
     * @throws IllegalStateException if {@code mq.routingKey.name} is not set
     */
    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getRequiredProperty("mq.routingKey.name"));
    }

    /** Reads a mandatory integer property, failing with the key name when it is absent. */
    private int requiredInt(String key) {
        return env.getRequiredProperty(key, Integer.class);
    }

}

6、MQ的demo编写

RabbitController控制层
package com.example.mybatiesplus.controller;

import com.example.mybatiesplus.entity.User;
import com.example.mybatiesplus.result.BaseResponse;
import com.example.mybatiesplus.result.ResultGenerator;
import com.example.mybatiesplus.service.RabbitServicel;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * REST entry points under {@code /rabbit} that hand incoming payloads to
 * {@link RabbitServicel} for publishing to the message queue.
 *
 * @Author lst
 * @Date 2020-05-22 16:00
 */
@Api(value = "RabbitController", tags = "MQ消息发送")
@RequestMapping("/rabbit")
@RestController
public class RabbitController {

    @Autowired
    private RabbitServicel rabbitServicel;

    /**
     * Publishes a plain text message.
     *
     * @author lst
     * @date 2020-5-22 15:56
     * @param message the text to publish, taken from the required {@code message} request parameter
     * @return the success response produced by {@code ResultGenerator}
     */
    @ApiOperation(value = "通过MQ发送简单消息", notes = "通过MQ发送简单消息", produces = "application/json")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "message", value = "消息", required = true, dataType = "string", paramType = "query")
    })
    @PostMapping(value = "/message/send", produces = "application/json")
    public BaseResponse sendMsg(@RequestParam("message") String message) {
        rabbitServicel.sendMsg(message);
        return success();
    }

    /**
     * Publishes a {@code User} object taken from the request body.
     *
     * @author lst
     * @date 2020-5-22 15:56
     * @param user the user payload to publish
     * @return the success response produced by {@code ResultGenerator}
     */
    @ApiOperation(value = "通过MQ发送对象消息", notes = "通过MQ发送对象消息", produces = "application/json")
    @PostMapping(value = "/message/send-object", produces = "application/json")
    public BaseResponse sendObjectMsg(@RequestBody User user) {
        rabbitServicel.sendObjectMsg(user);
        return success();
    }

    /** Builds the success response shared by both endpoints. */
    private BaseResponse success() {
        return ResultGenerator.genSuccessResult("请求成功");
    }
}
RabbitServicelmpl实现层
package com.example.mybatiesplus.service.impl;

import com.example.mybatiesplus.entity.User;
import com.example.mybatiesplus.exceptionhandler.BaseException;
import com.example.mybatiesplus.exceptionhandler.BaseExceptionEnum;
import com.example.mybatiesplus.service.RabbitServicel;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

/**
 * @DESCRIPTION MQ消息发送
 * @Author lst
 * @Date 2020-05-22 16:00
 */
@Service
@Slf4j
public class RabbitServicelmpl implements RabbitServicel {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private Environment env;

    /**
      * 通过MQ发送简单消息
      * @author lst
      * @date 2020-5-22 15:56
      * @param message 消息
      * @return void
     */
    @Override
    public void sendMsg(String message) {
        try {
            log.info("待发送的消息: {} ",message);
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setExchange(env.getProperty("mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("mq.routingKey.name"));
            //Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
            rabbitTemplate.convertAndSend(message);
        }catch (Exception e) {
            log.error("发送简单消息发生异常: ", e.getMessage());
            throw new BaseException(BaseExceptionEnum.SEND_MSG_ERROR);
        }
    }

    /**
     * 通过MQ发送对象消息
     * @author lst
     * @date 2020-5-22 15:56
     * @param user 用户消息
     * @return void
     */
    @Override
    public void sendObjectMsg(User user) {
        try {
            log.info("待发送的消息: {} ",user.toString());
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setExchange(env.getProperty("mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("mq.routingKey.name"));
            //Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
            rabbitTemplate.convertAndSend(user);
        }catch (Exception e) {
            log.error("发送对象消息发生异常: ", e.getMessage());
            throw new BaseException(BaseExceptionEnum.SEND_MSG_ERROR);
        }
    }

    /**
      * 监听消费消息
      * @author lst
      * @date 2020-5-22 15:58
      * @param message
      * @return void
     */
    @RabbitListener(queues = "${mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage( Object message){
        try {
            //TODO:接收String
            //String result = new String(message,"UTF-8");
           
            log.info("接收String消息: {} ",message);
        }catch (Exception e){
            log.error("监听消费消息 发生异常: ",e.getMessage());
        }
    }
}

监听消费我是放在了实现层,可以单独抽离出来的。

7、通过swagger-bootstrap-ui测试

1.1 首先将监听代码注释掉,测试发送的消息是否到达了MQ服务。

从队列和后台日志可以看出消息已经发送到MQ的服务上,由于我把监听消息的代码注释了,所以没有被消费。那么我们放开监听消息的代码。

从上面两个图可以看出,当监听代码放开,服务启动后,就将队列中的消息消费了。

 

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐