springboot整合rabbitmq(发送基本和对象数据)
1、前言:SpringBoot整合RabbitMQ非常简单,官网有对应的starter,可以自动装配。本文概述了最简单的MQ消息发送和接收。2、环境安装:首先得有一个RabbitMQ的环境,我使用了docker安装,非常简单。点击 Queues、Exchanges 栏目,即可看到我们创建好的队列、交换机。3、pom引入:<!--RabbitMQ--><dependency>…
·
1、前言
SpringBoot整合RabbitMQ非常简单,官网有对应的starter,可以自动装配。本文概述了最简单的MQ消息发送和接收。
2、环境安装
首先得有一个RabbitMQ的环境,我使用了docker安装,非常简单。在RabbitMQ管理界面点击 Queues、Exchanges 栏目,即可看到我们创建好的队列、交换机。
3、pom引入
<!-- RabbitMQ: Spring Boot AMQP starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4、application.yml配置
spring:
  rabbitmq: # RabbitMQ connection settings
    host: 47.96.000.000
    port: 5672
    username: guest
    password: guest
    # Read by RabbitMqConfig#multiListenerContainer through Environment#getProperty
    listener:
      concurrency: 10
      max-concurrency: 20
      prefetch: 5
# Names of the demo exchange / routing key / queue, prefixed with the environment name
mq:
  env: local
  exchange:
    name: ${mq.env}.mq.exchange
  routingKey:
    name: ${mq.env}.mq.routingKey
  queue:
    name: ${mq.env}.mq.queue
5、RabbitMqConfig配置,并创建基本的队列、交换机、以及相互绑定
package com.example.mybatiesplus.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* @DESCRIPTION RabbitMQConfig插件配置
* @Author lst
* @Date 2020-05-22 10:00
*/
@Configuration
@Slf4j
public class RabbitMqConfig {
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
/**
* 多个消费者
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",Integer.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",Integer.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",Integer.class));
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
/****************************************基本信息模型构建-发送简单消息***********************************************/
/**
* 创建基本信息交换机
* @author lst
* @date 2020-5-22 14:08
* @param
* @return org.springframework.amqp.core.DirectExchange
*/
@Bean
public DirectExchange basicExchange(){
return new DirectExchange(env.getProperty("mq.exchange.name"), true,false);
}
/**
* 创建基本信息队列
* @author lst
* @date 2020-5-22 14:16
* @param
* @return org.springframework.amqp.core.Queue
*/
@Bean(name = "basicQueue")
public Queue basicQueue(){
return new Queue(env.getProperty("mq.queue.name"), true);
}
/**
* 基本信息交换机和基本信息队列绑定
* @author lst
* @date 2020-5-22 14:16
* @param
* @return org.springframework.amqp.core.Binding
*/
@Bean
public Binding basicBinding(){
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.routingKey.name"));
}
}
6、MQ的demo编写
RabbitController控制层
package com.example.mybatiesplus.controller;
import com.example.mybatiesplus.entity.User;
import com.example.mybatiesplus.result.BaseResponse;
import com.example.mybatiesplus.result.ResultGenerator;
import com.example.mybatiesplus.service.RabbitServicel;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
 * REST entry points that forward incoming payloads to the RabbitMQ sending service.
 *
 * @author lst
 * @since 2020-05-22
 */
@RestController
@RequestMapping("/rabbit")
@Api(value = "RabbitController", tags = "MQ消息发送")
public class RabbitController {

    @Autowired
    private RabbitServicel rabbitServicel;

    /**
     * Sends a plain text message through MQ.
     *
     * @param message the text to send
     * @return the success response
     */
    @PostMapping(value = "/message/send", produces = "application/json")
    @ApiOperation(value = "通过MQ发送简单消息", notes = "通过MQ发送简单消息", produces = "application/json")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "query", dataType = "string", name = "message", required = true, value = "消息")
    })
    public BaseResponse sendMsg(@RequestParam(value = "message", required = true) String message) {
        rabbitServicel.sendMsg(message);
        return success();
    }

    /**
     * Sends a user object through MQ.
     *
     * @param user the user payload taken from the request body
     * @return the success response
     */
    @PostMapping(value = "/message/send-object", produces = "application/json")
    @ApiOperation(value = "通过MQ发送对象消息", notes = "通过MQ发送对象消息", produces = "application/json")
    public BaseResponse sendObjectMsg(@RequestBody User user) {
        rabbitServicel.sendObjectMsg(user);
        return success();
    }

    /** Builds the success response shared by both endpoints. */
    private BaseResponse success() {
        return ResultGenerator.genSuccessResult("请求成功");
    }
}
RabbitServicelmpl实现层
package com.example.mybatiesplus.service.impl;
import com.example.mybatiesplus.entity.User;
import com.example.mybatiesplus.exceptionhandler.BaseException;
import com.example.mybatiesplus.exceptionhandler.BaseExceptionEnum;
import com.example.mybatiesplus.service.RabbitServicel;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
/**
* @DESCRIPTION MQ消息发送
* @Author lst
* @Date 2020-05-22 16:00
*/
@Service
@Slf4j
public class RabbitServicelmpl implements RabbitServicel {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private Environment env;
/**
* 通过MQ发送简单消息
* @author lst
* @date 2020-5-22 15:56
* @param message 消息
* @return void
*/
@Override
public void sendMsg(String message) {
try {
log.info("待发送的消息: {} ",message);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.routingKey.name"));
//Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
rabbitTemplate.convertAndSend(message);
}catch (Exception e) {
log.error("发送简单消息发生异常: ", e.getMessage());
throw new BaseException(BaseExceptionEnum.SEND_MSG_ERROR);
}
}
/**
* 通过MQ发送对象消息
* @author lst
* @date 2020-5-22 15:56
* @param user 用户消息
* @return void
*/
@Override
public void sendObjectMsg(User user) {
try {
log.info("待发送的消息: {} ",user.toString());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.routingKey.name"));
//Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
rabbitTemplate.convertAndSend(user);
}catch (Exception e) {
log.error("发送对象消息发生异常: ", e.getMessage());
throw new BaseException(BaseExceptionEnum.SEND_MSG_ERROR);
}
}
/**
* 监听消费消息
* @author lst
* @date 2020-5-22 15:58
* @param message
* @return void
*/
@RabbitListener(queues = "${mq.queue.name}",containerFactory = "singleListenerContainer")
public void consumeMessage( Object message){
try {
//TODO:接收String
//String result = new String(message,"UTF-8");
log.info("接收String消息: {} ",message);
}catch (Exception e){
log.error("监听消费消息 发生异常: ",e.getMessage());
}
}
}
监听消费我是放在了实现层,可以单独抽离出来的。
7、通过swagger-bootstrap-ui测试
1.1 首先将监听代码注释掉,测试发送的消息是否到达了MQ服务。
从队列和后台日志可以看出消息已经发送到MQ服务上,由于我把监听消息的代码注释了,所以没有被消费。那么我们放开监听消息的代码。
从上面两个图可以看出,当监听代码放开,服务启动后,就将队列中的消息消费了。
更多推荐
已为社区贡献7条内容
所有评论(0)