RocketMQ C++ 开发总结
RocketMQ是一个开源的分布式消息传递系统,它最初由阿里巴巴开发并开源。RocketMQ具有高可用性、高吞吐量、可伸缩性和容错性等特点,能够满足企业级应用的需求。它支持多种消息模式,如点对点、发布/订阅和批量消息等,并提供了丰富的特性,如延迟消息、顺序消息、事务消息等。RocketMQ还提供了可视化的控制台,方便用户进行监控和管理。
前言:
因工作要求需要用到c++去开发一个RocketMQ驱动程序,作为一个小白最开始也是各种找资料,但是很少能找到c++语言进行开发的,因为 RocketMQ C++官方工程源码质量并不高,存在很多bug,很多时候都编译不成功(linux环境下成功率高一些),所以一般情况下最好不要选择C++语言,但对于普通的生产和消费还是可以用的,下面是我在开发rocketmq驱动时的一些经验总结和部分代码示例,同时也加上了环境配置以及一些关于RocketMQ的一些基础知识,内容相对较多,大家根据自己的需要进行筛选查看。
目录
(1)Linux 下 rocketmq-client-cpp编译
1. 什么是RocketMQ?
RocketMQ是一个开源的分布式消息传递系统,它最初由阿里巴巴开发并开源。RocketMQ具有高可用性、高吞吐量、可伸缩性和容错性等特点,能够满足企业级应用的需求。它支持多种消息模式,如点对点、发布/订阅和批量消息等,并提供了丰富的特性,如延迟消息、顺序消息、事务消息等。RocketMQ还提供了可视化的控制台,方便用户进行监控和管理
2. 环境搭建
(1)下载
官网地址:http://rocketmq.apache.org/dowloading/releases/
(2)环境变量配置
(3)内存分配设置
大部分人的计算机是满足不了默认的内存配置的,不进行更改可能无法正常启动rocketmq相关服务
编辑...\bin\runserver.cmd文件:
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
编辑...\bin\runbroker.cmd
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
(4)RocketMQ图形化管理控制台搭建
图形化界面管理平台能够更清晰的去查看和管理相关的信息,增加工作效率
Github下载地址:https://gitcode.net/mirrors/apache/rocketmq-externals/-/archive/develop/rocketmq-externals-develop.zip
application.properties配置:
文件目录:...\rocketmq-externals-develop\rocketmq-console\src\main\resources
编译插件:
在...\rocketmq-externals-develop\rocketmq-console目录下进入cmd窗口使用 mvn clean package -Dmaven.test.skip=true 命令进行编译。编译成功后会在target目录下生成rocketmq-console-ng-1.0.0.jar等文件。
mvn clean package -Dmaven.test.skip=true
3. 服务启动
(1)启动NAMESERVER
在bin目录下,打开cmd窗口,执行 start mqnamesrv.cmd命令启动NAMESERVER,启动成功会出现弹窗,且不要关闭!
start mqnamesrv.cmd
(2)启动BROKER
同样在bin目录下运行cmd窗口,执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true命令启动BROKER(不要关闭),红色部分为你的namesrvAddr地址,根据实际情况填写!
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
(3)启动可视化管理控制台
在target 目录下打开cmd窗口执行 java -jar rocketmq-console-ng-1.0.0.jar 命令来运行插件。
java -jar rocketmq-console-ng-1.0.0.jar
注:每个人可能安装的插件版本不一样,我这里是1.0.0,根据实际情况书写正确版本号
在网页中输入你的namesryAddr+端口号(上面application.properties里配置的)打开平台
可视化平台右上角选择语言,里面能够看到各个topic的详细信息,消息轨迹,内容,往topic中发送指定内容,对于测试来说非常好用!
4. 生产者与消费者
(1)消息模式
NO.1:
同步发送(SYNC):生产者发送消息后,会阻塞等待 Broker 的响应,直到收到响应或者超时。这种模式下,消息的可靠性较高,但是发送消息的速度较慢。
NO.2:
异步发送(ASYNC):生产者发送消息后不会阻塞等待 Broker 的响应,而是通过回调函数来处理发送结果。这种模式下,消息的可靠性较高,同时发送消息的速度也比同步发送模式快。
NO.3:
单向发送(ONEWAY):生产者发送消息后不会等待 Broker 的响应,也不会通过回调函数来处理发送结果。这种模式下,消息的可靠性较低,但是发送消息的速度最快。
NO.4:
集群消费(CLUSTERING):多个消费者实例共同消费同一个 Topic 下的消息,每个消息只会被消费者组中的一个实例消费。这种模式下,消息的可靠性较高,适合于需要保证消息不重复消费的场景。
NO.5:
广播消费(BROADCASTING):多个消费者实例都会消费同一个 Topic 下的所有消息,每个消息会被所有的消费者实例都消费一次。这种模式下,消息的可靠性较低,但是适合于需要多个消费者实例同时处理消息的场景。
注:当我们创建一个消费者程序去消费某个topic中的信息时,消费者组的消息模式一定要区分到底是集群还是广播模式,在c++程序中,这个地方弄错是捕获不到异常的,显现在你面前的是各种windows模块的报错,例如ucrtbase.dll故障。
NO.6
Pull 模式:消费者通过调用 pull
方法主动向 Broker 拉取消息。在 Pull 模式下,消费者可以自由控制消息的消费进度,可以根据实际需求灵活地进行消费。但是 Pull 模式需要消费者不断地轮询 Broker,会产生一定的网络开销。
N0.7
Push 模式:Broker 将消息推送给消费者,消费者只需要等待消息的到来即可。在 Push 模式下,消费者不需要进行轮询,可以减少网络开销,同时也可以实现消息的实时推送。但是 Push 模式下,消费者无法自由控制消息的消费进度,需要按照 Broker 推送的顺序进行消费。
(2)具体实现
在实现rocketmq生产者和消费者程序时,我们可以有多种选择,以消费者为例,最常用的就是pull模式和push模式,push模式中,我们会对Broker里的消息进行一个监听,当有消息到来时,Broker就会推送给你,然后调用你写的消息处理函数进行解析。pull模式为主动向Broker拉取消息,可以指定条数,需要注意的是每个topic可能有多个读队列,拉取的时候不要漏掉!pull适用于消费能力有限的场景,不能保证实时性。
生产者:
AsyncProducer示例:
using namespace rocketmq;
std::atomic<bool> g_quit;
std::mutex g_mtx;
std::condition_variable g_finished;
SendCallback* g_callback = NULL;
TpsReportService g_tps;
class MySendCallback : public SendCallback {
virtual void onSuccess(SendResult& sendResult) {
g_msgCount--;
g_tps.Increment();
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
}
virtual void onException(MQException& e) { cout << "send Exception\n"; }
};
class MyAutoDeleteSendCallback : public AutoDeleteSendCallBack {
public:
virtual ~MyAutoDeleteSendCallback() {}
virtual void onSuccess(SendResult& sendResult) {
g_msgCount--;
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
}
virtual void onException(MQException& e) { std::cout << "send Exception" << e << "\n"; }
};
void AsyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
MQMessage msg(info->topic, // topic
"*", // tag
info->body); // body
if (info->IsAutoDeleteSendCallback) {
g_callback = new MyAutoDeleteSendCallback(); // auto delete
}
try {
producer->send(msg, g_callback);
} catch (MQException& e) {
std::cout << e << endl; // if catch excepiton , need re-send this msg by
// service
}
}
}
int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
DefaultMQProducer producer("please_rename_unique_group_name");
if (!info.IsAutoDeleteSendCallback) {
g_callback = new MySendCallback();
}
PrintRocketmqSendAndConsumerArgs(info);
if (!info.namesrv.empty())
producer.setNamesrvAddr(info.namesrv);
producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
producer.setNamesrvDomain(info.namesrv_domain);
producer.start();
g_tps.start();
std::vector<std::shared_ptr<std::thread>> work_pool;
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
for (int j = 0; j < info.thread_count; j++) {
std::shared_ptr<std::thread> th = std::make_shared<std::thread>(AsyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}
{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
g_quit.store(true);
}
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";
producer.shutdown();
for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
}
if (!info.IsAutoDeleteSendCallback) {
delete g_callback;
}
return 0;
}
BatchProducer示例:
using namespace rocketmq;
using namespace std;
std::atomic<bool> g_quit;
std::mutex g_mtx;
std::condition_variable g_finished;
TpsReportService g_tps;
void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
break;
}
vector<MQMessage> msgs;
MQMessage msg1(info->topic, "*", info->body);
msg1.setProperty("property1", "value1");
MQMessage msg2(info->topic, "*", info->body);
msg2.setProperty("property1", "value1");
msg2.setProperty("property2", "value2");
MQMessage msg3(info->topic, "*", info->body);
msg3.setProperty("property1", "value1");
msg3.setProperty("property2", "value2");
msg3.setProperty("property3", "value3");
msgs.push_back(msg1);
msgs.push_back(msg2);
msgs.push_back(msg3);
try {
auto start = std::chrono::system_clock::now();
SendResult sendResult = producer->send(msgs);
g_tps.Increment();
--g_msgCount;
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (duration.count() >= 500) {
std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl;
}
} catch (const MQException& e) {
std::cout << "send failed: " << e.what() << std::endl;
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
return;
}
}
}
int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
DefaultMQProducer producer("please_rename_unique_group_name");
producer.setNamesrvAddr(info.namesrv);
producer.setNamesrvDomain(info.namesrv_domain);
producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
producer.setSendMsgTimeout(500);
producer.setTcpTransportTryLockTimeout(1000);
producer.setTcpTransportConnectTimeout(400);
producer.start();
std::vector<std::shared_ptr<std::thread>> work_pool;
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
g_tps.start();
int threadCount = info.thread_count;
for (int j = 0; j < threadCount; j++) {
std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}
{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
g_quit.store(true);
}
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";
for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
}
producer.shutdown();
return 0;
}
消费者:
DefaultMQPushConsumer 和 DefaultMQProducer 示例:
std::mutex g_mtx;
std::condition_variable g_finished;
TpsReportService g_tps;
using namespace rocketmq;
class MyMsgListener : public MessageListenerConcurrently {
public:
MyMsgListener() {}
virtual ~MyMsgListener() {}
virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) {
g_msgCount.store(g_msgCount.load() - msgs.size());
for (size_t i = 0; i < msgs.size(); ++i) {
g_tps.Increment();
// cout << "msg body: "<< msgs[i].getBody() << endl;
}
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
DefaultMQPushConsumer consumer("please_rename_unique_group_name");
DefaultMQProducer producer("please_rename_unique_group_name");
producer.setSessionCredentials("AccessKey", "SecretKey", "ALIYUN");
producer.setTcpTransportTryLockTimeout(1000);
producer.setTcpTransportConnectTimeout(400);
producer.setNamesrvDomain(info.namesrv_domain);
producer.setNamesrvAddr(info.namesrv);
producer.setGroupName("msg-persist-group_producer_sandbox");
producer.start();
consumer.setNamesrvAddr(info.namesrv);
consumer.setGroupName(info.groupname);
consumer.setSessionCredentials("AccessKey", "SecretKey", "ALIYUN");
consumer.setConsumeThreadCount(info.thread_count);
consumer.setNamesrvDomain(info.namesrv_domain);
consumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);
if (info.syncpush)
consumer.setAsyncPull(false); // set sync pull
if (info.broadcasting) {
consumer.setMessageModel(rocketmq::BROADCASTING);
}
consumer.setInstanceName(info.groupname);
consumer.subscribe(info.topic, "*");
consumer.setConsumeThreadCount(15);
consumer.setTcpTransportTryLockTimeout(1000);
consumer.setTcpTransportConnectTimeout(400);
MyMsgListener msglistener;
consumer.registerMessageListener(&msglistener);
try {
consumer.start();
} catch (MQClientException& e) {
cout << e << endl;
}
g_tps.start();
int msgcount = g_msgCount.load();
for (int i = 0; i < msgcount; ++i) {
MQMessage msg(info.topic, // topic
"*", // tag
info.body); // body
// std::this_thread::sleep_for(std::chrono::seconds(100000));
try {
producer.send(msg);
} catch (MQException& e) {
std::cout << e << endl; // if catch excepiton , need re-send this msg by
// service
}
}
{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
}
producer.shutdown();
consumer.shutdown();
return 0;
}
CPushConsumer示例:
void thread_sleep(unsigned milliseconds) {
#ifdef _WIN32
Sleep(milliseconds);
#else
usleep(milliseconds * 1000); // takes microseconds
#endif
}
int doConsumeMessage(struct CPushConsumer* consumer, CMessageExt* msgExt) {
printf("Hello,doConsumeMessage by Application!\n");
printf("Msg Topic:%s\n", GetMessageTopic(msgExt));
printf("Msg Tags:%s\n", GetMessageTags(msgExt));
printf("Msg Keys:%s\n", GetMessageKeys(msgExt));
printf("Msg Body:%s\n", GetMessageBody(msgExt));
return E_CONSUME_SUCCESS;
}
int main(int argc, char* argv[]) {
int i = 0;
printf("PushConsumer Initializing....\n");
CPushConsumer* consumer = CreatePushConsumer("Group_Consumer_Test");
SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876");
Subscribe(consumer, "T_TestTopic", "*");
RegisterMessageCallback(consumer, doConsumeMessage);
StartPushConsumer(consumer);
printf("Push Consumer Start...\n");
for (i = 0; i < 10; i++) {
printf("Now Running : %d S\n", i * 10);
thread_sleep(10000);
}
ShutdownPushConsumer(consumer);
DestroyPushConsumer(consumer);
printf("PushConsumer Shutdown!\n");
return 0;
}
CPullConsumer示例:
void thread_sleep(unsigned milliseconds) {
#ifdef _WIN32
Sleep(milliseconds);
#else
usleep(milliseconds * 1000); // takes microseconds
#endif
}
int main(int argc, char* argv[]) {
int i = 0, j = 0;
int ret = 0;
int size = 0;
CMessageQueue* mqs = NULL;
printf("PullConsumer Initializing....\n");
CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test");
SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
StartPullConsumer(consumer);
printf("Pull Consumer Start...\n");
for (i = 1; i <= 5; i++) {
printf("FetchSubscriptionMessageQueues : %d times\n", i);
ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
if (ret != OK) {
printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
}
printf("Get MQ Size:%d\n", size);
for (j = 0; j < size; j++) {
int noNewMsg = 0;
long long tmpoffset = 0;
printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
do {
int k = 0;
CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
tmpoffset = pullResult.nextBeginOffset;
}
printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld", pullResult.pullStatus,
pullResult.maxOffset, pullResult.minOffset, pullResult.nextBeginOffset);
switch (pullResult.pullStatus) {
case E_FOUND:
printf("Get Message Size:%d\n", pullResult.size);
for (k = 0; k < pullResult.size; ++k) {
printf("Got Message ID:%s,Body:%s\n", GetMessageId(pullResult.msgFoundList[k]),
GetMessageBody(pullResult.msgFoundList[k]));
}
break;
case E_NO_MATCHED_MSG:
noNewMsg = 1;
break;
default:
noNewMsg = 0;
}
ReleasePullResult(pullResult);
thread_sleep(100);
} while (noNewMsg == 0);
thread_sleep(1000);
}
thread_sleep(2000);
ReleaseSubscriptionMessageQueue(mqs);
}
thread_sleep(5000);
ShutdownPullConsumer(consumer);
DestroyPullConsumer(consumer);
printf("PullConsumer Shutdown!\n");
return 0;
}
DefaultMQPullConsumer示例:
using namespace rocketmq;
std::map<MQMessageQueue, uint64_t> g_offseTable;
void putMessageQueueOffset(MQMessageQueue mq, uint64_t offset) {
g_offseTable[mq] = offset;
}
uint64_t getMessageQueueOffset(MQMessageQueue mq) {
map<MQMessageQueue, uint64_t>::iterator it = g_offseTable.find(mq);
if (it != g_offseTable.end()) {
return it->second;
}
return 0;
}
int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
DefaultMQPullConsumer consumer("please_rename_unique_group_name");
consumer.setNamesrvAddr(info.namesrv);
consumer.setNamesrvDomain(info.namesrv_domain);
consumer.setGroupName(info.groupname);
consumer.setInstanceName(info.groupname);
consumer.registerMessageQueueListener(info.topic, NULL);
consumer.start();
std::vector<MQMessageQueue> mqs;
try {
consumer.fetchSubscribeMessageQueues(info.topic, mqs);
auto iter = mqs.begin();
for (; iter != mqs.end(); ++iter) {
std::cout << "mq:" << (*iter).toString() << endl;
}
} catch (MQException& e) {
std::cout << e << endl;
}
auto start = std::chrono::system_clock::now();
auto iter = mqs.begin();
for (; iter != mqs.end(); ++iter) {
MQMessageQueue mq = (*iter);
// if cluster model
// putMessageQueueOffset(mq, g_consumer.fetchConsumeOffset(mq,true));
// if broadcast model
// putMessageQueueOffset(mq, your last consume offset);
bool noNewMsg = false;
do {
try {
PullResult result = consumer.pull(mq, "*", getMessageQueueOffset(mq), 32);
g_msgCount += result.msgFoundList.size();
std::cout << result.msgFoundList.size() << std::endl;
// if pull request timeout or received NULL response, pullStatus will be
// setted to BROKER_TIMEOUT,
// And nextBeginOffset/minOffset/MaxOffset will be setted to 0
if (result.pullStatus != BROKER_TIMEOUT) {
putMessageQueueOffset(mq, result.nextBeginOffset);
PrintPullResult(&result);
} else {
cout << "broker timeout occur" << endl;
}
switch (result.pullStatus) {
case FOUND:
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
case BROKER_TIMEOUT:
break;
case NO_NEW_MSG:
noNewMsg = true;
break;
default:
break;
}
} catch (MQClientException& e) {
std::cout << e << std::endl;
}
} while (!noNewMsg);
}
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "msg count: " << g_msgCount.load() << "\n";
std::cout << "per msg time: " << duration.count() / (double)g_msgCount.load() << "ms \n"
<< "========================finished==============================\n";
consumer.shutdown();
return 0;
}
5. 遇到的问题和解决方案
因为是第一次做有关rocketmq相关的工作,所以过程中也遇到了不少问题,主要都是由于自己对rocketmq的不了解导致的。
(1)Linux 下 rocketmq-client-cpp编译
在编译linux环境下的工程文件时,选择比较新的版本遇到了很多问题,windows环境下更糟糕,就像我开头说的,官方对于c++版本不是很重视,有很多不合理的地方,建议选择比较老的版本,且编译途中经常中途报错,但你再重新编译一次可能就能通过了。
(2)消费者消费失败
因为我主要实现的是消费者程序,主要谈谈消费者相关的注意事项,首先是连接上的问题,保证namesrvAddr,topic,groupid要与实际的要匹配,考虑有没用安全验证方式,目前主要采用的是阿里云的安全验证方式,默认采用的也是这种方式。有些场景是要自己创建groupid来与其他人进行区别,但要注意的是每个topic下的消费者组消费模式尽量保持一致!有些场景gruopid又是固定死的,可能指定了某一个消费者组,这个时候自己创建一个新的可能就不会成功。还有就是注意消息的模式是集群模式还是广播模式,这个地方默认是集群模式,但如果选错了模式,程序会异常但你无论怎么try-catch都捕获不到异常而是出现一些莫名奇怪的错误,有些时候他也能消费到消息,但很快就会异常,容易造成一种连接上没有问题,问题出在其他地方的错觉,我就在此浪费了大量的时间。
(3)pull消费模式
如果你采用了pull消费者模式主动去拉取信息,那么你得注意topic下有多少个读队列的情况,我最开始实现的时候采取了下面这种方式:
CMessageQueue* mqs = NULL;
ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
tmpoffset = pullResult.nextBeginOffset;
}
这样做有个不好的地方就是假如topic中有4个读队列的时候,我们只能消费到默认的编号为0的那个队列,而消息是可能被分配到任何一个队列中的,这样就会漏掉很多内容。解决办法就是多创几个消费者对应不同的队列!
谢谢观看!
更多推荐
所有评论(0)