• 因为是分布式微服务项目,所以发送方在一个微服务,接收方在另外的一个微服务,在发送方,导入RabbitMQ依赖包
 <!--RabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 在yml配置RabbitMQ的ip端口号名字密码等
server:
  port: 80
#===mq start===
spring:
  application:
    name: course-server #服务名
  rabbitmq:
    host: 192.168.0.145
    port: 5672
    username: yz
    password: cgm888666
    addresses: 192.168.0.145
    listener:
      direct:
        acknowledge-mode: manual #手动签收
    template:
      receive-timeout: 30000
      reply-timeout: 30000
    virtual-host: /
#===mq end===
  • 写配置类:创建队列交换机,并将队列绑定到交换机
  • 注意打上注解让spring扫描到配置,还有导包正确
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    //三个队列名字  短信 邮件 站内信
    //短信
    private static final String QUEUE_SMS = "queue_sms";
    //邮件
    private static final String QUEUE_EMAIL = "queue_email";
    //站内信
    private static final String QUEUE_SYSTEM = "queue_system";
    //交换机名称
    public static final String EXCHANGE_TOPIC = "exchange_topic";

    //定义交换机
    @Bean(EXCHANGE_TOPIC)
    public Exchange exchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC).durable(true).build();
    }

    //定义邮件队列
    @Bean(QUEUE_EMAIL)
    public Queue queueEmail() {
        return new Queue(QUEUE_EMAIL, true);
    }

    //定义短信队列
    @Bean(QUEUE_SMS)
    public Queue queueSMS() {
        return new Queue(QUEUE_SMS, true);
    }

    //定义队列
    @Bean(QUEUE_SYSTEM)
    public Queue queueSystem() {
        return new Queue(QUEUE_SYSTEM, true);
    }
    //将队列绑定到交换机
    @Bean
    public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("message_Email").noargs();
    }
    @Bean
    public Binding bindingSms(@Qualifier(QUEUE_SMS) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("message_Sms").noargs();
    }
    @Bean
    public Binding bindingSystem(@Qualifier(QUEUE_SYSTEM) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("message_System").noargs();
    }


}

  • 发送消息:然后在课程服务层,课程上线以后通知用户(不同的业务需求,可以在不同的情况下去通知用户)
/**
     * 课程上线
     * @param id 课程id
     */
    @Override
    public void onLineCourse(Long id) {
        //根据id查询课程
        Course course = baseMapper.selectById(id);
        //判断课程是否是下线状态  如果不是就直接结束
        if(course.getStatus()==Course.ONLINE_STATUS||course.getStatus()==null){
            logger.error("课程已经上线!");
            return;
        }
        //修改数据库课程状态为上线
        course.setStatus(Course.ONLINE_STATUS);
        baseMapper.updateById(course);
        //存到es
        //写es控制层和feign接口
        CourseDetail courseDetail = courseDetailMapper.selectById(id);
        CourseMarket courseMarket = courseMarketMapper.selectById(id);
        //将course装转成doc对象
        CourseDoc courseDoc=new CourseDoc();
        courseDoc.setSearchField(course.getName()+courseDetail.getDescription());
        BeanUtils.copyProperties(course,courseDoc);
        BeanUtils.copyProperties(courseDetail,courseDoc);
        BeanUtils.copyProperties(courseMarket,courseDoc);
        courseDoc.setPic(fastDfsUrl+course.getPic());
        System.out.println(courseDoc.getPic());
        AjaxResult ajaxResult = commonFeignClient.save(courseDoc);
        System.out.println(courseDetail);
        System.out.println(courseMarket);
        System.out.println(courseDoc);
        if(!ajaxResult.isSuccess()){
            logger.error("存入Es失败!");
        }
        pushMessage(course);
    }

    /**
     * 课程上线成功以后发送消息通知用户
     * @param course
     */
    private void pushMessage(Course course) {
        //发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,
                "message_Email",course.getName());//message_Email:队列绑定在交换机时设置的routingKey,发送的消息内容:course.getName()
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,
                "message_Sms",course.getName());
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,
                "message_System",course.getName());
    }
  • 在接收方导入RabbitMQ依赖包
 <!--RabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 在yml配置RabbitMQ
spring:
  application:
    name: course-server #服务名
    ##配置rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
    listener:
      simple:
        acknowledge-mode: manual #手动签收
  • 写消息处理类,监听队列,然后对用户发送邮件短信,以及站内信等
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQHandler {
    //短信
    private static final String QUEUE_SMS = "queue_sms";
    //邮件
    private static final String QUEUE_EMAIL = "queue_email";
    //站内信
    private static final String QUEUE_SYSTEM = "queue_system";
    //监听队列接收消息
    @RabbitListener(queues = QUEUE_SMS)
    public void message_Sms(String msg, Message message, Channel channel){
        //获取用户电话给用户发送短信
        System.out.println("发送短信成功:"+msg);
    }
    @RabbitListener(queues = QUEUE_EMAIL)
    public void message_Email(String msg, Message message,Channel channel){
        //获取用户的邮箱 用户id  使用丰富邮箱发送超链接
        System.out.println("邮箱发送成功:"+msg);
    }
    @RabbitListener(queues = QUEUE_SYSTEM)
    public void meassge_System(String msg,Message message,Channel channel){
        //站内消息存入表中,状态未处理
        System.out.println("站内消息发送成功:"+msg);

    }
}


  • 注意点:MQ配置必须正确,@Bean注解 不要忘了。
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