一、环境配置

1.1、下载源码

下载地址:https://github.com/apache/rocketmq

git clone https://github.com/apache/rocketmq.git

1.2、导入maven工程到IDE

在这里插入图片描述

1.3、准备启动的配置

1)在下载的rocketmq根目录创建新文件夹conf
2)把 rocketmq\distribution\conf 下的 broker.conf, broker-a.properties, logback_broker.xml 这几个文件copy到新建的conf目录下,并更新 broker-a.properties 文件的内容(红框部分为新添加):
在这里插入图片描述

用于nameserver启动时加载相应的配置。

1.4、启动NameServer 与 Broker

1.4.1、先启动 NameServer

在 namesrv 模块中找到 NamesrvStartup.java,运行前先配置run configurations, 在环境遍历中配置ROCKETMQ_HOME
在这里插入图片描述
nameserver启动成功后,会输出:

The Name Server boot success. serializeType=JSON

1.4.2、再启动 Broker

从broker模块中,找到 BrokerStartup.java 文件,配置其run configurations:
1)配置其Arguments启动配置
在这里插入图片描述
这个配置里,包括了上面修改的 namesrvAddr 等。

2)配置启动环境变量:
在这里插入图片描述
然后运行。运行结果输出:

The broker[broker-a, 10.91.199.184:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

1.5、执行生产与消费

1.5.1、生产者

在rocketmq-example模块中,找到Producer.java,添加nameserver的地址(在前面的broker-a.properties中配置过):
在这里插入图片描述
运行,输出结果:

SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C696633809874997A0000, offsetMsgId=0A5BC7B800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499E50001, offsetMsgId=0A5BC7B800002A9F00000000000000BE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499EB0002, offsetMsgId=0A5BC7B800002A9F000000000000017C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F00003, offsetMsgId=0A5BC7B800002A9F000000000000023A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F60004, offsetMsgId=0A5BC7B800002A9F00000000000002F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F90005, offsetMsgId=0A5BC7B800002A9F00000000000003B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499FC0006, offsetMsgId=0A5BC7B800002A9F0000000000000474, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A000007, offsetMsgId=0A5BC7B800002A9F0000000000000532, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A020008, offsetMsgId=0A5BC7B800002A9F00000000000005F0, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A060009, offsetMsgId=0A5BC7B800002A9F00000000000006AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=2]
14:29:38.465 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:38.472 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[10.91.199.184:10911] result: true

1.5.2、消费者

在rocketmq-example模块中,找到 Consumer.java,添加NameServer地址:
在这里插入图片描述
然后运行,结果如下:

Consumer Started.
ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778314, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778374, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C696633809874997A0000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778411, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778414, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F000000000000017C, commitLogOffset=380, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499EB0002, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_6 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778432, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778433, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000532, commitLogOffset=1330, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A000007, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778405, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778409, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000000BE, commitLogOffset=190, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499E50001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_4 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778416, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778420, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F000000000000023A, commitLogOffset=570, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F00003, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_5 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778422, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778424, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000002F8, commitLogOffset=760, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F60004, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_9 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1648621778434, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778436, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000005F0, commitLogOffset=1520, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A020008, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_7 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778428, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778430, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000474, commitLogOffset=1140, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499FC0006, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_10 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1648621778438, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778439, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000006AE, commitLogOffset=1710, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A060009, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]] 
ConsumeMessageThread_please_rename_unique_group_name_4_8 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778425, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778427, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000003B6, commitLogOffset=950, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F90005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]] 

二、源码解析

2.1、源码目录

在这里插入图片描述
1)broker:broker模块
2)client:消息客户端,包括生产者、消费者等
3)common:通用包
4)dev:开发者信息 (非源码)
5)distribution:部署实例文件夹(非源码)
6)example:RocketMQ示例代码
7)filter:消息过滤相关
8)filtersrv:消息过滤服务器实现
9)logging:日志相关
10)namesrv:NameServer相关实现
11)openmessaging:消息开放标准
12)remoting:基于Netty的RPC通信
13)srvutil:服务端工具类
14)store:消息存储实现
15)test:测试相关类
17)tools:监控命令相关实现

2.2、NameServer源码解读

2.2.1、NameServer设计原理简述

NameServer替代了ZK, 负责RocketMQ的路由管理、服务注册与发现。

NameServer负责管理和维护broker集群的节点:
1)Broker在启动时,会向所有的NameServer进行注册。

