前言:

       因工作要求需要用到c++去开发一个RocketMQ驱动程序,作为一个小白最开始也是各种找资料,但是很少能找到c++语言进行开发的,因为 RocketMQ C++官方工程源码质量并不高,存在很多bug,很多时候都编译不成功(linux环境下成功率高一些),所以一般情况下最好不要选择C++语言,但对于普通的生产和消费还是可以用的,下面是我在开发rocketmq驱动时的一些经验总结和部分代码示例,同时也加上了环境配置以及一些关于RocketMQ的一些基础知识,内容相对较多,大家根据自己的需要进行筛选查看。

目录

1. 什么是RocketMQ?

2. 环境搭建

(1)下载

(2)环境变量配置

(3)内存分配设置

(4)RocketMQ图形化管理控制台搭建

3. 服务启动

(1)启动NAMESERVER

(2)启动BROKER

(3)启动可视化管理控制台

4. 生产者与消费者

(1)消息模式      

(2)具体实现

5. 遇到的问题和解决方案

(1)Linux 下 rocketmq-client-cpp编译

(2)消费者消费失败


1. 什么是RocketMQ?

        RocketMQ是一个开源的分布式消息传递系统,它最初由阿里巴巴开发并开源。RocketMQ具有高可用性、高吞吐量、可伸缩性和容错性等特点,能够满足企业级应用的需求。它支持多种消息模式,如点对点、发布/订阅和批量消息等,并提供了丰富的特性,如延迟消息、顺序消息、事务消息等。RocketMQ还提供了可视化的控制台,方便用户进行监控和管理

2. 环境搭建

(1)下载

官网地址:http://rocketmq.apache.org/dowloading/releases/ 

 

(2)环境变量配置

 

(3)内存分配设置

大部分人的计算机是满足不了默认的内存配置的,不进行更改可能无法正常启动rocketmq相关服务

编辑...\bin\runserver.cmd文件:

rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"

编辑...\bin\runbroker.cmd

rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"

(4)RocketMQ图形化管理控制台搭建

图形化界面管理平台能够更清晰的去查看和管理相关的信息,增加工作效率

Github下载地址https://gitcode.net/mirrors/apache/rocketmq-externals/-/archive/develop/rocketmq-externals-develop.zip

application.properties配置:

文件目录:...\rocketmq-externals-develop\rocketmq-console\src\main\resources

编译插件:

        在...\rocketmq-externals-develop\rocketmq-console目录下进入cmd窗口使用 mvn clean package -Dmaven.test.skip=true 命令进行编译。编译成功后会在target目录下生成rocketmq-console-ng-1.0.0.jar等文件。

mvn clean package -Dmaven.test.skip=true 

3. 服务启动

(1)启动NAMESERVER

        在bin目录下,打开cmd窗口,执行 start mqnamesrv.cmd命令启动NAMESERVER,启动成功会出现弹窗,且不要关闭!

start mqnamesrv.cmd

(2)启动BROKER

        同样在bin目录下运行cmd窗口,执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true命令启动BROKER(不要关闭),红色部分为你的namesrvAddr地址,根据实际情况填写!

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

(3)启动可视化管理控制台

        在target 目录下打开cmd窗口执行 java -jar rocketmq-console-ng-1.0.0.jar 命令来运行插件。

 java -jar rocketmq-console-ng-1.0.0.jar

注:每个人可能安装的插件版本不一样,我这里是1.0.0,根据实际情况书写正确版本号

在网页中输入你的namesryAddr+端口号(上面application.properties里配置的)打开平台

可视化平台右上角选择语言,里面能够看到各个topic的详细信息,消息轨迹,内容,往topic中发送指定内容,对于测试来说非常好用!

4. 生产者与消费者

(1)消息模式      

NO.1:

同步发送(SYNC):生产者发送消息后,会阻塞等待 Broker 的响应,直到收到响应或者超时。这种模式下,消息的可靠性较高,但是发送消息的速度较慢。

NO.2:

异步发送(ASYNC):生产者发送消息后不会阻塞等待 Broker 的响应,而是通过回调函数来处理发送结果。这种模式下,消息的可靠性较高,同时发送消息的速度也比同步发送模式快。

