微服务架构集成RabbitMQ给用户推送消息(发送短信,发送邮件,发送站内信息)
因为是分布式微服务项目所以发送方在一个微服务,接收方在另外的一个微服务在发送方,导入RabbitMQ依赖包<!--RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</arti
·
- 因为是分布式微服务项目,所以发送方在一个微服务,接收方在另外的一个微服务,在发送方,导入RabbitMQ依赖包
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在yml配置RabbitMQ的ip端口号名字密码等
server:
port: 80
#===mq start===
spring:
application:
name: course-server #服务名
rabbitmq:
host: 192.168.0.145
port: 5672
username: yz
password: cgm888666
addresses: 192.168.0.145
listener:
direct:
acknowledge-mode: manual #手动签收
template:
receive-timeout: 30000
reply-timeout: 30000
virtual-host: /
#===mq end===
- 写配置类:创建队列交换机,并将队列绑定到交换机
- 注意打上注解让spring扫描到配置,还有导包正确
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//三个队列名字 短信 邮件 站内信
//短信
private static final String QUEUE_SMS = "queue_sms";
//邮件
private static final String QUEUE_EMAIL = "queue_email";
//站内信
private static final String QUEUE_SYSTEM = "queue_system";
//交换机名称
public static final String EXCHANGE_TOPIC = "exchange_topic";
//定义交换机
@Bean(EXCHANGE_TOPIC)
public Exchange exchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC).durable(true).build();
}
//定义邮件队列
@Bean(QUEUE_EMAIL)
public Queue queueEmail() {
return new Queue(QUEUE_EMAIL, true);
}
//定义短信队列
@Bean(QUEUE_SMS)
public Queue queueSMS() {
return new Queue(QUEUE_SMS, true);
}
//定义队列
@Bean(QUEUE_SYSTEM)
public Queue queueSystem() {
return new Queue(QUEUE_SYSTEM, true);
}
//将队列绑定到交换机
@Bean
public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("message_Email").noargs();
}
@Bean
public Binding bindingSms(@Qualifier(QUEUE_SMS) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("message_Sms").noargs();
}
@Bean
public Binding bindingSystem(@Qualifier(QUEUE_SYSTEM) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("message_System").noargs();
}
}
- 发送消息:然后在课程服务层,课程上线以后通知用户(不同的业务需求,可以在不同的情况下去通知用户)
/**
* 课程上线
* @param id 课程id
*/
@Override
public void onLineCourse(Long id) {
//根据id查询课程
Course course = baseMapper.selectById(id);
//判断课程是否是下线状态 如果不是就直接结束
if(course.getStatus()==Course.ONLINE_STATUS||course.getStatus()==null){
logger.error("课程已经上线!");
return;
}
//修改数据库课程状态为上线
course.setStatus(Course.ONLINE_STATUS);
baseMapper.updateById(course);
//存到es
//写es控制层和feign接口
CourseDetail courseDetail = courseDetailMapper.selectById(id);
CourseMarket courseMarket = courseMarketMapper.selectById(id);
//将course装转成doc对象
CourseDoc courseDoc=new CourseDoc();
courseDoc.setSearchField(course.getName()+courseDetail.getDescription());
BeanUtils.copyProperties(course,courseDoc);
BeanUtils.copyProperties(courseDetail,courseDoc);
BeanUtils.copyProperties(courseMarket,courseDoc);
courseDoc.setPic(fastDfsUrl+course.getPic());
System.out.println(courseDoc.getPic());
AjaxResult ajaxResult = commonFeignClient.save(courseDoc);
System.out.println(courseDetail);
System.out.println(courseMarket);
System.out.println(courseDoc);
if(!ajaxResult.isSuccess()){
logger.error("存入Es失败!");
}
pushMessage(course);
}
/**
* 课程上线成功以后发送消息通知用户
* @param course
*/
private void pushMessage(Course course) {
//发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,
"message_Email",course.getName());//message_Email:队列绑定在交换机时设置的routingKey,发送的消息内容:course.getName()
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,
"message_Sms",course.getName());
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,
"message_System",course.getName());
}
- 在接收方导入RabbitMQ依赖包
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在yml配置RabbitMQ
spring:
application:
name: course-server #服务名
##配置rabbitmq
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
acknowledge-mode: manual #手动签收
- 写消息处理类,监听队列,然后对用户发送邮件短信,以及站内信等
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MQHandler {
//短信
private static final String QUEUE_SMS = "queue_sms";
//邮件
private static final String QUEUE_EMAIL = "queue_email";
//站内信
private static final String QUEUE_SYSTEM = "queue_system";
//监听队列接收消息
@RabbitListener(queues = QUEUE_SMS)
public void message_Sms(String msg, Message message, Channel channel){
//获取用户电话给用户发送短信
System.out.println("发送短信成功:"+msg);
}
@RabbitListener(queues = QUEUE_EMAIL)
public void message_Email(String msg, Message message,Channel channel){
//获取用户的邮箱 用户id 使用丰富邮箱发送超链接
System.out.println("邮箱发送成功:"+msg);
}
@RabbitListener(queues = QUEUE_SYSTEM)
public void meassge_System(String msg,Message message,Channel channel){
//站内消息存入表中,状态未处理
System.out.println("站内消息发送成功:"+msg);
}
}
- 注意点:MQ配置必须正确,@Bean注解 不要忘了。
更多推荐
已为社区贡献2条内容
所有评论(0)