在这里插入图片描述
生产组:用于消息的发送。
消费组:用于消息的订阅处理。
生产组和消费组,方便扩缩机器,增减处理能力,集群组的名字,用于标记用途中的一员。每次只会随机的发给每个集群中的一员。

生产者使用

  1. 创建生产者对象 DefaultMQProducer
  2. 设置NamesrvAddr
  3. 启动生产者服务
  4. 创建消息并发送

代码实现如下:
同步发送:

// 创建DefaultMQProducer消息生产者对象
		DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
		//设置NameServer节点地址,多个节点间用分号分割
		producer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
		//与NameServer建立长连接
		producer.start();

		for(int i = 0 ; i <5; i ++) {
			//	1.	创建消息
			Message message = new Message("test_quick_topic",	//	主题
					"TagA", //	标签
					"key" + i,	// 	用户自定义的key ,唯一的标识
					("Hello RocketMQ" + i).getBytes());	//	消息内容实体(byte[])
			//	2.1	同步发送消息
//			if(i == 1) {
//				message.setDelayTimeLevel(3);
//			}
			// 发送消息,获取发送结果
			SendResult sr = producer.send(message, new MessageQueueSelector() {

				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					Integer queueNumber = (Integer)arg;
					return mqs.get(queueNumber);
				}
			}, 2);
			System.err.println(sr);

//			SendResult sr = producer.send(message);
//			SendStatus status = sr.getSendStatus();
//			System.err.println(status);


          //System.err.println("消息发出: " + sr);


		}
		// 消息发送完毕关闭连接
		producer.shutdown();

异步发送:

			//  2.2 异步发送消息
			producer.send(message, new SendCallback() {
				//rabbitmq急速入门的实战: 可靠性消息投递
				@Override
				public void onSuccess(SendResult sendResult) {
					System.err.println("msgId: " + sendResult.getMsgId() + ", status: " + sendResult.getSendStatus());
				}
				@Override
				public void onException(Throwable e) {
					e.printStackTrace();
					System.err.println("------发送失败");
				}
			});

执行生产者发送消息,可以看到控制台输出如下:
在这里插入图片描述
在对应的控制台可以查看到对应的消息主题
在这里插入图片描述
在消息页签可以通过topic查询到消息,也可以通过message_key和message_id查询。
在这里插入图片描述

消费者使用

  1. 创建消费者对象 DefaultMQPushConsumer
  2. 设置NamesrvAddr及其消费位置ConsumeFromWhere
  3. 设置订阅主题subscribe
  4. 注册监听并消费registerMessageListener

具体代码实现如下:

        // 创建消费者对象
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
		// 设置NameServer节点
		consumer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
		// 设置消费位置,从哪个点开始
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
		/*订阅主题,
        consumer.subscribe包含两个参数:
        topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。
        subExpression: 子表达式用于筛选tags。
            同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。
            例如:设置为*,则代表接收所有tags数据。
            例如:设置为2022S1,则Broker中只有tags=2022S1的消息会被接收,而2022S2就会被排除在外。
        */
		consumer.subscribe("test_quick_topic", "*");
		// 创建监听,当有新的消息监听程序会及时捕捉并加以处理。
		consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				MessageExt me = msgs.get(0);
				try {
					String topic = me.getTopic();
					String tags = me.getTags();
					String keys = me.getKeys();
//					if(keys.equals("key1")) {
//						System.err.println("消息消费失败..");
//						int a = 1/0;
//					}

					String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
					System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
				} catch (Exception e) {
					e.printStackTrace();
//					int recousumeTimes = me.getReconsumeTimes();
//					System.err.println("recousumeTimes: " + recousumeTimes);
//					if(recousumeTimes == 3) {
//						//		记录日志....
//						//  	做补偿处理
//						return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//					}

					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		// 启动消费者,与Broker建立长连接,开始监听。
		consumer.start();
		System.err.println("consumer start...");

上述注释代码模拟了消息出现异常的情况,如果连续三次消费失败,则记录日志做补偿处理,并返回成功。

本文内容到此结束了,
如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位大佬指出。
主页共饮一杯无的博客汇总👨‍💻

保持热爱,奔赴下一场山海。🏃🏃🏃

在这里插入图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