NO.3:

单向发送(ONEWAY):生产者发送消息后不会等待 Broker 的响应,也不会通过回调函数来处理发送结果。这种模式下,消息的可靠性较低,但是发送消息的速度最快。

NO.4:

集群消费(CLUSTERING):多个消费者实例共同消费同一个 Topic 下的消息,每个消息只会被消费者组中的一个实例消费。这种模式下,消息的可靠性较高,适合于需要保证消息不重复消费的场景。

NO.5:

广播消费(BROADCASTING):多个消费者实例都会消费同一个 Topic 下的所有消息,每个消息会被所有的消费者实例都消费一次。这种模式下,消息的可靠性较低,但是适合于需要多个消费者实例同时处理消息的场景。

注:当我们创建一个消费者程序去消费某个topic中的信息时,消费者组的消息模式一定要区分到底是集群还是广播模式,在c++程序中,这个地方弄错是捕获不到异常的,显现在你面前的是各种windows模块的报错,例如ucrtbase.dll故障。

NO.6

Pull 模式:消费者通过调用 pull 方法主动向 Broker 拉取消息。在 Pull 模式下,消费者可以自由控制消息的消费进度,可以根据实际需求灵活地进行消费。但是 Pull 模式需要消费者不断地轮询 Broker,会产生一定的网络开销。

N0.7

Push 模式:Broker 将消息推送给消费者,消费者只需要等待消息的到来即可。在 Push 模式下,消费者不需要进行轮询,可以减少网络开销,同时也可以实现消息的实时推送。但是 Push 模式下,消费者无法自由控制消息的消费进度,需要按照 Broker 推送的顺序进行消费。

(2)具体实现

        在实现rocketmq生产者和消费者程序时,我们可以有多种选择,以消费者为例,最常用的就是pull模式和push模式,push模式中,我们会对Broker里的消息进行一个监听,当有消息到来时,Broker就会推送给你,然后调用你写的消息处理函数进行解析。pull模式为主动向Broker拉取消息,可以指定条数,需要注意的是每个topic可能有多个读队列,拉取的时候不要漏掉!pull适用于消费能力有限的场景,不能保证实时性。

生产者:

AsyncProducer示例:

using namespace rocketmq;

std::atomic<bool> g_quit;
std::mutex g_mtx;
std::condition_variable g_finished;
SendCallback* g_callback = NULL;
TpsReportService g_tps;

class MySendCallback : public SendCallback {
  virtual void onSuccess(SendResult& sendResult) {
    g_msgCount--;
    g_tps.Increment();
    if (g_msgCount.load() <= 0) {
      std::unique_lock<std::mutex> lck(g_mtx);
      g_finished.notify_one();
    }
  }
  virtual void onException(MQException& e) { cout << "send Exception\n"; }
};

class MyAutoDeleteSendCallback : public AutoDeleteSendCallBack {
 public:
  virtual ~MyAutoDeleteSendCallback() {}
  virtual void onSuccess(SendResult& sendResult) {
    g_msgCount--;
    if (g_msgCount.load() <= 0) {
      std::unique_lock<std::mutex> lck(g_mtx);
      g_finished.notify_one();
    }
  }
  virtual void onException(MQException& e) { std::cout << "send Exception" << e << "\n"; }
};

void AsyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
  while (!g_quit.load()) {
    if (g_msgCount.load() <= 0) {
      std::unique_lock<std::mutex> lck(g_mtx);
      g_finished.notify_one();
    }
    MQMessage msg(info->topic,  // topic
                  "*",          // tag
                  info->body);  // body

    if (info->IsAutoDeleteSendCallback) {
      g_callback = new MyAutoDeleteSendCallback();  // auto delete
    }

    try {
      producer->send(msg, g_callback);
    } catch (MQException& e) {
      std::cout << e << endl;  // if catch excepiton , need re-send this msg by
                               // service
    }
  }
}

