目录

消费者和生产者的实现细节

1. 消费者的消费模式

1.1.推模式

1.2. 拉模式

2. 生产者负载均衡策略

SpringBoot集成RocketMQ-实现普通消息和事务消息 

1. 引入依赖

2. 配置文件修改

3. 实现生产者

3.1. 编写生产者单元测试

4.实现消费者

5. 实现事务消息

5.1. 实现事务消息的生产者

5.2. 实现本地事务消息

​SpringCloudStream整合RocketMQ

1. Spring Cloud Stream是什么?

2. Spring Cloud Stream的执行流程

3. 注解代码实现

在 my-springcloud-rocketmq-producer 上的操作

3.1. 引入依赖

3.2 . 属性文件配置

3.3. 定义生产者

在 my-springcloud-rocketmq-consumer上的操作

3.4. 引入依赖同生产者

3.5. 配置文件修改

3.6. 定义消费者


消费者和生产者的实现细节

1. 消费者的消费模式

RocketMQ 同时支持消费者的推模式以及拉模式。推模式顾名思义就是broker将消息推送给消费者,拉模式则是消费者主动到队列中拉取消息。默认情况下,RocketMQ使用的是推模式。

在IDEA中导入RocketMQ源码之后,找到 example模块,然后在此模块中找到各种例子。

image-20231005165050015

1.1.推模式

消费者推模式的例子就是 org.apache.rocketmq.example.simple.PushConsumer 。推模式的消费者的实现类是 DefaultMQPushConsumer 。之前的文章已经做了详细介绍,在此就不在赘述了。推模式适合于大部分正常消费的情况

    public static final String TOPIC = "TopicTest";
    public static final String CONSUMER_GROUP = "CID_JODIE_1";
    public static final String NAMESRV_ADDR = "127.0.0.1:9876";
    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        consumer.setNamesrvAddr(NAMESRV_ADDR);

        consumer.subscribe(TOPIC, "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
1.2. 拉模式

消费者拉模式的例子是:org.apache.rocketmq.example.simple.LitePullConsumerAssign 。拉模式主要适用于回溯消费消息。比如:某个消息你消费失败了,你现在想重新消费该消息的情况。我们知道RocketMQ中消息消费完之后不会里面会被删除,默认会在队列中保留48小时。通过broker配置文件中的fileReservedTime参数进行设置。

  //1.创建DefaultLitePullConsumer实例
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        litePullConsumer.setAutoCommit(false);
        //2.启动litePullConsumer实例
        litePullConsumer.start();
        //3.获取TopicTest主题下所有的队列
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        //4.消费者需要拉取的队列的集合
        litePullConsumer.assign(assignList);
        //5.消费者需要定位,哪个队列,多少偏移量的消息。
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commit();
            }
        } finally {
            litePullConsumer.shutdown();
        }

2. 生产者负载均衡策略

我们都知道一个主题下会有多个消息队列(MessageQueue),那么,生产者在发送消息的时候如何选择消息队列呢?

首先找到生产者的示例代码类:org.apache.rocketmq.example.simple.Producer。在该类中找到发送消息的方法 producer.send(msg)

接着找到发送消息的默认实现方法 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

在此方法中可以找到 selectOneMessageQueue 方法,从方法名可以知道此方法就是用来选出一个MessageQueue的

//省略部分代码
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
                //省略部分代码

在selectOneMessageQueue方法中通过调用 tpInfo.selectOneMessageQueue 方法来获取

 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
        BrokerFilter brokerFilter = threadBrokerFilter.get();
        brokerFilter.setLastBrokerName(lastBrokerName);
        if (this.sendLatencyFaultEnable) {
            if (resetIndex) {
                tpInfo.resetIndex();
            }
            MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
            if (mq != null) {
                return mq;
            }

            mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
            if (mq != null) {
                return mq;
            }

            return tpInfo.selectOneMessageQueue();
        }

        MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
        if (mq != null) {
            return mq;
        }
        return tpInfo.selectOneMessageQueue();
    }

那么最终的实现逻辑就是在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue 方法中了。我们可以查看此方法。