2)Producer在发消息之前,先从NameServer中获取Broker集群节点列表,然后根据某种负载均衡算法,从列表中选一台broker对其发送消息。

3)NameServer与每台broker保持长连接,并间隔每30s检测一次broker是否存活,如果发现broker已经死机,则从路由注册表中将其移除。但移除后,并不会把变化马上同步给producer,这样设计是为了降低NameServer的实现复杂度。

4)NameServer本身的高可用,是通过部署多台NameServer来实现,但他们彼此互补通信,所以他们的数据也不是强一致,这样做也是为了追求设计上的简单高效。

2.2.2、NameServer启动流程源码解读

入口:rocketmq-namesrv模块的NamesrvStartup.java main函数
NamesrvStartup.main0 --> NamesrvController controller = createNamesrvController(args) :

Step1、在createNamesrvController方法中,加载两个配置

1、NamesrvConfig:NameServer的业务配置参数
其属性值包括:
在这里插入图片描述
1)rocketmqhome:rocketmq的主目录,可以通过 -Drocketmq.home.dir=path 或 设置环境变量 ROCKETMQ_HOME来配置RocketMQ的主目录

2)kvConfigPath:NameServer存储KV配置属性的持久化路径

3)configStorePath:NameServer默认配置文件路径,默认不生效。NameServer启动时如果要通过配置文件配置NameServer属性的话,可使用-c选项

4)orderMessageEnable:是否支持顺序消息,默认为false

2、NettyServerConfig:NameServer的网络配置参数
看其启动参数 -c 后面是否有带配置文件
在这里插入图片描述
1)listenPort:NameServer的监听端口,默认会被初始化为9876

2)serverWorkerThreads:Netty处理业务的线程池中的线程个数

3)serverCallbackExecutorThreads:Netty public 任务线程池中的线程个数,RocketMQ的netty会对不同的业务创建不同独立的线程池,如消息发送、消息消费、心跳检测等。但如果该业务类型(RequestCode)未注册线程池,则在public线程池中执行。

4)serverSelectorThreads:IO线程池中的线程个数,这些线程在netty中负责解析请求包,然后转发到各业务线程池(worker)中进行业务处理,然后再把结果返回给调用方。

5)serverOnewaySemaphoreValue:send oneway消息请求的并发度(broker的参数)

6)serverAsyncSemaphoreValue:异步消息发送最大并发度(Broker的参数)

7)serverChannelMaxIdelTimeSeconds:网络连接最大空闲时间,默认120s,如果连接空闲时间超过该参数设置的值,则连接会被关闭。

8)serverSocketSndBufSize:网络socket发送缓冲区大小,默认64k

9)serverSocketRcvBufSize:网络socket接收缓冲区大小,默认64k

10)serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,默认开启

11)useEpollNativeSelector:是否启用Epoll IO模型,Linux环境建议开启

Step2、创建NamesrvController实例(作为NameServer的控制器),并对实例初始化。

NamesrvController实例作为NameServer的核心控制器
NamesrvStartup.main0 --> start(controller) --> boolean initResult = controller.initialize():
1、加载KV配置到内存HashMap,即

kvConfigManager.load()

,创建NettyServer网络处理对象,即

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

然后开启两个定时任务,用于做心跳检测:
定时任务1:每隔10s检测一次broker,如broker不心跳则摘除。即:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

定时任务2:每隔10分钟打印一次KV配置,即:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
Step3、注册JVM钩子函数,并启动服务器,以监听broker, 及producer的网络请求
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
            controller.shutdown();
            return null;
        }));

controller.start();

controller的shutdown方法:

public void shutdown() {
        this.remotingServer.shutdown();
        this.remotingExecutor.shutdown();
        this.scheduledExecutorService.shutdown();

        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }

通过钩子来关闭线程池,是一种优雅的方式。

2.2.3、NameServer的路由注册、故障剔除

NameServer的主要作用:
1、为Producer和Consumer提供关于topic的路由信息,所以NameServer首先要存储这些路由信息,这就包括了broker的路由注册。
2、管理broker节点,这就包括了broker的路由删除。

2.2.3.1、路由信息的存储

NameServer的路由实现类为:RouteInfoManager.java,其存储元数据的代码:

private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

1)topicQueueTable:存储 topic队列 的映射,消息发送时,根据此进行负载均衡。