int main(int argc, char* argv[]) {
  RocketmqSendAndConsumerArgs info;
  if (!ParseArgs(argc, argv, &info)) {
    exit(-1);
  }

  DefaultMQProducer producer("please_rename_unique_group_name");
  if (!info.IsAutoDeleteSendCallback) {
    g_callback = new MySendCallback();
  }

  PrintRocketmqSendAndConsumerArgs(info);

  if (!info.namesrv.empty())
    producer.setNamesrvAddr(info.namesrv);

  producer.setGroupName(info.groupname);
  producer.setInstanceName(info.groupname);
  producer.setNamesrvDomain(info.namesrv_domain);
  producer.start();
  g_tps.start();
  std::vector<std::shared_ptr<std::thread>> work_pool;
  auto start = std::chrono::system_clock::now();
  int msgcount = g_msgCount.load();
  for (int j = 0; j < info.thread_count; j++) {
    std::shared_ptr<std::thread> th = std::make_shared<std::thread>(AsyncProducerWorker, &info, &producer);
    work_pool.push_back(th);
  }

  {
    std::unique_lock<std::mutex> lck(g_mtx);
    g_finished.wait(lck);
    g_quit.store(true);
  }

  auto end = std::chrono::system_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
            << "========================finished==============================\n";

  producer.shutdown();
  for (size_t th = 0; th != work_pool.size(); ++th) {
    work_pool[th]->join();
  }
  if (!info.IsAutoDeleteSendCallback) {
    delete g_callback;
  }
  return 0;
}

BatchProducer示例:

using namespace rocketmq;
using namespace std;
std::atomic<bool> g_quit;
std::mutex g_mtx;
std::condition_variable g_finished;
TpsReportService g_tps;

void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
  while (!g_quit.load()) {
    if (g_msgCount.load() <= 0) {
      std::unique_lock<std::mutex> lck(g_mtx);
      g_finished.notify_one();
      break;
    }

    vector<MQMessage> msgs;
    MQMessage msg1(info->topic, "*", info->body);
    msg1.setProperty("property1", "value1");
    MQMessage msg2(info->topic, "*", info->body);
    msg2.setProperty("property1", "value1");
    msg2.setProperty("property2", "value2");
    MQMessage msg3(info->topic, "*", info->body);
    msg3.setProperty("property1", "value1");
    msg3.setProperty("property2", "value2");
    msg3.setProperty("property3", "value3");
    msgs.push_back(msg1);
    msgs.push_back(msg2);
    msgs.push_back(msg3);
    try {
      auto start = std::chrono::system_clock::now();
      SendResult sendResult = producer->send(msgs);
      g_tps.Increment();
      --g_msgCount;
      auto end = std::chrono::system_clock::now();
      auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
      if (duration.count() >= 500) {
        std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl;
      }
    } catch (const MQException& e) {
      std::cout << "send failed: " << e.what() << std::endl;
      std::unique_lock<std::mutex> lck(g_mtx);
      g_finished.notify_one();
      return;
    }
  }
}

int main(int argc, char* argv[]) {
  RocketmqSendAndConsumerArgs info;
  if (!ParseArgs(argc, argv, &info)) {
    exit(-1);
  }
  PrintRocketmqSendAndConsumerArgs(info);
  DefaultMQProducer producer("please_rename_unique_group_name");
  producer.setNamesrvAddr(info.namesrv);
  producer.setNamesrvDomain(info.namesrv_domain);
  producer.setGroupName(info.groupname);
  producer.setInstanceName(info.groupname);
  producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
  producer.setSendMsgTimeout(500);
  producer.setTcpTransportTryLockTimeout(1000);
  producer.setTcpTransportConnectTimeout(400);

  producer.start();
  std::vector<std::shared_ptr<std::thread>> work_pool;
  auto start = std::chrono::system_clock::now();
  int msgcount = g_msgCount.load();
  g_tps.start();

  int threadCount = info.thread_count;
  for (int j = 0; j < threadCount; j++) {
    std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
    work_pool.push_back(th);
  }

  {
    std::unique_lock<std::mutex> lck(g_mtx);
    g_finished.wait(lck);
    g_quit.store(true);
  }

  auto end = std::chrono::system_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
            << "========================finished==============================\n";

  for (size_t th = 0; th != work_pool.size(); ++th) {
    work_pool[th]->join();
  }

  producer.shutdown();

  return 0;
}

