RocketMQ原生API以及整合SpringBoot
一、Java原生API一、maven依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency>在Rock
一、Java原生API
一、maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
在RocketMQ源码包中的example模块提供了非常详尽的测试代码,可以拿来直接调试,完整示例代码可在源码中查看。
二、指向NameServer
生产者和消费者都需要依赖NameServer才能运行,将NameServer指向RocketMQ集群,就可以连接RocketMQ集群,不需要管Broker在哪里。而RocketMQ提供的生产者和消费者寻找NameServer的方式有两种:
1、在代码中指定namesrvAddr属性。例如:consumer.setNamesrvAddr(“127.0.0.1:9876”);
2、通过NAMESRV_ADDR环境变量来指定。多个NameServer之间用分号连接。
三、消息类型
1.简单样例
消息发送端
- 单向发送:只负责把消息推送到MQ,推过去之后就不再管,这种方式吞吐量高。
- 同步发送:发送消息后,等待MQ返回再进行后续的操作,能够知道消息是否成功发送到MQ,这种方式最慢,但是安全性最高,不易丢失消息。
- 异步发送:发送消息后继续自己的后续操作,但是会给MQ一个回调函数,MQ接收到消息后会请求producer的回调函数。此时producer和broker是双向交互的关系,producer既是客户端也是服务端,broker同理。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//1.创建消息生产者producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//2.指定Nameserver地址,也可以通过环境变量配置
//producer.setNamesrvAddr("127.0.0.1:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 128; i++)
try {
{
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送消息
//同步发送
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
//单向发送,这个方法没有返回值。
producer.sendOneway(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
//6.关闭生产者producer
producer.shutdown();
}
}
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);//重试次数
int messageCount = 100;
//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//异步发送
producer.send(msg, new SendCallback() {
//发送消息成功
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
//发送消息失败
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
消息消费端
- Pull:消费者主动去Broker上拉取消息的拉模式。
- Push:消费者等待Broker把消息推送过来的推模式,推模式也是由拉模式封装出来的。
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
//2.指定Nameserver地址
//consumer.setNamesrvAddr("127.0.0.1:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
//4.注册消息监听,设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
//返回给broker的状态,CONSUME_SUCCESS消费成功 / RECONSUME_LATER过一会重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
//DefaultMQPullConsumer为过期API,新版本可以用DefaultLitePullConsumer,此处仅为示例
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//从topic上拿到message queue集合
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
//拉取消息
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
2.顺序消息
RocketMQ保证的是消息的局部有序,而不是全局有序。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序。
发送者端:只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。部分实现代码如下:
//MessageQueueSelector的select方法用来选择队列,保证同一个orderid的消息发到一个message queue中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
//List<MessageQueue> mqs:topic下所有的队列集合;Message msg当前消息;Object arg就是send方法中的orderId参数
//目的是给当前消息msg找一个目标message queue。
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();//orderId对消息队列取模,保证同一个id一定在同一个队列中
return mqs.get(index);
}
}, orderId);
Broker:一个队列内的消息是可以保证有序的。
消费者端:需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。部分实现代码如下:
//MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
3.广播消息
在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到。而广播模式(MessageModel.BROADCASTING)则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。广播模式与消息生产者无关,消费者部分实现代码如下:
//默认是CLUSTERING,集群模式
consumer.setMessageModel(MessageModel.BROADCASTING);
4.延迟消息
延迟消息就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。生产者部分实现代码如下:
//消息延迟级别,messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(3);
商业版可以任意设计延迟时间,开源版只有规定的18个级别。
延迟队列就是在SCHEDULE_TOPIC_XXXX主题下的18个队列,对应18个级别。
5.批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
官方注释:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB。实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。
批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
6.过滤消息
在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。但是一个消息只能有一个TAG,这是一个限制。消费者部分实现代码如下:
//消费者消费TagFilterTest主题下,tag为TagA或TagC的消息
consumer.subscribe("TagFilterTest", "TagA || TagC");
RocketMQ将消息的过滤实现在broker中,在broker中就会判断如果是TagB的消息,就不会推给消费者,可以减少IO,提高效率。
还可以使用SQL表达式来对消息进行过滤。只有推模式的消费者可以使用SQL过滤,拉模式是用不了的。
生产者部分实现代码:
//设置索引a的值为0-9
msg.putUserProperty("a", String.valueOf(i));
消费者部分实现代码:
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));
7.事务消息
事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。只涉及到消息发送者,与消息消费者无关。
消息生产者示例代码:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
//事务消息专有的消息生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
//在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
事务监听器示例代码:
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// int value = transactionIndex.getAndIncrement();
// int status = value % 3;
// localTrans.put(msg.getTransactionId(), status);
// return LocalTransactionState.UNKNOW;
String tags = msg.getTags();
if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE;//TagA的消息就提交
}else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE;//TagB的消息就回滚
}else{
return LocalTransactionState.UNKNOW;//TagC/D/E的消息为UNKNOW
}
}
//检查本地事务
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// Integer status = localTrans.get(msg.getTransactionId());
// if (null != status) {
// switch (status) {
// case 0:
// return LocalTransactionState.UNKNOW;
// case 1:
// return LocalTransactionState.COMMIT_MESSAGE;
// case 2:
// return LocalTransactionState.ROLLBACK_MESSAGE;
// default:
// return LocalTransactionState.COMMIT_MESSAGE;
// }
// }
// return LocalTransactionState.COMMIT_MESSAGE;
String tags = msg.getTags();
if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE;//TagC的消息就提交
}else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE;//TagD的消息就回滚
}else{
return LocalTransactionState.UNKNOW;//TagE的消息仍为UNKNOW
}
}
}
实现机制:
1.生产者会先向broker发送一个half消息,存入RocketMQ内部的RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,消费者没有订阅该Topic,所以该消息对下游消费者服务不可见。主要是为了检查Broker服务是否正常。
2.如果执行本地事务失败,那么返回的本地事务状态是ROLLBACK_MESSAGE,Broker会直接丢弃该消息。
3.如果本地事务执行时间较长,那么生产者可以先给Broker一个UNKNOW状态,一段时间后Broker会对UNKNOW状态的消息进行回查本地事务,比如提交订单后要等待支付,30分钟内支付即可这种情况。
4.回查本地事务如果还是UNKNOW状态,那么567三个步骤会反复执行,默认将单个消息的检查次数限制为15 次,可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。
5.事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以事务消息只保证了分布式事务的一半。
二、RocketMQ+SpringBoot
maven依赖
在使用SpringBoot的starter集成包时,要特别注意版本,rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,不同版本之间的差距非常大。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
application.properties
#NameServer地址
rocketmq.name-server=localhost:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup
相关属性都以rockemq.开头。具体配置信息可以参考org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。
消息生产者
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
//发送普通消息的示例
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
}
引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。
消息消费者
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : "+ message);
}
}
消费者部分的核心就在这个@RocketMQMessageListener注解上,所有消费者的核心功能也都会集成到这个注解中。
更多推荐
所有评论(0)