2)brokerAddrTable:存储 brokerNamebroker基本信息 的映射,broker的基本信息包括:集群名称,集群内所有broker的地址。

3)clusterAddrTable:存储 集群名集群中所有brokerName 的映射。

4)brokerLiveTable:存储 broker地址broker的状态信息 的映射,NameServer每次收到心跳包后,会替换该信息。

5)filterServerTable:存储 broker地址Filter Server 的映射,主要用于类模式消息过滤。

2.2.3.2、NameServer管理的broker的部署架构

RocketMQ的部署架构模式:
1)一个topic拥有多个消息队列,一个broker为每个topic默认创建4个读队列,4个写队列。
2)多个broker组成一个集群,brokerName由相同的多个broker组成Master-Slave架构,brokerId为0代表master,大于0代表slave。
3)BrokerLiveInfo中的lastUpdateTimestamp存储该broker上次收到的心跳包时间。

NameServer路由关键类的类图:
在这里插入图片描述
如RocketMQ 两主两从,其部署图如下:
在这里插入图片描述
其对应的数据结构为:
在这里插入图片描述
在这里插入图片描述

2.2.3.3、路由注册

broker启动时,会向NameServer集群中的所有节点发送心跳请求,之后每隔30s发送一次心跳包。

NameServer收到心跳包后,会更新brokerLiveTable缓存中的BrokerLiveInfo的lastUpdateTimstamp;然后NameServer每隔10s会扫描一次brokerLiveTable,如果连续120s没有收到某个broker的心跳包,则将该broker从路由信息中移除,并关闭对该broker的连接。

1、broker发送心跳包

在broker模块的BrokerController中的start()方法:每隔10s发送一次心跳给NameServer:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

其关键的BrokerOuterAPI.registerBrokerAll()方法,把broker注册到了所有的NameServer上

final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

对于向某个nameserver注册,采用oneway的方式发送:

private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }

        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
    }

心跳包都包含哪些内容呢?在BrokerOuterAPI.java的registerBrokerAll()中:
1、封装了RegisterBrokerRequestHeader,具体包括:
1)brokerAddr:broker地址
2)brokerId:0表示master, 大于0表示slave
3)brokerName
4)clusterName
5)haServerAddr:master地址,初次请求时该值为空,slave向NameServer注册后返回。

2、封装了RegisterBrokerBody,具体包括:
1)filterServerList:消息过滤服务器列表
2)topicConfigWrapper:topic配置。用TopicConfigSerializeWrapper封装了TopicConfigManager中的topicConfigTable, 其中存储了broker启动时默认的一些topic,如MixAll.SELF_TEST_TOPIC, MixAll.DEFAULT_TOPIC, MixAll.BENCHMARK_TOPIC, MixAll.OFFSET_MOVED_EVENT, BrokerConfig#brokerClusterName, BrokerCOnfig#brokerName。
Broker中topic默认存储在 ${Rocket_Home}/store/config/topic.json中。

2、NameServer处理心跳包

在namesrv模块,DefaultRequestProcessor.java的processRequest()方法,用于根据请求类型,转发到不同的地方处理:

switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);

此处关于心跳包的请求类型为REGISTER_BROKER,所以转发到RoutelnfoManager.registerBroker:

Step1:注册该broker到clusterAddrTable

先加写锁,防止改RouteInfoManager中的路由表时由于并发导致写入的数据丢失。如果该broker在路由表中不存在,则创建,然后将broker加入到路由信息的broker集合中。

Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
Step2:注册该broker到brokerAddrTable

维护BrokerData信息,如果该broker不存在,则创建。
registerFirst 为true,表示第一次创建

BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}

String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
if (MixAll.MASTER_ID == brokerId) {
    log.info("cluster [{}] brokerName [{}] master address change from {} to {}",
            brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);
}
registerFirst = registerFirst || (null == oldAddr);
Step3:Master的注册时更新topic信息到topicQueueTable

如果broker是Master,且broker topic配置信息发生变化或者初次注册,则需要创建或更新 topic路由元数据,即填充topicQueueTable,即为默认topic自动注册路由信息,其中包括MixAll.DEFAULT_TOPIC的路由信息。