消费者:

DefaultMQPushConsumer 和 DefaultMQProducer 示例:

std::mutex g_mtx;
std::condition_variable g_finished;
TpsReportService g_tps;

using namespace rocketmq;

class MyMsgListener : public MessageListenerConcurrently {
 public:
  MyMsgListener() {}
  virtual ~MyMsgListener() {}

  virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt>& msgs) {
    g_msgCount.store(g_msgCount.load() - msgs.size());
    for (size_t i = 0; i < msgs.size(); ++i) {
      g_tps.Increment();
      // cout << "msg body: "<<  msgs[i].getBody() << endl;
    }

    if (g_msgCount.load() <= 0) {
      std::unique_lock<std::mutex> lck(g_mtx);
      g_finished.notify_one();
    }
    return CONSUME_SUCCESS;
  }
};

int main(int argc, char* argv[]) {
  RocketmqSendAndConsumerArgs info;
  if (!ParseArgs(argc, argv, &info)) {
    exit(-1);
  }
  PrintRocketmqSendAndConsumerArgs(info);
  DefaultMQPushConsumer consumer("please_rename_unique_group_name");
  DefaultMQProducer producer("please_rename_unique_group_name");
  producer.setSessionCredentials("AccessKey", "SecretKey", "ALIYUN");
  producer.setTcpTransportTryLockTimeout(1000);
  producer.setTcpTransportConnectTimeout(400);
  producer.setNamesrvDomain(info.namesrv_domain);
  producer.setNamesrvAddr(info.namesrv);
  producer.setGroupName("msg-persist-group_producer_sandbox");
  producer.start();

  consumer.setNamesrvAddr(info.namesrv);
  consumer.setGroupName(info.groupname);
  consumer.setSessionCredentials("AccessKey", "SecretKey", "ALIYUN");
  consumer.setConsumeThreadCount(info.thread_count);
  consumer.setNamesrvDomain(info.namesrv_domain);
  consumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);

  if (info.syncpush)
    consumer.setAsyncPull(false);  // set sync pull
  if (info.broadcasting) {
    consumer.setMessageModel(rocketmq::BROADCASTING);
  }

  consumer.setInstanceName(info.groupname);

  consumer.subscribe(info.topic, "*");
  consumer.setConsumeThreadCount(15);
  consumer.setTcpTransportTryLockTimeout(1000);
  consumer.setTcpTransportConnectTimeout(400);

  MyMsgListener msglistener;
  consumer.registerMessageListener(&msglistener);

  try {
    consumer.start();
  } catch (MQClientException& e) {
    cout << e << endl;
  }
  g_tps.start();

  int msgcount = g_msgCount.load();
  for (int i = 0; i < msgcount; ++i) {
    MQMessage msg(info.topic,  // topic
                  "*",         // tag
                  info.body);  // body

    //    std::this_thread::sleep_for(std::chrono::seconds(100000));
    try {
      producer.send(msg);
    } catch (MQException& e) {
      std::cout << e << endl;  // if catch excepiton , need re-send this msg by
                               // service
    }
  }

  {
    std::unique_lock<std::mutex> lck(g_mtx);
    g_finished.wait(lck);
  }
  producer.shutdown();
  consumer.shutdown();
  return 0;
}

CPushConsumer示例:

void thread_sleep(unsigned milliseconds) {
#ifdef _WIN32
  Sleep(milliseconds);
#else
  usleep(milliseconds * 1000);  // takes microseconds
#endif
}

int doConsumeMessage(struct CPushConsumer* consumer, CMessageExt* msgExt) {
  printf("Hello,doConsumeMessage by Application!\n");
  printf("Msg Topic:%s\n", GetMessageTopic(msgExt));
  printf("Msg Tags:%s\n", GetMessageTags(msgExt));
  printf("Msg Keys:%s\n", GetMessageKeys(msgExt));
  printf("Msg Body:%s\n", GetMessageBody(msgExt));
  return E_CONSUME_SUCCESS;
}

