目录

概念

为什么要使用mq(业务场景)

1、异步

2、应用解耦

3、流量削峰

安装

准备工作

挂载目录

启动脚本

脚本内容

脚本赋予权限

启动脚本

延迟队列

发短信

依赖

配置

nacos中的配置

定义一个常量

生产者

消费者

防止可重复消费


概念

RabbitMQ是一个开源的、高可靠性的消息中间件,它实现了高级消息队列协议(AMQP)并提供了可靠的消息传递机制。它主要用于分布式系统之间的异步通信和解耦,广泛应用于微服务架构、任务队列、日志处理、即时通信等场景。

RabbitMQ的核心概念包括以下几个部分:

1. Producer(生产者):将消息发布到RabbitMQ的应用程序。

2. Queue(队列):存储消息的地方,在RabbitMQ中以先进先出(FIFO)的方式进行消息的存储和分发。

3. Exchange(交换机):接收生产者发送的消息,并将消息路由到一个或多个队列。

4. Binding(绑定):将交换机和队列关联起来的规则,用于确定消息从交换机到队列的路由策略。

5. Consumer(消费者):从队列中获取消息,并进行处理的应用程序。

RabbitMQ的工作流程如下:

1. 生产者通过连接到RabbitMQ的Broker,将消息发送到指定的交换机。

2. 交换机根据绑定关系将消息路由到一个或多个队列。

3. 队列中的消费者从队列中获取消息,并进行相应的处理。

RabbitMQ具有以下特点:

l 可靠性:RabbitMQ使用持久化机制,可确保消息不会丢失,并可以在宕机后恢复。

l 灵活的路由:通过交换机和绑定规则,可以实现灵活的消息路由策略。

l 高可扩展性:RabbitMQ可以通过设置多个队列和消费者来实现高吞吐量和水平扩展。

l 消息确认机制:消费者可以通过发送确认消息来告知RabbitMQ已成功处理了某条消息,以确保消息不会重复处理。

l 可视化管理界面:RabbitMQ提供了一个易于使用的Web管理界面,用于监控和管理消息队列。

总之,RabbitMQ是一个功能强大、可靠性高的消息中间件,提供了丰富的特性和灵活的配置选项,使得开发者能够轻松地构建可靠的分布式应用系统。

为什么要使用mq(业务场景)

1、异步
2、应用解耦

3、流量削峰

安装

准备工作
#打开docker目录
[root@zh]# cd usr/local/docker/

#创建RabbitMQ文件
[root@zh docker]# mkdir rabbitmq

#打开RabbitMQ文件
[root@zh docker]# cd rabbitmq/
挂载目录
#创建挂载目录
[root@zh rabbitmq]# mkdir data
启动脚本
#编写脚本
[root@zh rabbitmq]# vim startRabbitMq.sh
脚本内容
docker run -d \
-v /usr/local/docker/rabbitmq/data:/var/lib/rabbitmq \
-p 5672:5672 -p 15672:15672 \
--name rabbitmq \
--restart=always \
--hostname myRabbit rabbitmq:3.9.13-management
脚本赋予权限
[root@zh rabbitmq]# chmod -R 700 startRabbitMq.sh
启动脚本
[root@zh rabbitmq]# ./startRabbitMq.sh
Unable to find image 'rabbitmq:3.9.13-management' locally
3.9.13: Pulling from library/rabbitmq
4d32b49e2995: Pull complete 
2108a18330ce: Pull complete 
5c6af9d52173: Pull complete 
0f88690b6c7c: Pull complete 
9531e199a7d9: Pull complete 
efaba55aede6: Pull complete 
41502a4f43bc: Pull complete 
11b60d9df2ff: Pull complete 
ac0763dc13e5: Pull complete 
Digest: sha256:f5c8c7fd99e4c88527276df319556fdcb56e4d289614c5fefda5ee8d17c5ea89
Status: Downloaded newer image for rabbitmq:3.9.13-management
ee7636e3d0baa7df4d5490c99616fef006bceedb08c46d7a2fea5650a6f01429

交换机 接收消息

发布订阅 绑定上就行

路由模式 包含

主题 通配符

消息转换器

第一种方式:将对象实现序列化接

第二种方式:将对象序列化成JSON字符串

第三种方式:自定义消息转换器

延迟队列

发短信

依赖
  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
配置
  rabbitmq:
    username: guest
    password: guest
    virtualHost: /
    port: 5672
    host: 192.168.109.129
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
    publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
    publisher-returns: true  #确认消息已发送到队列(Queue)
