RocketMQ内容分享(三):消费者和生产者、SpringBoot集成、Spring Cloud Stream集成
在MySpringcloudRocketmqProducerApplication 添加 @EnableBinding(Source.class) 注解。然后创建生产者。@Component@Resource// 封装消息头// 创建消息对象// 发送消息在MySpringcloudRocketmqConsumerApplication 类上添加 @EnableBinding(Sink.class
目录
SpringBoot集成RocketMQ-实现普通消息和事务消息
在 my-springcloud-rocketmq-producer 上的操作
在 my-springcloud-rocketmq-consumer上的操作
消费者和生产者的实现细节
1. 消费者的消费模式
RocketMQ 同时支持消费者的推模式以及拉模式。推模式顾名思义就是broker将消息推送给消费者,拉模式则是消费者主动到队列中拉取消息。默认情况下,RocketMQ使用的是推模式。
在IDEA中导入RocketMQ源码之后,找到 example模块,然后在此模块中找到各种例子。
1.1.推模式
消费者推模式的例子就是 org.apache.rocketmq.example.simple.PushConsumer
。推模式的消费者的实现类是 DefaultMQPushConsumer 。之前的文章已经做了详细介绍,在此就不在赘述了。推模式适合于大部分正常消费的情况
public static final String TOPIC = "TopicTest";
public static final String CONSUMER_GROUP = "CID_JODIE_1";
public static final String NAMESRV_ADDR = "127.0.0.1:9876";
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.subscribe(TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
1.2. 拉模式
消费者拉模式的例子是:org.apache.rocketmq.example.simple.LitePullConsumerAssign
。拉模式主要适用于回溯消费消息。比如:某个消息你消费失败了,你现在想重新消费该消息的情况。我们知道RocketMQ中消息消费完之后不会里面会被删除,默认会在队列中保留48小时。通过broker配置文件中的fileReservedTime参数进行设置。
//1.创建DefaultLitePullConsumer实例
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false);
//2.启动litePullConsumer实例
litePullConsumer.start();
//3.获取TopicTest主题下所有的队列
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
//4.消费者需要拉取的队列的集合
litePullConsumer.assign(assignList);
//5.消费者需要定位,哪个队列,多少偏移量的消息。
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commit();
}
} finally {
litePullConsumer.shutdown();
}
2. 生产者负载均衡策略
我们都知道一个主题下会有多个消息队列(MessageQueue),那么,生产者在发送消息的时候如何选择消息队列呢?
首先找到生产者的示例代码类:org.apache.rocketmq.example.simple.Producer
。在该类中找到发送消息的方法 producer.send(msg)
。
接着找到发送消息的默认实现方法 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
在此方法中可以找到 selectOneMessageQueue 方法,从方法名可以知道此方法就是用来选出一个MessageQueue的
//省略部分代码
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
//省略部分代码
在selectOneMessageQueue方法中通过调用 tpInfo.selectOneMessageQueue 方法来获取
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
BrokerFilter brokerFilter = threadBrokerFilter.get();
brokerFilter.setLastBrokerName(lastBrokerName);
if (this.sendLatencyFaultEnable) {
if (resetIndex) {
tpInfo.resetIndex();
}
MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
if (mq != null) {
return mq;
}
mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
if (mq != null) {
return mq;
}
return tpInfo.selectOneMessageQueue();
}
MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
if (mq != null) {
return mq;
}
return tpInfo.selectOneMessageQueue();
}
那么最终的实现逻辑就是在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue
方法中了。我们可以查看此方法。
private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) {
//省略非核心代码
if (filter != null && filter.length != 0) {
for (int i = 0; i < messageQueueList.size(); i++) {
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
MessageQueue mq = messageQueueList.get(index);
boolean filterResult = true;
for (QueueFilter f: filter) {
Preconditions.checkNotNull(f);
filterResult &= f.filter(mq);
}
if (filterResult) {
return mq;
}
}
return null;
}
这里的核心代码就是下面这句代码:
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
首先通过 sendQueue.incrementAndGet()
方法获取当前线程下index值。然后对该主题下所有的队列数进行求模取余。也就是说RocketMQ默认会采取轮询的方式选择消息队列 接着我们来看下该方法的实现。
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
private final Random random = new Random();
public int incrementAndGet() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = random.nextInt();
}
this.threadLocalIndex.set(++index);
return index & POSITIVE_MASK;
}
首先从线程本地变量 threadLocalIndex 中获取索引值index,如果没有的话则随机取一个值。然后将取到index中进行加一操作放回threadLocalIndex中。
SpringBoot集成RocketMQ-实现普通消息和事务消息
1. 引入依赖
本例中使用的RocketMQ的版本是 5.1.3。所以引入的 rocketmq-spring-boot 版本要与之匹配。
可以通过mvnrepository进行查看。https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot/2.2.2
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.2</version>
</dependency>
2. 配置文件修改
在springboot-rocketmq-producer项目的application.yml文件中添加如下配置:
rocketmq:
name-server: 172.31.184.89:9876
producer:
group: feige-producer-group
consumer:
topic: my-spring-boot-topic
在springboot-rocketmq-consumer项目的application.yml文件中添加如下配置:
server:
port: 8080
rocketmq:
name-server: 172.31.184.89:9876
consumer:
group: feige-consumer-group
topic: my-spring-boot-topic
3. 实现生产者
定义一个生产者类MyProducer,在该类中引入RocketMQTemplate 操作类,然后定义发送消息的方法sendMessage,在此方法中调用 rocketMQTemplate.convertAndSend
方法进行消息发送。
@Component
public class MyProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*
* @param topic 主题
* @param message 消息
*/
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
3.1. 编写生产者单元测试
@Autowired
private MyProducer myProducer;
@Value("${rocketmq.consumer.topic:}")
private String consumerTopic;
@Test
void sendMessage() {
myProducer.sendMessage(consumerTopic,"飞哥SpringBoot集成RocketMQ消息测试");
}
4.实现消费者
定义消费者类MyConsumer。此类实现了RocketMQListener接口并重写了onMessage方法用于接收broker推送过来的消息。
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic:}", consumerGroup = "generalConsumerGroup")
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到的消息是=" + s);
}
}
5. 实现事务消息
在SpringBoot中实现RocketMQ的事务消息,整体思路与 【RocketMQ系列六】RocketMQ事务消息 文中提到的思路相同。
5.1. 实现事务消息的生产者
在前面创建的MyProducer类中添加实现事务消息的方法 sendTransactionMessage。
/**
* 发送事务消息
*
* @param topic 话题
* @param msg 消息
*/
public void sendTransactionMessage(String topic, String msg) throws InterruptedException {
String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};
for (int i = 0; i < 10; i++) {
// 2. 将topic和tag整合在一起,以:分割,
String destination = topic + ":" + tags[i % tags.length];
// 1.注意该message是org.springframework.messaging.Message
Message<String> message = MessageBuilder.withPayload(msg + "_" + tags[i % tags.length] + "_" + i)
.setHeader("destination", destination).build();
// 第一个参数是发布的目的地,第二个参数是消息,第三个参数是额外的参数
rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
Thread.sleep(10);
}
}
这里需要注意的是传入的Message类是org.springframework.messaging.Message ,不是RocketMQ的Message。
5.2. 实现本地事务消息
接着在定义生产者本地事务实现类 MyTransactionListener,该类实现了RocketMQLocalTransactionListener接口,并重写了executeLocalTransaction方法和checkLocalTransaction方法。这里多了一步就是将 org.springframework.messaging.Message
转成 org.apache.rocketmq.common.message.Message
。
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 将消息转成rocketmq下的message
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "utf-8", (String) arg, msg);
String tags = message.getTags();
if (tags.equals("tagA")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (tags.equals("tagB")) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 将消息转成rocketmq下的message
String destination = (String) msg.getHeaders().get("destination");
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),
"utf-8",destination, msg);
String tags = message.getTags();
if (tags.equals("tagC")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (tags.equals("tagD")) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}
SpringCloudStream整合RocketMQ
1. Spring Cloud Stream是什么?
Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。
官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
官网概述:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations
该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的Spring习惯用法和最佳实践,包括对持久pub/sub 语义、消费者组和有状态分区的支持。
简单的理解就是Spring Cloud Stream 通过在上层定义统一消息的编程模型,屏蔽了底层消息中间件的差异,降低了使用成本。下图展示了Spring Cloud Stream的处理架构
Spring Cloud Stream的核心构建块(编程模型)是:
- Destination Binders: 负责提供与外部消息传递系统集成的组件。Binders 可以生成Bindings。
- **Bindings: ** 外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。即用来绑定消息生产者和消息消费者。它有两种类型,INPUT和OUTPUT,INPUT对应消费者,OUTPUT对应生产者。
- Message: 生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范的数据结构。
2. Spring Cloud Stream的执行流程
3. 注解代码实现
首先创建一个生产者项目 my-springcloud-rocketmq-producer 和一个消费者项目 my-springcloud-rocketmq-consumer。
本demo使用的 版本号是 cloud 2021.0.5.0 +springboot 2.6.13
在 my-springcloud-rocketmq-producer 上的操作
3.1. 引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3.2 . 属性文件配置
spring:
cloud:
stream:
bindings:
output:
destination: my-springcloud-stream-topic
rocketmq:
binder:
name-server: 172.31.184.89:9876
3.3. 定义生产者
在MySpringcloudRocketmqProducerApplication 添加 @EnableBinding(Source.class) 注解。然后创建生产者。
@Component
public class MyProducer {
@Resource
private Source source;
public void sendMessage(String msg) {
// 封装消息头
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS, "tagA");
// 创建消息对象
Message<String> message = MessageBuilder.createMessage(msg, new MessageHeaders(headers));
// 发送消息
source.output().send(message);
}
}
在 my-springcloud-rocketmq-consumer上的操作
3.4. 引入依赖同生产者
3.5. 配置文件修改
spring.cloud.stream.rocketmq.binder.name-server=172.31.184.89:9876
spring.cloud.stream.bindings.input.destination=my-springcloud-stream-topic
spring.cloud.stream.bindings.input.group=my-springcloud-stream-consume-group
3.6. 定义消费者
在MySpringcloudRocketmqConsumerApplication 类上添加 @EnableBinding(Sink.class)注解。
@Component
public class MyConsumer {
@StreamListener(Sink.INPUT)
public void processMessage(String message) {
System.out.println("收到的消息=" + message);
}
}
更多推荐
所有评论(0)