if (null != topicConfigWrapper
       && MixAll.MASTER_ID == brokerId) {
   if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
           || registerFirst) {
       ConcurrentMap<String, TopicConfig> tcTable =
               topicConfigWrapper.getTopicConfigTable();
       if (tcTable != null) {
           for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
               this.createAndUpdateQueueData(brokerName, entry.getValue());
           }
       }
   }
}

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

    Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataMap) {
        queueDataMap = new HashMap<>();
        queueDataMap.put(queueData.getBrokerName(), queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        QueueData old = queueDataMap.put(queueData.getBrokerName(), queueData);
        if (old != null && !old.equals(queueData)) {
            log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), old,
                    queueData);
        }
    }
}
Step4:更新BrokerLiveInfo来维护存活的broker信息表

RoutelnfoManager#registerBroker

BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
        new BrokerLiveInfo(
                System.currentTimeMillis(),
                topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
if (null == prevBrokerLiveInfo) {
    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
Step5:注册broker过滤器Server地址列表

一个broker上会关联多个FilterServer消息过滤服务器。

关于NameServer broker注册代码的设计总结

1、broker与NameServer保持长连接
2、broker的状态,存储在NameServer的brokerLiveTable中,NameServer没收到一个心跳包,都会更新brokerLiveTable中关于broker的状态信息及路由表(topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable)。
3、更新上述路由表采用了读写锁:允许多个发送者并发读,但写时只能串行写。

2.2.3.4、路由删除

broker每隔30s向NameServer发送一个心跳包,心跳包中包含brokerId, broker地址,broker名称,broker所属集群名,broker关联的filterServer列表。

NameServer会每隔10s扫描一次brokerLiveTable表,如果BrokerLive的lastUpdateTimstamp的时间戳距离当前时间超过120s,则认为broker失效了,从而移除该broker,并关闭与该broker的连接,同时更新topicQueueTable, brokerAddrTable, brokerLiveTable,filterServerTable。

RocketMQ删除broker的触发场景有2个:
1)10s扫描120s没有心跳的broker,并删除
2)broker正常关闭,会执行unregisterBroker指令。
这两种方式删除broker的路由,都是走同样的代码,及从topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable删除该broker的相关信息。

路由删除的相关代码:RouteInfoManager#scanNotActiveBroker

public int scanNotActiveBroker() {
   int removeCount = 0;
   Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
   while (it.hasNext()) {
       Entry<String, BrokerLiveInfo> next = it.next();
       long last = next.getValue().getLastUpdateTimestamp();
       if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
           RemotingUtil.closeChannel(next.getValue().getChannel());
           it.remove();
           log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
           this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

           removeCount++;
       }
   }

   return removeCount;
}

其中,RemotingUtil.closeChannel(next.getValue().getChannel());是用于关闭netty的nio长连接

this.onChannelDestroy(next.getKey(), next.getValue().getChannel());用于删除该broker的路由表信息:
先加读锁,查此要删除的broker在brokerLiveTable中是否存在:

public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        if (channel != null) {
            try {
                try {
                    this.lock.readLock().lockInterruptibly();
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                            this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

如存在,则通过加写锁删除其broker的路由信息表:
Step1:把brokerAddress 从brokerLiveTable,filterServerTable中 移除

this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
        this.brokerAddrTable.entrySet().iterator();
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();

    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                    brokerId, brokerAddr);
            break;
        }
    }

    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                brokerData.getBrokerName());
    }
}

Step2:维护brokerAddrTable
brokerAddrTable的数据结构:
HashMap<String /*brokerName*/, BrokerData> brokerAddrTable
BrokerData的数据结构:HashMap<Long /*brokerId*/, String /*broker address*/> broker Address
遍历brokerAddrTable,找到brokerAddrs中的broker,从BrokerData中删除。

Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                            this.brokerAddrTable.entrySet().iterator();
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();

    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                    brokerId, brokerAddr);
            break;
        }
    }

    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                brokerData.getBrokerName());
    }
}

Step3:根据brokerName,从clusterAddrTable中找到broker,并从集群中移除:

if (brokerNameFound != null && removeBrokerName) {
   Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Set<String>> entry = it.next();
        String clusterName = entry.getKey();
        Set<String> brokerNames = entry.getValue();
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                    brokerNameFound, clusterName);

            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                        clusterName);
                it.remove();
            }
            break;
        }
    }
}

Step4:根据brokerName, 遍历所有topic的队列,如果队列中包含了当前broker,则移除:

if (removeBrokerName) {
    String finalBrokerNameFound = brokerNameFound;
    Set<String> needRemoveTopic = new HashSet<>();

    topicQueueTable.forEach((topic, queueDataMap) -> {
        QueueData old = queueDataMap.remove(finalBrokerNameFound);
        log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                topic, old);

        if (queueDataMap.size() == 0) {
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                    topic);
            needRemoveTopic.add(topic);
        }
    });

    needRemoveTopic.forEach(topicQueueTable::remove);
}

Step5:最后释放锁,完成路由删除

} finally {
    this.lock.writeLock().unlock();
}
2.2.3.5、路由发现

当某broker宕机,如果NameServer通过心跳检测发现后,并不会实时通知每个RocketMQ客户端,而是由客户端定时拉取topic最新的路由时做数据同步。

根据topic拉取路由信息的命令编码为:

GET_ROUTEINTO_BY_TOPIC

代码在:DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

上面代码重点在下面的方法:

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            this.lock.readLock().lockInterruptibly();
            Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
            if (queueDataMap != null) {
                topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
                foundQueueData = true;

                brokerNameSet.addAll(queueDataMap.keySet());

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                .getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;

                        // skip if filter server table is empty
                        if (!filterServerTable.isEmpty()) {
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                List<String> filterServerList = this.filterServerTable.get(brokerAddr);

                                // only add filter server list when not null
                                if (filterServerList != null) {
                                    filterServerMap.put(brokerAddr, filterServerList);
                                }
                            }
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}

首先通过从RouterInfoManager的topicQueueTable, brokerAddrTable, filterServerTable中分别填充TopicRouteData中的List<QueueData>, List<BrokerData>, flterServer地址表

然后如果找到topic对应的路由信息的topic为顺序消息,则从NameServer VKconfig中获取关于顺序消息相关的配置,来填充路由信息。

2.2.3、NameServer设计总结

NameServer的路由注册、删除机制:
在这里插入图片描述

2.3、消息发送

RocketMQ支持三种消息发送:
1)同步发送:Producer发消息时,同步等待,串行发送
2)异步发送:Producer发消息是,不阻塞,仅指定消息成功后的回调方法,然后立即返回,最后发送成功还是失败,在新的线程中执行返回。
3)单向(Oneway)发送:Producer发消息时,不等待服务器返回结果,直接返回,也不注册回调函数,即不在乎消息是否成功发到了服务端。

2.3.1、消息类

在common模块的Message.java
在这里插入图片描述
消息类基本属性包括:
1)String topic
2)消息 int flag
3)扩展属性 Map<String, String> properties
4)消息体 byte[] body
5)事务ID String transactionId

Message的构造器为:

public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
    this.topic = topic;
    this.flag = flag;
    this.body = body;

    if (tags != null && tags.length() > 0) {
        this.setTags(tags);
    }

    if (keys != null && keys.length() > 0) {
        this.setKeys(keys);
    }

    this.setWaitStoreMsgOK(waitStoreMsgOK);
}

消息的扩展属性,及存储在Map<String, String> properties中,主要包括:
1)tag:用于消息过滤
2)keys:message的索引key, 多个用空格分隔,可根据key快速查找到消息
3)waitStoreMsgOK:发消息时,是否等消息在broker存储完成后再返回
4)delayTimeLevel:消息延迟级别,用于定时消息或消息重试

2.3.2、Producer

2.3.2.1、Producer类主要属性及方法介绍

Producer的代码都在client模块,对于RocketMQ来说,他就是客户端。

client模块中的DefaultMQProducer.java是默认的Producer实现类,他继承自MQProducer类,并实现了MQAdmin接口,其主要方法如下:
在这里插入图片描述
在这里插入图片描述
MQAdmin的方法:
1)创建topic

void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException;

2)根据时间戳从队列中查找其偏移量

long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;

3)查找该消息队列中最大的物理偏移量

long maxOffset(final MessageQueue mq) throws MQClientException;

4)查找该消息队列中最小的物理偏移量

long minOffset(final MessageQueue mq) throws MQClientException;

5)查找消息队列中最早的消息

long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;

6)根据消息偏移量查找消息

MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

7)根据下面条件查找消息
topic, 消息索引key, 本次取出最多消息条数maxNum, 开始时间begin,结束时间end

QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;

MQProducer的方法:
1)查找topic下所有消息队列:

List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;

2)同步发送消息:

SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

3)同步发消息,超时抛异常

SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

4)异步发消息,sendCallback为发送成功后的回调方法

void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;

5)异步发消息,超时抛异常

void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;

6)单向发消息,不管是否接收成功,发出去后立即返回

void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;

7)同步发消息到指定消息队列

SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

8)异步发消息到指定消息队列

void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;

9)单向发消息到指定队列

void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;

10)同步发送消息,并指定负载均衡算法

SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

11)同步批量发送消息

SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

DefaultMQProducer的核心属性:
1)生产者组 private String producerGroup:
对于事务消息,消息服务器在回查事务时,会随机选择生产者组中的任何一个生产者发起事务回查请求

2)默认topic key private String createTopicKey:
仅用于测试或demo

3)topic在每个broker上默认的队列数量private volatile int defaultTopicQueueNums = 4;

4)发消息默认超时时间,单位毫秒:private int sendMsgTimeout = 3000;

5)消息体大小超过此阈值后启动压缩,默认4K:private int compressMsgBodyOverHowmuch = 1024 * 4;

6)同步发送消息重试次数,默认为2,即共执行3次:private int retryTimesWhenSendFailed = 2;

7)异步发送消息重试次数,默认为2,即共执行3次:private int retryTimesWhenSendAsyncFailed = 2;

8)消息重试时选择另一个broker时,是否不等待存储结果就返回,默认为false:private boolean retryAnotherBrokerWhenNotStoreOK = false;

9)允许发送的最大消息长度,默认为4M,最大值为2^32-1:private int maxMessageSize = 1024 * 1024 * 4;

2.3.2.2、Producer启动流程

Producer的启动入口在DefaultMQProducerImpl#start

Step1:检验producerGroup是否符合要求,并把生产者的instanceName改为进程ID

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    this.defaultMQProducer.changeInstanceNameToPID();
}

Step2:创建MQClientInstance实例,同一个clientId只有一个MQClientInstance。这是通过存到了缓存表:
ConconrrentMap<String /*clientId*/, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>()中。

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

生成clientId的方法:

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}

clientId为 ip + instance + unitname(可选),为了避免同一个服务器上部署两个producer实例时,其clientId相同,所以RocketMQ设计时,自动将Instance设置为进程ID。

Step3:把producer加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等。

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

Step4:启动MQClientInstance:

if (startFactory) {
      mQClientFactory.start();
}
2.3.2.3、Producer消息发送流程

消息发送流程包括:消息校验、查找路由、消息发送 三步
代码在DefaultMQProducer#send

Step1: 消息合法校验

校验消息的topic是否合法(不能跟系统的topic重复),消息体长度大于0且小于4M

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
	 if (null == msg) {
	     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
	 }
	 // topic
	 Validators.checkTopic(msg.getTopic());
	 Validators.isNotAllowedSendTopic(msg.getTopic());
	
	 // body
	 if (null == msg.getBody()) {
	     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
	 }
	
	 if (0 == msg.getBody().length) {
	     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
	 }
	
	 if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
	     throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
	         "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
	 }
}
Step2:查找topic路由信息

向topic发送消息前,先要获取到topic的路由信息,即topic在哪个broker节点上。
代码在:DefaultMQProducerImpl#tryToFindTopicPublishInfo

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

如果producer中已经缓存了topic的路由信息,且路由信息中包含了消息队列,则直接返回路由信息;如果没有缓存或包含消息队列,则向NameServer查询该topic的路由信息,然后存到TopicPublishInfo。

下面看下TopicPublishInfo的属性:
1)是否是顺序消息
private boolean orderTopic = false

2)topic对应的消息队列
private List messageQueueList = new ArrayList();

3)是否有topic路由信息
private boolean haveTopicRouterInfo = false

4)topic路由信息
private TopicRouteData topicRouteData;

5)每选择一次消息队列,该值会自增1,如果Integer.MAX_VALUE,则重置为0。用于选择消息队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

TopicRouteData 类的属性:
1)topic队列源数据:
private List queueDatas;

2)topic对应的broker的元数据
private List brokerDatas;

3)broker上过滤服务器的地址列表
private HashMap<String/* brokerAddr /, List/ Filter Server */> filterServerTable;

发消息时,如果本地没有缓存topic的路由信息,则从NameServer查询,如果路由信息未找到,则尝试用默认topic DefaultMQProducerImpl#createTopicKey去NameServer模块查询,如果autoCreateTopicEnable为true, 则NameServer将返回路由信息;如果autoCreateTopicEnable为false,则抛出异常。