nacos中的配置
rabbitmq:
    host: 192.168.109.129
    port: 5672
    # 开启发送端消息抵达Broker确认
    publisher-confirms: true
    # 开启发送端消息抵达Queue确认
    publisher-returns: true
    # 只要消息抵达Queue,就会异步发送优先回调 returnfirm
    template:
      mandatory: true
    listener:
      simple:
        # 手动 ack消息,不使用默认的消费端确认
        acknowledge-mode: manual 
定义一个常量
@Data
public class RabbitConstants {
    //发送验证码
    public static final String SEND_CODE_BY_PHONE="send_code_by_phone";
}
生产者

这段代码是为了给消息设置一个唯一的消息ID,使用了UUID.randomUUID().toString()方法生成一个随机的UUID作为消息ID

UUID(Universally Unique Identifier)是一个标准化的128位字符串格式,用于唯一地标识信息。调用UUID.randomUUID()方法可以生成一个新的随机UUID。这个UUID在很大程度上保证了唯一性,即使在分布式系统中也能够生成不重复的ID。

通过将生成的UUID转换为字符串,然后赋值给消息的消息ID属性,可以确保每个消息在发送过程中具有唯一的标识符。

请注意,该代码片段只是给消息设置一个唯一的ID,并不涉及具体的消息传递和处理逻辑。在实际使用中,您可能还需要根据业务需求,对消息进行更多的处理和操作。

public Result sendCode(String phone) {
        if (!valIdPhone(phone)){
            return Result.error("手机格式错误!");
        }
        Result<SysUser> byPhone = sysUserFeignService.findIdPhone(phone);
        SysUser sysUser = byPhone.getData();
        if (sysUser==null){
            return Result.error("请注册!");
        }
        String code = RandomUtil.randomNumbers(4);
        redisTemplate.opsForValue().set(phone,code,5, TimeUnit.MINUTES);
        /**
         * rabbitmq发送消息
         */
        rabbitTemplate.convertAndSend(RabbitConstants.SEND_CODE_BY_PHONE,"",message -> {
            //设置消息ID
            message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replaceAll("-",""));
            return message;
        });
        return Result.success(code,"验证码发送成功!");
    }
消费者

防止可重复消费
//消费者
@AllArgsConstructor
@Log4j2
@Component
public class SmsConfig {

    private final RedisTemplate<String,String> redisTemplate;

    @RabbitListener(queuesToDeclare = {@Queue(name = RabbitConstants.SEND_CODE_BY_PHONE)})
    public void smsConfig(String msg, Message message, Channel channel){
        log.info("消费者接收到消息,队列名称:{},消息内容:{},消费中........",RabbitConstants.SEND_CODE_BY_PHONE,msg);
        //唯一值
        String messageId = message.getMessageProperties().getMessageId();
        try {
            //判断消息是否被消费过
            Long add = redisTemplate.opsForSet().add(RabbitConstants.SEND_CODE_BY_PHONE, messageId);
            if (add == 1){
                //反序列化
                LoginRequest loginRequest = JSONObject.parseObject(msg, LoginRequest.class);
                //发送短信
                String sendSms = TelSmsUtils.sendSms(loginRequest.getPhone(), "SMS1001", new HashMap<String, String>() {{
                    put("code", loginRequest.getCode());
                }});
                //发送失败,重新发送
                //反序列化
                SendSmsResponseBody sendSmsResponseBody = JSONObject.parseObject(sendSms, SendSmsResponseBody.class);
                if (!sendSmsResponseBody.getCode().equals("OK")){
                    //发送失败了
                    //重新发送
                    TelSmsUtils.sendSms(loginRequest.getPhone(), "SMS1001", new HashMap<String, String>() {{
                        put("code", loginRequest.getCode());
                    }});
                }
                //手动确认消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                log.info("消费者消费消息成功,队列名称:{},消息内容:{}",RabbitConstants.SEND_CODE_BY_PHONE,msg);
            }else {
                log.info("重复消费消息,队列名称:{},消息内容:{}",RabbitConstants.SEND_CODE_BY_PHONE,msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
            //删除redis
            redisTemplate.opsForSet().remove(RabbitConstants.SEND_CODE_BY_PHONE, messageId);
            //消息出现异常,需要回退消息
            log.error("消息出现异常,队列名称:{},消息内容:{},异常信息是:{}",RabbitConstants.SEND_CODE_BY_PHONE,msg,e);
            //回退
            try {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }
}
Logo

欢迎加入西安开发者社区!我们致力于为西安地区的开发者提供学习、合作和成长的机会。参与我们的活动,与专家分享最新技术趋势,解决挑战,探索创新。加入我们,共同打造技术社区!

更多推荐