private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) {
       //省略非核心代码
        if (filter != null && filter.length != 0) {
            for (int i = 0; i < messageQueueList.size(); i++) {
                int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
                MessageQueue mq = messageQueueList.get(index);
                boolean filterResult = true;
                for (QueueFilter f: filter) {
                    Preconditions.checkNotNull(f);
                    filterResult &= f.filter(mq);
                }
                if (filterResult) {
                    return mq;
                }
            }

            return null;
        }

这里的核心代码就是下面这句代码:

 int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

首先通过 sendQueue.incrementAndGet() 方法获取当前线程下index值。然后对该主题下所有的队列数进行求模取余。也就是说RocketMQ默认会采取轮询的方式选择消息队列 接着我们来看下该方法的实现。

  private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
    private final Random random = new Random();
    public int incrementAndGet() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = random.nextInt();
        }
        this.threadLocalIndex.set(++index);
        return index & POSITIVE_MASK;
    }

首先从线程本地变量 threadLocalIndex 中获取索引值index,如果没有的话则随机取一个值。然后将取到index中进行加一操作放回threadLocalIndex中。

image-20231010092715799

SpringBoot集成RocketMQ-实现普通消息和事务消息 

1. 引入依赖

本例中使用的RocketMQ的版本是 5.1.3。所以引入的 rocketmq-spring-boot 版本要与之匹配。

可以通过mvnrepository进行查看。https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot/2.2.2

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot</artifactId>
    <version>2.2.2</version>
</dependency>

2. 配置文件修改

在springboot-rocketmq-producer项目的application.yml文件中添加如下配置:

rocketmq:
  name-server: 172.31.184.89:9876
  producer:
    group: feige-producer-group
  consumer:
    topic: my-spring-boot-topic

在springboot-rocketmq-consumer项目的application.yml文件中添加如下配置:

server:
  port: 8080
rocketmq:
  name-server: 172.31.184.89:9876
  consumer:
    group: feige-consumer-group
    topic: my-spring-boot-topic

3. 实现生产者

定义一个生产者类MyProducer,在该类中引入RocketMQTemplate 操作类,然后定义发送消息的方法sendMessage,在此方法中调用 rocketMQTemplate.convertAndSend 方法进行消息发送。

@Component
public class MyProducer {
	@Autowired
	private RocketMQTemplate rocketMQTemplate;

	/**
	 * 发送普通消息
	 *
	 * @param topic   主题
	 * @param message 消息
	 */
	public void sendMessage(String topic, String message) {
		rocketMQTemplate.convertAndSend(topic, message);
	}
3.1. 编写生产者单元测试
@Autowired
	private MyProducer myProducer;

	@Value("${rocketmq.consumer.topic:}")
	private String consumerTopic;

	@Test
	void sendMessage() {
		myProducer.sendMessage(consumerTopic,"飞哥SpringBoot集成RocketMQ消息测试");
	}

4.实现消费者

定义消费者类MyConsumer。此类实现了RocketMQListener接口并重写了onMessage方法用于接收broker推送过来的消息。

@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic:}", consumerGroup = "generalConsumerGroup")
public class MyConsumer implements RocketMQListener<String> {

	@Override
	public void onMessage(String s) {
		System.out.println("收到的消息是=" + s);
	}
}

5. 实现事务消息

在SpringBoot中实现RocketMQ的事务消息,整体思路与 【RocketMQ系列六】RocketMQ事务消息 文中提到的思路相同。

5.1. 实现事务消息的生产者

在前面创建的MyProducer类中添加实现事务消息的方法 sendTransactionMessage。

/**
	 * 发送事务消息
	 *
	 * @param topic 话题
	 * @param msg   消息
	 */
	public void sendTransactionMessage(String topic, String msg) throws InterruptedException {
		String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};
		for (int i = 0; i < 10; i++) {
			// 2. 将topic和tag整合在一起,以:分割,
			String destination = topic + ":" + tags[i % tags.length];
				// 1.注意该message是org.springframework.messaging.Message
			Message<String> message = MessageBuilder.withPayload(msg + "_" + tags[i % tags.length] + "_" + i)
					.setHeader("destination", destination).build();
			// 第一个参数是发布的目的地,第二个参数是消息,第三个参数是额外的参数
			rocketMQTemplate.sendMessageInTransaction(destination, message, destination);

			Thread.sleep(10);
		}
	}

