消息中间件RocketMQ环境搭建、测试及使用
一、概述RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。二、环境搭建下载RocketMQ我们在linux平台下安装一个RocketMQ的服务。下载RocketMQ,地址https://github.com/apache/rocketmq/releases环境要求Linux 64位操
一、概述
RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。
二、环境搭建
下载RocketMQ
我们在linux平台下安装一个RocketMQ的服务。
下载RocketMQ,地址 https://github.com/apache/rocketmq/releases
环境要求
Linux 64位操作系统
64bit JDK 1.8+
安装RocketMQ
1 创建工作目录
[root@allen ~]# mkdir /usr/rocketmq
2 上传文件到Linux系统
rocketmq-all-4.4.0-bin-release.zip
3 解压到安装目录
[root@allen rocketmq]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@allen rocketmq]# mv rocketmq-all-4.4.0-bin-release ../rocketmq
启动RocketMQ
1 切换到工作目录
cd /usr/rocketmq/rocketmq-all-4.4.0-bin-release/bin
2 启动NameServer
[root@allen bin]# nohup ./mqnamesrv &
[1] 1467
#只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@allen bin]# tail -f /root/logs/rocketmqlogs/namesrv.log
3 启动Broker
#编辑bin/runbroker.sh 和 bin/runserver.sh文件,根据本级配置调整工作内存大小,修改里面的
#JAVA_OPT="KaTeX parse error: Expected 'EOF', got '#' at position 42: …-Xmx8g -Xmn4g" #̲为JAVA_OPT="{JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
启动Broker命令如下:
[root@allen bin]# nohup bin/mqbroker -n localhost:9876 &
[root@allen bin]# tail -f /root/logs/rocketmqlogs/broker.log
三、测试RocketMQ
1 测试消息发送
[root@allen bin]# export NAMESRV_ADDR=localhost:9876
[root@allen bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
2 测试消息接收
[root@allen bin]# export NAMESRV_ADDR=localhost:9876
[root@allen bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ
[root@allen bin]# ./mqshutdown broker
[root@allen bin]# ./mqshutdown namesrv
四、使用JAVA编程进行消息收发
接下来我们使用Java代码来演示消息的发送和接收
1 在POM文件用导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
2 发送消息
消息发送步骤:
- 创建消息生产者, 指定生产者所属的组名
- 指定Nameserver地址
- 启动生产者
- 创建消息对象,指定主题、标签和消息体
- 发送消息
- 关闭生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class RocketMQSendMessageTest {
public static void main(String[] args) throws Exception {
//1.创建消息生产者,并设置生产组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2.为生产者设置NameServer地址
producer.setNamesrvAddr("192.168.1.110:9876");
//3.启动生产者
producer.start();
//4.构建消息对象,设置主题、标签、内容
Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());
//5.发送消息
SendResult result = producer.send(message, 10000);
System.out.println(result);
//6.关闭生产者
producer.shutdown();
}
}
发送成功
3 接收消息
消息接收步骤:
- 创建消息消费者, 指定消费者所属的组名
- 指定Nameserver地址
- 指定消费者订阅的主题和标签
- 设置回调函数,编写处理消息的方法
- 启动消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQReceiveMessageTest {
public static void main(String[] args) throws Exception {
//1. 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//2. 指定Nameserver地址
consumer.setNamesrvAddr("192.168.1.110:9876");
//3. 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");
//4. 设置回调函数,编写处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("Receive New Messages: " + msgs);
//返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息消费者
consumer.start();
System.out.println("Consumer Started.");
}
}
接收成功
更多推荐
所有评论(0)