使用C/C++语言操作Kafka时,librdkafka是首选的开源库

  • 使用librdkafka创建消费者客户端时,应配置如下属性
  1. 消费者会话组保持活动心跳间隔
  2. 自动提交偏移
  3. 自动重置偏移
    自动重置偏移有五种属性设置:
    a. earliest (最早的)
    b. latest (最近的)
    c. largest (最大的)
    d. smallest (最小的)
    e. beginning (起始的)
    f. end (结束的)
    g. error (错误的)
#include "librdkafka/rdkafka.h"

// 声明消费者实例
rd_kafka_t *rk; 
// 临时配置对象
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();

//消费者会话组保持活动心跳间隔
if (rd_kafka_conf_set(conf, "heartbeat.interval.ms", "3000", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
	fprintf(stderr, "%s\n", errstr);
	rd_kafka_conf_destroy(conf);
	return 1;
}

//自动提交偏移
if (rd_kafka_conf_set(conf, "auto.commit.enable", "true", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
	fprintf(stderr, "%s\n", errstr);
	rd_kafka_conf_destroy(conf);
	return 1;
}

//自动重置偏移
if (rd_kafka_conf_set(conf, "auto.offset.reset", "latest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
	fprintf(stderr, "%s\n", errstr);
	rd_kafka_conf_destroy(conf);
	return 1;
}

// 创建消费者实例
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

  • 使用librdkafka创建生产者客户端时,若生产频率1s/次,可以设置socket为长连接
  1. 在代理套接字上启用TCP保持活动
#include "librdkafka/rdkafka.h"

// 声明生产者实例
rd_kafka_t *rk; 
// 临时配置对象
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();

// 设置长连接
if(rd_kafka_conf_set(conf, "socket.keepalive.enable", "true", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
	rd_kafka_conf_destroy(conf);
	return false;
}

// 创建生产者实例
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