RocketMQ是什么?
RocketMQ介绍一.消息中间件1.应用场景1.异步解耦例:注册 发短信 发邮件的操作,我们以前要先注册,再等着发短信,最后等着发邮件,而使用消息件就直接注册将其封装成一个对象放到消息中间件中,然后由消息中间件去监控发短信与发邮件,然后接着去干其他事情,不用等着2.削峰填谷//请求先到消息中间件然后服务器一点一点的获取中间件中的请求,避免一次性所有请求全部到达服务器3.分布式缓存同步/消息分发/
·
RocketMQ介绍
一.消息中间件
1.应用场景
1.异步解耦
例:注册 发短信 发邮件的操作,我们以前要先注册,再等着发短信,最后等着发邮件,
而使用消息件就直接注册将其封装成一个对象放到消息中间件中,然后由消息中间件去监控发短信与发邮件,
然后接着去干其他事情,不用等着
2.削峰填谷
//请求先到消息中间件 然后服务器一点一点的获取中间件中的请求,避免一次性所有请求全部到达服务器
3.分布式缓存同步/消息分发
//微服务中,每个服务都放到消息中间件中,当一个服务需要调用另一个服务时,直接到消息中间件中获取
未使用消息中间件
使用消息中间件(异步解耦)
削峰填谷
2.消息中间件
1.ActiveMQ
2.KafKa
3.RabbitMQ
4.RocketMQ
消息中间件对比
二.RocketMQ的核心概念
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分
1.名字服务NameServer
主要负责对于源数据的管理,包括了对于Topic和路由信息的管理
1.生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。
2.多个Namesrv实例组成集群,但相互独立,没有信息交换。
注意:Broker向NameServer发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),
会导致一次心跳中,Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败
2.代理服务器Broker Server
消息中转角色,负责存储消息、转发消息
1.Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,
并会定时将Topic信息注册到NameServer
2.代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备
3.生产者Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息
1.一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
2.RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
3.同步和异步方式均需要Broker返回确认信息,单向发送不需要。
4.消费者Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费
Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制
5.消息内容Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题
1.RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
2.系统提供了通过Message ID和Key查询消息的功能。
6.消息主题Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题
是RocketMQ进行消息订阅的基本单位
7.标签Tag
为消息设置的标志,用于同一主题下区分不同类型的消息
1.标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统
2.消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
8.消息队列MessageQueue
主题被划分为一个或多个子主题,即消息队列
对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取
三.发送消息方式
1.同步发送(默认同步)
必须要等消息持久化到磁盘中以后,rocketmq给生产者返回一个消息,持久化完成
2.异步发送
不需要等消息持久化到磁盘中,就可以执行后面的业务逻辑
3.单向发送
一次性发送,把消息发送到消息中间件中,不需要确认返回结果
四.消费模式
1.集群模式(默认模式)
MessageModel.CLUSTERING 多个消费者分担一个消费者的压力, 一个消息只会给一个消费者消费(订单只会支付一次)
//默认集群模式
//设置集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);//不设置默认也可以
2.广播模式
MessageModel.BROADCASTING 需要对同一个消息进行不同处理的时候, 比如对同一个消息, 需要同时发送短信和发送邮件,
一个消息会发送给所有的消费者(同时发送短信,邮箱)
//在消费者端加一个设置广播模式就好
//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
五.消费方式
1.推送消费(push)
//主动将信息推送给消费者
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
2.拉取消费(pul)
//消费者主动broker中获取,前提是broker中要存在生产者已经发送过来的信息,否则报错
关键类
//1.DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer");
//2.PullResult pullResult = consumer.pull(new MessageQueue("pull", "broker-a", 0), "*", 0, 1);
六.延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
//在生产端设置延时等级2, 5秒后发送
message.setDelayTimeLevel(2);
//延迟等级
延时消息的使用限制
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
七.消息过滤
1.Tag标签过滤(默认这个)
生产者
Message message =
new Message("06-filter", "tagC", "hello,rocketmq".getBytes("utf-8"));
消费者
consumer.subscribe("06-filter", "tagA||tagC");
//生产者第二个参数tag与消费者第二个参数相对应就都能匹配,如果消费者使用*就都能接受
2.SQL92过滤
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
//注意: 在使用SQL过滤的时候, 需要在conf中broker.conf配置参数enablePropertyFilter=true
步骤:
1.在生产者中设置条件 message.putUserProperty("age", "20");
2.在消费者中设置匹配条件
consumer.subscribe("06-filter", MessageSelector.bySql("age>18"));
八.使用api实现
生产者
public class Producer {
public static void main(String[] args) throws Exception{
//1 创建一个生产者对象, 并且指定一个生产者组
DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer");
//2 设置名字服务的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3 启动生产者
producer.start();
//4 创建一个消息
Message message = new Message("01-hello", "tagA", "hello,rocketmq".getBytes("utf-8"));
//5 使用生产者发送消息
producer.send(message);
//6 关闭连接
producer.shutdown();
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception{
//创建一个拉取消息的消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode-consumer");
//设置名字地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//绑定消息的主题
consumer.subscribe("01-hello", "*");
//消费者监听处理消息方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费线程:"+Thread.currentThread().getName()+",消息ID:"+msg.getMsgId()+",消息内容:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
}
九.SpringBoot集成RocketMQ
1>同步/异步/一次性消息
1.同步消息(生产者)
RestController
public class SendMsgController {
@Autowired
private RocketMQTemplate template;
@RequestMapping("/sendMsg")
public String sendMsg(String msg) {
//第一个参数目的地(主题)
//第二个参数:发送的消息
//同步发送的方式
template.syncSend("01-boot-hello", msg);
return "发送成功";
}
}
2.异步消息(生产者)
@RequestMapping("/asyncMsg")
public String asyncMsg(String msg) {
//异步发送的方式
template.asyncSend("01-boot-hello", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
return "发送成功";
}
3.一次性消息(生产者)
@RequestMapping("/sendOneWay")
public String sendOneWay(String msg)
//一次性发送的方式
template.sendOneWay("01-boot-hello", msg);
return "发送成功";
}
同步/异步/一次性(消费者代码不变)
@Component
@RocketMQMessageListener(
consumerGroup = "boot-consumer-demo", //组
topic="01-boot-hello" //主题
)
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消息的内容" + msg);
}
}
2>集群模式/广播模式
//生产者
@RestController
public class SendMsgController {
@Autowired
private RocketMQTemplate template;
@RequestMapping("/sendMsg")
public String sendMsg(String msg)
template.syncSend("02-consumer-model", msg);
return "发送成功";
}
}
//消费者
//集群模式
@Component
@RocketMQMessageListener(
consumerGroup = "boot-consumer-demo",
topic="02-consumer-model",
messageModel = MessageModel.CLUSTERING //集群模式
)
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消费者一,消息的内容" + msg);
}
}
//消费者
//广播模式
@Component
@RocketMQMessageListener(
consumerGroup = "boot-consumer-demo",
topic="02-consumer-model",
messageModel = MessageModel.BROADCASTING //广播模式
)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消费者二,消息的内容" + msg);
}
}
3>延时消息
//生产者
@RestController
public class SendMsgController {
@Autowired
private RocketMQTemplate template;
@RequestMapping("/sendDelayMsg")
public String sendDelayMsg(String msg) {
//延时发送的方式
MessageBuilder<String> builder = MessageBuilder.withPayload(msg);
template.syncSend("03-boot-delay", builder.build(),5000,2);
return "发送成功";
}
}
//消费者
@Component
@RocketMQMessageListener(
consumerGroup = "boot-consumer-demo1",
topic="03-boot-delay"
)
public class Consumer3 implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消息的内容" + msg);
}
}
4>tag过滤
//生产者
@RestController
public class SendMsgController {
@RequestMapping("/filter")
public String filter(String msg,String tag) {
template.syncSend("04-boot-filter:"+tag, msg);
return "发送成功";
}
}
//消费者
@Component
@RocketMQMessageListener(
consumerGroup = "boot-consumer-demo",
topic = "04-boot-filter",
selectorType = SelectorType.TAG, //tag过滤方式
selectorExpression = "TagA||TagC" //过滤格式
)
public class Consumer4 implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消息的内容" + msg);
}
}
//测试:http://localhost:8089/filter?msg=hello&tag=TagA
5>SQL92过滤
//生产者
@RestController
public class SendMsgController {
@RequestMapping("/filter")
public String filter(String msg,String age) {
HashMap map = new HashMap();
map.put("age",age );
template.convertAndSend("04-boot-filter:", msg,map);
return "发送成功";
}
}
//消费者
@Component
@RocketMQMessageListener(
consumerGroup = "boot-consumer-demo",
topic = "04-boot-filter",
/* selectorType = SelectorType.TAG,
selectorExpression = "TagA||TagC"*/
selectorType = SelectorType.SQL92, //SQL92过滤
selectorExpression = "age>20" //过滤格式
)
public class Consumer4 implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("消息的内容" + msg);
}
}
//测试:http://localhost:8089/filter?msg=hello&age=25
pom.xml依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
application.properties
//生产者
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
//设置发送消息时间,默认3秒,设置10秒,防止网络延迟导致发送失败
rocketmq.producer.send-message-timeout=10000
server.port=8089
//消费者
rocketmq.name-server=127.0.0.1:9876
server.port=8088
十.小结
1.RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件
2.RcoketMQ使用场景有异步解耦 削峰填谷 分布式缓存同步/消息分发等
3.生产者提供三种发送方式:1.同步发送 2.异步发送 3.单向发送
4.消费者支持PUSH和PULL两种消费模式,支持集群消费和广播消息
5.消费者的集群消费和广播消息只需要修改消费者端的代码即可
6.2种消息过滤都只要更改消费端注解上的类型和过滤方式即可
更多推荐
已为社区贡献2条内容
所有评论(0)