int main(int argc, char* argv[]) {
  int i = 0;
  printf("PushConsumer Initializing....\n");
  CPushConsumer* consumer = CreatePushConsumer("Group_Consumer_Test");
  SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876");
  Subscribe(consumer, "T_TestTopic", "*");
  RegisterMessageCallback(consumer, doConsumeMessage);
  StartPushConsumer(consumer);
  printf("Push Consumer Start...\n");
  for (i = 0; i < 10; i++) {
    printf("Now Running : %d S\n", i * 10);
    thread_sleep(10000);
  }
  ShutdownPushConsumer(consumer);
  DestroyPushConsumer(consumer);
  printf("PushConsumer Shutdown!\n");
  return 0;
}

CPullConsumer示例:

void thread_sleep(unsigned milliseconds) {
#ifdef _WIN32
  Sleep(milliseconds);
#else
  usleep(milliseconds * 1000);  // takes microseconds
#endif
}

int main(int argc, char* argv[]) {
  int i = 0, j = 0;
  int ret = 0;
  int size = 0;
  CMessageQueue* mqs = NULL;
  printf("PullConsumer Initializing....\n");
  CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test");
  SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
  StartPullConsumer(consumer);
  printf("Pull Consumer Start...\n");
  for (i = 1; i <= 5; i++) {
    printf("FetchSubscriptionMessageQueues : %d times\n", i);
    ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
    if (ret != OK) {
      printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
    }
    printf("Get MQ Size:%d\n", size);
    for (j = 0; j < size; j++) {
      int noNewMsg = 0;
      long long tmpoffset = 0;
      printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
      do {
        int k = 0;
        CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
        if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
          tmpoffset = pullResult.nextBeginOffset;
        }
        printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld", pullResult.pullStatus,
               pullResult.maxOffset, pullResult.minOffset, pullResult.nextBeginOffset);
        switch (pullResult.pullStatus) {
          case E_FOUND:
            printf("Get Message Size:%d\n", pullResult.size);
            for (k = 0; k < pullResult.size; ++k) {
              printf("Got Message ID:%s,Body:%s\n", GetMessageId(pullResult.msgFoundList[k]),
                     GetMessageBody(pullResult.msgFoundList[k]));
            }
            break;
          case E_NO_MATCHED_MSG:
            noNewMsg = 1;
            break;
          default:
            noNewMsg = 0;
        }
        ReleasePullResult(pullResult);
        thread_sleep(100);
      } while (noNewMsg == 0);
      thread_sleep(1000);
    }
    thread_sleep(2000);
    ReleaseSubscriptionMessageQueue(mqs);
  }
  thread_sleep(5000);
  ShutdownPullConsumer(consumer);
  DestroyPullConsumer(consumer);
  printf("PullConsumer Shutdown!\n");
  return 0;
}

DefaultMQPullConsumer示例:

using namespace rocketmq;

std::map<MQMessageQueue, uint64_t> g_offseTable;

void putMessageQueueOffset(MQMessageQueue mq, uint64_t offset) {
  g_offseTable[mq] = offset;
}

uint64_t getMessageQueueOffset(MQMessageQueue mq) {
  map<MQMessageQueue, uint64_t>::iterator it = g_offseTable.find(mq);
  if (it != g_offseTable.end()) {
    return it->second;
  }
  return 0;
}

