一、概述

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。

二、环境搭建

下载RocketMQ
我们在linux平台下安装一个RocketMQ的服务。
下载RocketMQ,地址 https://github.com/apache/rocketmq/releases

环境要求
Linux 64位操作系统
64bit JDK 1.8+

安装RocketMQ
1 创建工作目录

[root@allen ~]#  mkdir /usr/rocketmq

2 上传文件到Linux系统
rocketmq-all-4.4.0-bin-release.zip

3 解压到安装目录

[root@allen rocketmq]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@allen rocketmq]# mv rocketmq-all-4.4.0-bin-release ../rocketmq

在这里插入图片描述

启动RocketMQ
1 切换到工作目录

cd  /usr/rocketmq/rocketmq-all-4.4.0-bin-release/bin

2 启动NameServer
[root@allen bin]# nohup ./mqnamesrv &
[1] 1467
#只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@allen bin]# tail -f /root/logs/rocketmqlogs/namesrv.log
在这里插入图片描述
3 启动Broker
#编辑bin/runbroker.sh 和 bin/runserver.sh文件,根据本级配置调整工作内存大小,修改里面的
#JAVA_OPT="KaTeX parse error: Expected 'EOF', got '#' at position 42: …-Xmx8g -Xmn4g" #̲为JAVA_OPT="{JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
启动Broker命令如下:

[root@allen bin]# nohup bin/mqbroker -n localhost:9876 &
[root@allen bin]# tail -f /root/logs/rocketmqlogs/broker.log

三、测试RocketMQ

1 测试消息发送

[root@allen bin]# export NAMESRV_ADDR=localhost:9876
[root@allen bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述
2 测试消息接收

[root@allen bin]# export NAMESRV_ADDR=localhost:9876
[root@allen bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述
关闭RocketMQ

[root@allen bin]# ./mqshutdown broker
[root@allen bin]# ./mqshutdown namesrv

四、使用JAVA编程进行消息收发

接下来我们使用Java代码来演示消息的发送和接收
1 在POM文件用导入依赖

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.0.2</version>
</dependency>

2 发送消息
消息发送步骤:

  1. 创建消息生产者, 指定生产者所属的组名
  2. 指定Nameserver地址
  3. 启动生产者
  4. 创建消息对象,指定主题、标签和消息体
  5. 发送消息
  6. 关闭生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketMQSendMessageTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者,并设置生产组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

        //2.为生产者设置NameServer地址
        producer.setNamesrvAddr("192.168.1.110:9876");

        //3.启动生产者
        producer.start();

        //4.构建消息对象,设置主题、标签、内容
        Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());

        //5.发送消息
        SendResult result = producer.send(message, 10000);
        System.out.println(result);

        //6.关闭生产者
        producer.shutdown();
    }
}

发送成功
在这里插入图片描述

3 接收消息
消息接收步骤:

  1. 创建消息消费者, 指定消费者所属的组名
  2. 指定Nameserver地址
  3. 指定消费者订阅的主题和标签
  4. 设置回调函数,编写处理消息的方法
  5. 启动消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketMQReceiveMessageTest {
    public static void main(String[] args) throws Exception {
        //1. 创建消息消费者, 指定消费者所属的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");

        //2. 指定Nameserver地址
        consumer.setNamesrvAddr("192.168.1.110:9876");

        //3. 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");

        //4. 设置回调函数,编写处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("Receive New Messages: " + msgs);

                //返回消费状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5. 启动消息消费者
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

接收成功
在这里插入图片描述

Logo

更多推荐