从NameServer获取路由信息的client模块的代码:
MQClientInstance#updateTopicRouteInfoFromNameServer

Step1: 如果使用默认topic去查询,如果查询到路由信息,则替换路由信息中读写队列的个数为producer的默认队列个数

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
        clientConfig.getMqClientApiTimeout());
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
} else {
    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}

Step2:如果找到路由信息,与本地缓存中的对比,如果发生了变化,则更新

TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
    changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
    log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}

Step3:更新MQClientInstance Broker地址缓存表

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<String, MQProducerInner> entry = it.next();
    MQProducerInner impl = entry.getValue();
    if (impl != null) {
        impl.updateTopicPublishInfo(topic, publishInfo);
    }
}

Step4:遍历从NameServer获取到的QueueData信息,如果队列没有写权限,则继续遍历下一个QueueData;根据brokerName找到brokerData信息,找不到或没有找到master节点,则遍历下一个DataQueue; 最后根据写队列的个数遍历,根据topic+序号创建MessageQueue,填充topicPublishInfo中的List<QueueMessage>, 从而完成消息发送的路由查找。

List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
    if (PermName.isWriteable(qd.getPerm())) {
        BrokerData brokerData = null;
        for (BrokerData bd : route.getBrokerDatas()) {
            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                brokerData = bd;
                break;
            }
        }

        if (null == brokerData) {
            continue;
        }

        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
            continue;
        }

        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
            info.getMessageQueueList().add(mq);
        }
    }
}

info.setOrderTopic(false);
Step3:选择消息队列

消息的producer有发送重试机制,由DefaultMQProducer.retryTimesWhenSendFailed来指定同步发送重试次数。
由retryTimesWhenSendAsyncFailed指定异步重试次数。

选择消息队列有两种方式:
1)sendLatencyFaultEnable=false:默认不启用broker故障延迟机制
代码在TopicPublishInfo#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

lastBrokerName是上次选择执行发送失败的broker。第一次执行消息队列选择时,lastBrokerName为null, 此时用sendWhichQueue自增,再获取值,与当前路由表中消息队列个数取模,返回该位置的MessageQueue;如果消息发送失败,则下次进行消息队列选择时,规避掉上次MessageQueue所在的broker,这是为了避免再次失败。

之所以broker不可用,还没有从发送列表被清除,是因为NameServer发现broker不可用,摘除后,不会立即同步给Client, 直到client定时从NameServer同步时才能知道。为了解决这个问题,RocketMQ引入了一种机制,在broker宕机期间,如果一次发消息失败后,可以将该broker暂时排除在消息队列的选择范围中。

2)sendLatencyFauleEnable=true:启用broker故障延迟机制
代码在:MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

1)根据对消息队列轮训,获取到一个消息队列
2)验证该消息队列是否可用:latencyFaultTolerance.isAvailable(mq.getBrokerName())
如果可用,则从latencyFaultTolerance删除该topic,表示该broker已经恢复。

broker故障延迟机制的核心类:
在这里插入图片描述

Step4:消息发送

消息发送的API核心入口:DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        ...
}

1)Message msg:待发送消息
2)MessageQueue mq:将消息发送到该消息队列
3)CommunicationMode communicationMode:发送模式:同步、异步、单向
4)SendCallback sendCallback:异步回调函数
5)TopicPublishInfo topicPublishInfo:topic路由信息
6)long timeout:消息发送超时时间

接下来看发送过程:
Step1:从消息队列中获取broker网络地址
如果MQClientInstance的brokerAddrTable未缓存该地址,则调用NameServer主动更新topic的路由信息。如果更新后还是找不到Broker信息,则抛出MQClientException,提示broker不存在。

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

Step2:为消息分配全局唯一ID,如果消息体超过4K,则会对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
    topicWithNamespace = true;
}

int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

if (hasCheckForbiddenHook()) {
    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
    checkForbiddenContext.setCommunicationMode(communicationMode);
    checkForbiddenContext.setBrokerAddr(brokerAddr);
    checkForbiddenContext.setMessage(msg);
    checkForbiddenContext.setMq(mq);
    checkForbiddenContext.setUnitMode(this.isUnitMode());
    this.executeCheckForbiddenHook(checkForbiddenContext);
}

if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