RabbitMQ的安装及发送消息
个人的理解(后续还会更新),主要还是靠自己的实践和理解
目录
概念
RabbitMQ是一个开源的、高可靠性的消息中间件,它实现了高级消息队列协议(AMQP)并提供了可靠的消息传递机制。它主要用于分布式系统之间的异步通信和解耦,广泛应用于微服务架构、任务队列、日志处理、即时通信等场景。
RabbitMQ的核心概念包括以下几个部分:
1. Producer(生产者):将消息发布到RabbitMQ的应用程序。
2. Queue(队列):存储消息的地方,在RabbitMQ中以先进先出(FIFO)的方式进行消息的存储和分发。
3. Exchange(交换机):接收生产者发送的消息,并将消息路由到一个或多个队列。
4. Binding(绑定):将交换机和队列关联起来的规则,用于确定消息从交换机到队列的路由策略。
5. Consumer(消费者):从队列中获取消息,并进行处理的应用程序。
RabbitMQ的工作流程如下:
1. 生产者通过连接到RabbitMQ的Broker,将消息发送到指定的交换机。
2. 交换机根据绑定关系将消息路由到一个或多个队列。
3. 队列中的消费者从队列中获取消息,并进行相应的处理。
RabbitMQ具有以下特点:
l 可靠性:RabbitMQ使用持久化机制,可确保消息不会丢失,并可以在宕机后恢复。
l 灵活的路由:通过交换机和绑定规则,可以实现灵活的消息路由策略。
l 高可扩展性:RabbitMQ可以通过设置多个队列和消费者来实现高吞吐量和水平扩展。
l 消息确认机制:消费者可以通过发送确认消息来告知RabbitMQ已成功处理了某条消息,以确保消息不会重复处理。
l 可视化管理界面:RabbitMQ提供了一个易于使用的Web管理界面,用于监控和管理消息队列。
总之,RabbitMQ是一个功能强大、可靠性高的消息中间件,提供了丰富的特性和灵活的配置选项,使得开发者能够轻松地构建可靠的分布式应用系统。
为什么要使用mq(业务场景)
1、异步
2、应用解耦
3、流量削峰
安装
准备工作
#打开docker目录
[root@zh]# cd usr/local/docker/
#创建RabbitMQ文件
[root@zh docker]# mkdir rabbitmq
#打开RabbitMQ文件
[root@zh docker]# cd rabbitmq/
挂载目录
#创建挂载目录
[root@zh rabbitmq]# mkdir data
启动脚本
#编写脚本
[root@zh rabbitmq]# vim startRabbitMq.sh
脚本内容
docker run -d \
-v /usr/local/docker/rabbitmq/data:/var/lib/rabbitmq \
-p 5672:5672 -p 15672:15672 \
--name rabbitmq \
--restart=always \
--hostname myRabbit rabbitmq:3.9.13-management
脚本赋予权限
[root@zh rabbitmq]# chmod -R 700 startRabbitMq.sh
启动脚本
[root@zh rabbitmq]# ./startRabbitMq.sh
Unable to find image 'rabbitmq:3.9.13-management' locally
3.9.13: Pulling from library/rabbitmq
4d32b49e2995: Pull complete
2108a18330ce: Pull complete
5c6af9d52173: Pull complete
0f88690b6c7c: Pull complete
9531e199a7d9: Pull complete
efaba55aede6: Pull complete
41502a4f43bc: Pull complete
11b60d9df2ff: Pull complete
ac0763dc13e5: Pull complete
Digest: sha256:f5c8c7fd99e4c88527276df319556fdcb56e4d289614c5fefda5ee8d17c5ea89
Status: Downloaded newer image for rabbitmq:3.9.13-management
ee7636e3d0baa7df4d5490c99616fef006bceedb08c46d7a2fea5650a6f01429
交换机 接收消息
发布订阅 绑定上就行
路由模式 包含
主题 通配符
消息转换器
第一种方式:将对象实现序列化接
第二种方式:将对象序列化成JSON字符串
第三种方式:自定义消息转换器
延迟队列
发短信
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
rabbitmq:
username: guest
password: guest
virtualHost: /
port: 5672
host: 192.168.109.129
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
nacos中的配置
rabbitmq:
host: 192.168.109.129
port: 5672
# 开启发送端消息抵达Broker确认
publisher-confirms: true
# 开启发送端消息抵达Queue确认
publisher-returns: true
# 只要消息抵达Queue,就会异步发送优先回调 returnfirm
template:
mandatory: true
listener:
simple:
# 手动 ack消息,不使用默认的消费端确认
acknowledge-mode: manual
定义一个常量
@Data
public class RabbitConstants {
//发送验证码
public static final String SEND_CODE_BY_PHONE="send_code_by_phone";
}
生产者
这段代码是为了给消息设置一个唯一的消息ID,使用了UUID.randomUUID().toString()方法生成一个随机的UUID作为消息ID
UUID(Universally Unique Identifier)是一个标准化的128位字符串格式,用于唯一地标识信息。调用UUID.randomUUID()方法可以生成一个新的随机UUID。这个UUID在很大程度上保证了唯一性,即使在分布式系统中也能够生成不重复的ID。
通过将生成的UUID转换为字符串,然后赋值给消息的消息ID属性,可以确保每个消息在发送过程中具有唯一的标识符。
请注意,该代码片段只是给消息设置一个唯一的ID,并不涉及具体的消息传递和处理逻辑。在实际使用中,您可能还需要根据业务需求,对消息进行更多的处理和操作。
public Result sendCode(String phone) {
if (!valIdPhone(phone)){
return Result.error("手机格式错误!");
}
Result<SysUser> byPhone = sysUserFeignService.findIdPhone(phone);
SysUser sysUser = byPhone.getData();
if (sysUser==null){
return Result.error("请注册!");
}
String code = RandomUtil.randomNumbers(4);
redisTemplate.opsForValue().set(phone,code,5, TimeUnit.MINUTES);
/**
* rabbitmq发送消息
*/
rabbitTemplate.convertAndSend(RabbitConstants.SEND_CODE_BY_PHONE,"",message -> {
//设置消息ID
message.getMessageProperties().setMessageId(UUID.randomUUID().toString().replaceAll("-",""));
return message;
});
return Result.success(code,"验证码发送成功!");
}
消费者
防止可重复消费
//消费者
@AllArgsConstructor
@Log4j2
@Component
public class SmsConfig {
private final RedisTemplate<String,String> redisTemplate;
@RabbitListener(queuesToDeclare = {@Queue(name = RabbitConstants.SEND_CODE_BY_PHONE)})
public void smsConfig(String msg, Message message, Channel channel){
log.info("消费者接收到消息,队列名称:{},消息内容:{},消费中........",RabbitConstants.SEND_CODE_BY_PHONE,msg);
//唯一值
String messageId = message.getMessageProperties().getMessageId();
try {
//判断消息是否被消费过
Long add = redisTemplate.opsForSet().add(RabbitConstants.SEND_CODE_BY_PHONE, messageId);
if (add == 1){
//反序列化
LoginRequest loginRequest = JSONObject.parseObject(msg, LoginRequest.class);
//发送短信
String sendSms = TelSmsUtils.sendSms(loginRequest.getPhone(), "SMS1001", new HashMap<String, String>() {{
put("code", loginRequest.getCode());
}});
//发送失败,重新发送
//反序列化
SendSmsResponseBody sendSmsResponseBody = JSONObject.parseObject(sendSms, SendSmsResponseBody.class);
if (!sendSmsResponseBody.getCode().equals("OK")){
//发送失败了
//重新发送
TelSmsUtils.sendSms(loginRequest.getPhone(), "SMS1001", new HashMap<String, String>() {{
put("code", loginRequest.getCode());
}});
}
//手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消费者消费消息成功,队列名称:{},消息内容:{}",RabbitConstants.SEND_CODE_BY_PHONE,msg);
}else {
log.info("重复消费消息,队列名称:{},消息内容:{}",RabbitConstants.SEND_CODE_BY_PHONE,msg);
}
} catch (Exception e) {
e.printStackTrace();
//删除redis
redisTemplate.opsForSet().remove(RabbitConstants.SEND_CODE_BY_PHONE, messageId);
//消息出现异常,需要回退消息
log.error("消息出现异常,队列名称:{},消息内容:{},异常信息是:{}",RabbitConstants.SEND_CODE_BY_PHONE,msg,e);
//回退
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
更多推荐
所有评论(0)