云原生中间件RocketMQ-快速入门
生产组用于消息的发送。消费组用于消息的订阅处理。生产组和消费组,方便扩缩机器,增减处理能力,集群组的名字,用于标记用途中的一员。每次只会随机的发给每个集群中的一员。
·
生产组:用于消息的发送。
消费组:用于消息的订阅处理。
生产组和消费组,方便扩缩机器,增减处理能力,集群组的名字,用于标记用途中的一员。每次只会随机的发给每个集群中的一员。
生产者使用
- 创建生产者对象 DefaultMQProducer
- 设置NamesrvAddr
- 启动生产者服务
- 创建消息并发送
代码实现如下:
同步发送:
// 创建DefaultMQProducer消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
//设置NameServer节点地址,多个节点间用分号分割
producer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
//与NameServer建立长连接
producer.start();
for(int i = 0 ; i <5; i ++) {
// 1. 创建消息
Message message = new Message("test_quick_topic", // 主题
"TagA", // 标签
"key" + i, // 用户自定义的key ,唯一的标识
("Hello RocketMQ" + i).getBytes()); // 消息内容实体(byte[])
// 2.1 同步发送消息
// if(i == 1) {
// message.setDelayTimeLevel(3);
// }
// 发送消息,获取发送结果
SendResult sr = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNumber = (Integer)arg;
return mqs.get(queueNumber);
}
}, 2);
System.err.println(sr);
// SendResult sr = producer.send(message);
// SendStatus status = sr.getSendStatus();
// System.err.println(status);
//System.err.println("消息发出: " + sr);
}
// 消息发送完毕关闭连接
producer.shutdown();
异步发送:
// 2.2 异步发送消息
producer.send(message, new SendCallback() {
//rabbitmq急速入门的实战: 可靠性消息投递
@Override
public void onSuccess(SendResult sendResult) {
System.err.println("msgId: " + sendResult.getMsgId() + ", status: " + sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
System.err.println("------发送失败");
}
});
执行生产者发送消息,可以看到控制台输出如下:
在对应的控制台可以查看到对应的消息主题
在消息页签可以通过topic查询到消息,也可以通过message_key和message_id查询。
消费者使用
- 创建消费者对象 DefaultMQPushConsumer
- 设置NamesrvAddr及其消费位置ConsumeFromWhere
- 设置订阅主题subscribe
- 注册监听并消费registerMessageListener
具体代码实现如下:
// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
// 设置NameServer节点
consumer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
// 设置消费位置,从哪个点开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/*订阅主题,
consumer.subscribe包含两个参数:
topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。
subExpression: 子表达式用于筛选tags。
同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。
例如:设置为*,则代表接收所有tags数据。
例如:设置为2022S1,则Broker中只有tags=2022S1的消息会被接收,而2022S2就会被排除在外。
*/
consumer.subscribe("test_quick_topic", "*");
// 创建监听,当有新的消息监听程序会及时捕捉并加以处理。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt me = msgs.get(0);
try {
String topic = me.getTopic();
String tags = me.getTags();
String keys = me.getKeys();
// if(keys.equals("key1")) {
// System.err.println("消息消费失败..");
// int a = 1/0;
// }
String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
} catch (Exception e) {
e.printStackTrace();
// int recousumeTimes = me.getReconsumeTimes();
// System.err.println("recousumeTimes: " + recousumeTimes);
// if(recousumeTimes == 3) {
// // 记录日志....
// // 做补偿处理
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者,与Broker建立长连接,开始监听。
consumer.start();
System.err.println("consumer start...");
上述注释代码模拟了消息出现异常的情况,如果连续三次消费失败,则记录日志做补偿处理,并返回成功。
本文内容到此结束了,
如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位大佬指出。
主页:共饮一杯无的博客汇总👨💻保持热爱,奔赴下一场山海。🏃🏃🏃
更多推荐
已为社区贡献14条内容
所有评论(0)