RocketMQ搭建步骤

开发环境

  • 64位 centos7(虚拟机,1G内存)
  • 64位 jdk1.8
  • maven 3.5.0
  • Git
  • tomcat(用于启动rocketmq-console)
  • rocketmq 3.2.6(最好选择maven仓库中已有的版本,保持客户端依赖的jar包和服务器版本一致)
  • rocketmq-console

环境变量配置

vi /etc/profile 打开文件配置如下:

JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
M2_HOME=/usr/maven/
ROCKETMQ_HOME=/usr/rocketmq
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$M2_HOME/bin:$ROCKETMQ_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH M2_HOME ROCKETMQ_HOME PATH
export NAMESRV_ADDR=127.0.0.1:9876

source /etc/profile 使配置文件立即生效

防火墙配置

宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙

service iptables stop 关闭防火墙
service iptables status 查看防火墙的状态
service iptables start 启动防火墙

或者为了安全,只开放特定的端口号,如8080、9876、10911等等,此处不再赘述。

安装、启动RocketMQ

1.下载和安装

cd /usr
wget https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz
mv alibaba-rocketmq-3.2.6 rocketmq

cd rocketmq/bin 进入rocketmq核心命令文件目录

2.设置可执行权限

chmod +x mqadmin mqbroker mqfiltersrv mqshutdown  mqnamesrv

3.修改jvm参数

vim修改runserver.sh和runbroker.sh的jvm参数如下(根据虚拟机内存大小设置,超出内存大小可能会报错):
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"

4.启动nameserver

nohup sh mqnamesrv &

5.配置broker

(1)创建broker配置文件

mkdir ../conf/me-2m-2s-async/
sh mqbroker -m >../conf/me-2m-2s-async/broker.p

(2)修改brokerIP

vi ../conf/me-2m-2s-async/broker.p
brokerIP1=192.168.x.x  显示指定为虚拟机的外网IP,不要用localhost和127.0.0.1,因为远程主机会根据brokerIP1指定的地址去访问broker

6.启动broker

nohup sh mqbroker -n localhost:9876 -c ../conf/me-2m-2s-async/broker.p &

7.检查nameserver和broker是否启动成功

执行jps,输出以下进程表示启动成功

8464 NamesrvStartup
8618 BrokerStartup

或者,查看nuhup.out日志文件,有如下信息表示启动成功

The Name Server boot success.
The broker[localhost.localdomain, 192.168.x.x:10911] boot success. and name server is localhost:9876

或者,启动rocketmq自带的Producer和Consumer程序,若可正常发送和消费消息,则表示服务启动成功

bash tools.sh com.alibaba.rocketmq.example.quickstart.Producer #生产者
bash tools.sh com.alibaba.rocketmq.example.quickstart.Consumer #消费者

8.关闭nameserver和broker的方法

sh mqshutdown broker
sh mqshutdown namesrv

安装、启动rocketmq-console

wget https://github.com/duomu/rocketmq-console/raw/master/rocketmq-console.war 下载
将rocketmq-console.war放在/usr/tomcat/webapps目录下
sh /usr/tomcat/bin/startup.sh  启动tomcat

虚拟机本地访问http://localhost:8080/rocketmq-console,显示如下页面表示启动成功
这里写图片描述

宿主机远程访问http://192.168.x.x:8080/rocketmq-console,若无法访问,请检查防火墙是否关闭或者是否开放了8080端口号。

编写测试程序

在宿主机(windows)上编写如下测试程序:

依赖配置

//此处只列出mq相关的依赖
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

创建生产者

package com.fuscent.infoquery.practice.rocketmq;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import org.apache.log4j.Logger;

/**
 * @author:duomu
 * @date:2017/8/4 18:09
 */
public class MqProducer {
    private static Logger logger = Logger.getLogger(MqProducer.class);
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("192.168.229.132:9876");
        try {
            producer.start();
            logger.info("producer启动成功");
            for (int i = 0; i < 5; i++) {
                Message msg = new Message("TopicA", "tagA", "OrderID188", "Hello world".getBytes());
                SendResult result = producer.send(msg);
                logger.info("id:" + result.getMsgId() + " result:" + result.getSendStatus());
            }
        } catch (Exception e) {
            logger.error("发送消息失败,Exception error:" + e);
        } finally {
            producer.shutdown();
        }
    }
}

创建消费者

package com.fuscent.infoquery.practice.rocketmq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.apache.log4j.Logger;
import java.util.List;

/**
 * @author:duomu
 * @date:2017/8/4 18:09
 */