int main(int argc, char* argv[]) {
  RocketmqSendAndConsumerArgs info;
  if (!ParseArgs(argc, argv, &info)) {
    exit(-1);
  }
  PrintRocketmqSendAndConsumerArgs(info);
  DefaultMQPullConsumer consumer("please_rename_unique_group_name");
  consumer.setNamesrvAddr(info.namesrv);
  consumer.setNamesrvDomain(info.namesrv_domain);
  consumer.setGroupName(info.groupname);
  consumer.setInstanceName(info.groupname);
  consumer.registerMessageQueueListener(info.topic, NULL);
  consumer.start();
  std::vector<MQMessageQueue> mqs;

  try {
    consumer.fetchSubscribeMessageQueues(info.topic, mqs);
    auto iter = mqs.begin();
    for (; iter != mqs.end(); ++iter) {
      std::cout << "mq:" << (*iter).toString() << endl;
    }
  } catch (MQException& e) {
    std::cout << e << endl;
  }

  auto start = std::chrono::system_clock::now();
  auto iter = mqs.begin();
  for (; iter != mqs.end(); ++iter) {
    MQMessageQueue mq = (*iter);
    // if cluster model
    // putMessageQueueOffset(mq, g_consumer.fetchConsumeOffset(mq,true));
    // if broadcast model
    // putMessageQueueOffset(mq, your last consume offset);

    bool noNewMsg = false;
    do {
      try {
        PullResult result = consumer.pull(mq, "*", getMessageQueueOffset(mq), 32);
        g_msgCount += result.msgFoundList.size();
        std::cout << result.msgFoundList.size() << std::endl;
        // if pull request timeout or received NULL response, pullStatus will be
        // setted to BROKER_TIMEOUT,
        // And nextBeginOffset/minOffset/MaxOffset will be setted to 0
        if (result.pullStatus != BROKER_TIMEOUT) {
          putMessageQueueOffset(mq, result.nextBeginOffset);
          PrintPullResult(&result);
        } else {
          cout << "broker timeout occur" << endl;
        }
        switch (result.pullStatus) {
          case FOUND:
          case NO_MATCHED_MSG:
          case OFFSET_ILLEGAL:
          case BROKER_TIMEOUT:
            break;
          case NO_NEW_MSG:
            noNewMsg = true;
            break;
          default:
            break;
        }
      } catch (MQClientException& e) {
        std::cout << e << std::endl;
      }
    } while (!noNewMsg);
  }

  auto end = std::chrono::system_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  std::cout << "msg count: " << g_msgCount.load() << "\n";
  std::cout << "per msg time: " << duration.count() / (double)g_msgCount.load() << "ms \n"
            << "========================finished==============================\n";

  consumer.shutdown();
  return 0;
}

5. 遇到的问题和解决方案

        因为是第一次做有关rocketmq相关的工作,所以过程中也遇到了不少问题,主要都是由于自己对rocketmq的不了解导致的。

(1)Linux 下 rocketmq-client-cpp编译

        在编译linux环境下的工程文件时,选择比较新的版本遇到了很多问题,windows环境下更糟糕,就像我开头说的,官方对于c++版本不是很重视,有很多不合理的地方,建议选择比较老的版本,且编译途中经常中途报错,但你再重新编译一次可能就能通过了。

(2)消费者消费失败

        因为我主要实现的是消费者程序,主要谈谈消费者相关的注意事项,首先是连接上的问题,保证namesrvAddr,topic,groupid要与实际的要匹配,考虑有没用安全验证方式,目前主要采用的是阿里云的安全验证方式,默认采用的也是这种方式。有些场景是要自己创建groupid来与其他人进行区别,但要注意的是每个topic下的消费者组消费模式尽量保持一致!有些场景gruopid又是固定死的,可能指定了某一个消费者组,这个时候自己创建一个新的可能就不会成功。还有就是注意消息的模式是集群模式还是广播模式,这个地方默认是集群模式,但如果选错了模式,程序会异常但你无论怎么try-catch都捕获不到异常而是出现一些莫名奇怪的错误,有些时候他也能消费到消息,但很快就会异常,容易造成一种连接上没有问题,问题出在其他地方的错觉,我就在此浪费了大量的时间。

(3)pull消费模式

        如果你采用了pull消费者模式主动去拉取信息,那么你得注意topic下有多少个读队列的情况,我最开始实现的时候采取了下面这种方式:

CMessageQueue* mqs = NULL;
ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
          tmpoffset = pullResult.nextBeginOffset;
        }

        这样做有个不好的地方就是假如topic中有4个读队列的时候,我们只能消费到默认的编号为0的那个队列,而消息是可能被分配到任何一个队列中的,这样就会漏掉很多内容。解决办法就是多创几个消费者对应不同的队列!

                                       谢谢观看!

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