librdkafka开源库使用总结
使用C/C++语言操作Kafka时,librdkafka是首选的开源库。
·
使用C/C++语言操作Kafka时,librdkafka是首选的开源库
- 使用librdkafka创建消费者客户端时,应配置如下属性
- 消费者会话组保持活动心跳间隔
- 自动提交偏移
- 自动重置偏移
自动重置偏移有五种属性设置:
a. earliest (最早的)
b. latest (最近的)
c. largest (最大的)
d. smallest (最小的)
e. beginning (起始的)
f. end (结束的)
g. error (错误的)
#include "librdkafka/rdkafka.h"
// 声明消费者实例
rd_kafka_t *rk;
// 临时配置对象
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();
//消费者会话组保持活动心跳间隔
if (rd_kafka_conf_set(conf, "heartbeat.interval.ms", "3000", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
//自动提交偏移
if (rd_kafka_conf_set(conf, "auto.commit.enable", "true", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
//自动重置偏移
if (rd_kafka_conf_set(conf, "auto.offset.reset", "latest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
// 创建消费者实例
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
- 使用librdkafka创建生产者客户端时,若生产频率1s/次,可以设置socket为长连接
- 在代理套接字上启用TCP保持活动
#include "librdkafka/rdkafka.h"
// 声明生产者实例
rd_kafka_t *rk;
// 临时配置对象
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();
// 设置长连接
if(rd_kafka_conf_set(conf, "socket.keepalive.enable", "true", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(conf);
return false;
}
// 创建生产者实例
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
更多推荐
已为社区贡献3条内容
所有评论(0)