public class MqConsumer {
    private static Logger logger = Logger.getLogger(MqConsumer.class);

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll");
        consumer.setNamesrvAddr("192.168.229.132:9876");
        try {
            consumer.subscribe("TopicA", "tagA||tagB");//可订阅多个tag,但是一个消息只能有一个tag
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    Message msg = list.get(0);
                    logger.info(msg.toString());
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            logger.info("consumer启动成功");
        } catch (MQClientException e) {
            logger.error("消费者订阅消息失败,error:" + e);
        }
    }
}

测试生成者和消费者

启动生成者

这里写图片描述

启动消费者

这里写图片描述

总结

前人栽树,后人乘凉,在baidu+google了n篇文章后,终于把rocketmq搭建成功了,虽然只是单机配置,但是把该踩的坑都踩了,集群搭建应该只是多配几台服务而已,后续再研究啦~~~

坑1

在github上下载了最新的rocketmq4.1.0,后来发现maven中央仓库还没有4.1.0的rocketmq-client依赖包,后来下载了3.5.8,也没有调成功,索性下载一个比较早期的版本,选了3.2.6,我们公司用的3.2.4,比我们公司的早一点点应该不会太差。。。

坑2

nameserver和broker启动成功,宿主机上的生产者发送消息失败,报如下错误,且指向错误码33/44/50:

com.alibaba.rocketmq.client.exception.MQClientException:Send [1] times, still failed, cost [75]ms,...

出现这个问题首先要查看虚拟机本地的producer是否可以正常发送消息,如果本地收发消息正常,那么一定远程访问的过程中出了问题,可能是端口号没开放,也可能是IP地址映射有问题。

对于端口号,我已经确定了n遍,防火墙是关闭的,最初还没有考虑到IP地址的问题,所以百思不得其解,从阿里官方渠道获取了错误33/44/50的解决方案,试了一下也没用,把rocketmq3.2.6源码里面的Producer跑了一下也是报那个错误,错误44的说明里写着可能是producer没有正确连接到NameServer,我知道没有连接成功,可是防火墙我都关闭了还能有什么原因呢。

这里写图片描述

捣鼓了大半天,就卡在这个问题上了,我想我一定是漏掉了什么,反反复复看38/44/50的错误说明,直到看到错误50说明里面的这一句话:

这里写图片描述

然后我注意到下面这个嵌套错误,debug了一下,也没看出什么,当时我还以为这个ip是虚拟机的局域网ip

这里写图片描述

接着就baidu+google,偶然google出一篇思路别具一格的文章,说rocketmq自动识别网络出错,要把其他网络关掉,我之前学习docker的时候的确在虚拟机上配了docker的网络。

这里写图片描述

然后就尝试关掉docker的网络(172.17.0.1),可是关掉了还是照样报上面的错误啊。。。

真的没有办法了,今天早晨来了突然想到,能够访问外网ip不能访问局域网ip,ping一下看看吧,果然局域网ip ping不通,由于对网络、虚拟机了解的不深,我就去求教网络童鞋了,问宿主机怎么能够访问虚拟机的局域网ip(我用的NAT模式),网络童鞋说你用桥接模式吧,当时心中暗喜,心想吼吼我的大难题就要这么简单的解决了,网络童鞋走后,我就试了一下,麻蛋为什么用桥接模式分了新的ip(172.16.2.129),还是报上面那个172.17.0.1的错误。。

第一次搭rocketmq,想尽快调通,基本上都是用的默认配置,而且默认配置一般不会有问题啊,自己写配置才容易出错,然鹅万能的百度告诉我我之前先入为主的观念是错的,我想这应该是终极解决方案了吧。。

这里写图片描述

原来broker自动寻的地址是172.17.0.1,而且深深的刻在了默认配置文件里,虽然我关掉了这个网络,配置文件里还是这个地址,然后我重新写了个配置文件,强制指定broker所在的机器ip为192.168.x.x,重启服务,大功告成!

和局域网ip能否ping通无关,我把网络连接改回了NAT模式,感谢网络童鞋的帮忙,我要好好补一下网络和虚拟机的知识了。。。

参考资料

附上最有价值的几个~~
http://rocketmq.apache.org/docs/quick-start/ 官方资料,搭建mq之前最好把User Guide都看一遍
https://firsh.me/2017/07/19/rocketmq-p-c/
https://my.oschina.net/xcafe/blog/814135 坑2的终极解决方案
http://www.cnblogs.com/badboyf/p/6611774.html

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