这里需要注意的是传入的Message类是org.springframework.messaging.Message ,不是RocketMQ的Message。

5.2. 实现本地事务消息

接着在定义生产者本地事务实现类 MyTransactionListener,该类实现了RocketMQLocalTransactionListener接口,并重写了executeLocalTransaction方法和checkLocalTransaction方法。这里多了一步就是将 org.springframework.messaging.Message 转成 org.apache.rocketmq.common.message.Message 。

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListener implements RocketMQLocalTransactionListener {
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		// 将消息转成rocketmq下的message
		org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "utf-8", (String) arg, msg);
		String tags = message.getTags();
		if (tags.equals("tagA")) {
			return RocketMQLocalTransactionState.COMMIT;
		} else if (tags.equals("tagB")) {
			return RocketMQLocalTransactionState.ROLLBACK;
		}
		return RocketMQLocalTransactionState.UNKNOWN;
	}

@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		// 将消息转成rocketmq下的message
		String destination = (String) msg.getHeaders().get("destination");
		org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),
				"utf-8",destination, msg);
		String tags = message.getTags();
		if (tags.equals("tagC")) {
			return RocketMQLocalTransactionState.COMMIT;
		} else if (tags.equals("tagD")) {
			return RocketMQLocalTransactionState.ROLLBACK;
		}
		return RocketMQLocalTransactionState.UNKNOWN;
	}
}

​SpringCloudStream整合RocketMQ

1. Spring Cloud Stream是什么?

Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。

官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

官网概述:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的Spring习惯用法和最佳实践,包括对持久pub/sub 语义、消费者组和有状态分区的支持。

简单的理解就是Spring Cloud Stream 通过在上层定义统一消息的编程模型,屏蔽了底层消息中间件的差异,降低了使用成本。下图展示了Spring Cloud Stream的处理架构

带粘合剂的 SCSt

image-20231006092654679

Spring Cloud Stream的核心构建块(编程模型)是:

  1. Destination Binders: 负责提供与外部消息传递系统集成的组件。Binders 可以生成Bindings。
  2. **Bindings: ** 外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。即用来绑定消息生产者和消息消费者。它有两种类型,INPUT和OUTPUT,INPUT对应消费者,OUTPUT对应生产者。
  3. Message: 生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范的数据结构。

2. Spring Cloud Stream的执行流程

SpringCloudStream

3. 注解代码实现

首先创建一个生产者项目 my-springcloud-rocketmq-producer 和一个消费者项目 my-springcloud-rocketmq-consumer。

本demo使用的 版本号是 cloud 2021.0.5.0 +springboot 2.6.13

在 my-springcloud-rocketmq-producer 上的操作
3.1. 引入依赖
  <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
3.2 . 属性文件配置
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-springcloud-stream-topic
      rocketmq:
        binder:
          name-server: 172.31.184.89:9876
3.3. 定义生产者

在MySpringcloudRocketmqProducerApplication 添加 @EnableBinding(Source.class) 注解。然后创建生产者。

@Component
public class MyProducer {
	@Resource
	private Source source;
	
	public void sendMessage(String msg) {
		// 封装消息头
		Map<String, Object> headers = new HashMap<>();
		headers.put(MessageConst.PROPERTY_TAGS, "tagA");
		// 创建消息对象
		Message<String> message = MessageBuilder.createMessage(msg, new MessageHeaders(headers));
		// 发送消息
		source.output().send(message);
	}
}
在 my-springcloud-rocketmq-consumer上的操作
3.4. 引入依赖同生产者
3.5. 配置文件修改
spring.cloud.stream.rocketmq.binder.name-server=172.31.184.89:9876
spring.cloud.stream.bindings.input.destination=my-springcloud-stream-topic
spring.cloud.stream.bindings.input.group=my-springcloud-stream-consume-group
3.6. 定义消费者

在MySpringcloudRocketmqConsumerApplication 类上添加 @EnableBinding(Sink.class)注解。

@Component
public class MyConsumer {
   @StreamListener(Sink.INPUT)
   public void processMessage(String message) {
      System.out.println("收到的消息=" + message);
   }
}
Logo

一起探索未来云端世界的核心,云原生技术专区带您领略创新、高效和可扩展的云计算解决方案,引领您在数字化时代的成功之路。

更多推荐