• Automatic offset commit

     The following example shows how to commit topic offsets automatically:

public void autoOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "binghe100:9092");   // Kafka broker to connect to
    props.put("group.id", "binghe");                    // consumer group identifier
    props.put("enable.auto.commit", "true");            // commit offsets automatically
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("name1", "name2"));
    while (true) {
        // block for up to 100 ms waiting for records; newer clients use poll(Duration.ofMillis(100))
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            logger.info("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
        }
    }
}

The keys stored in the Properties instance props have the following meanings:

     1) bootstrap.servers specifies the Kafka cluster node to connect to, where 9092 is the port number;

     2) group.id is the unique identifier of the Kafka consumer group;

     3) setting enable.auto.commit to true means the topic offsets are committed automatically every auto.commit.interval.ms, whose default value is 5000 ms (see the configuration sketch after this list);

     4) name1 and name2 are the names of the topics to consume, managed together under the consumer group whose group.id is binghe;

     5) key.deserializer and value.deserializer specify the classes used to deserialize the message key and value bytes into objects.
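
     As a supplement to item 3), the sketch below shows the commit interval being set explicitly. The 1000 ms value is an assumption chosen for illustration; when auto.commit.interval.ms is omitted, as in the example above, Kafka falls back to its 5000 ms default.

// A minimal configuration sketch: overriding the auto-commit interval.
// The 1000 ms interval is illustrative only, not part of the original example.
Properties props = new Properties();
props.put("bootstrap.servers", "binghe100:9092");
props.put("group.id", "binghe");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");   // commit every 1000 ms instead of the default 5000 ms
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");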

  • Manual offset commit

      In a production environment, the offset should be committed only after the data has been fully consumed; that is, a record counts as consumed only after it has been fetched from the Kafka topic and processed by the business logic. In this case, the topic offset must be committed manually.

      The following example shows how to commit topic offsets manually:

public void manualOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "binghe100:9092");
    props.put("group.id", "binghe");
    props.put("enable.auto.commit", "false");           // disable auto commit; offsets are committed manually
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("name1", "name2"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }

        // commit only after a full batch has been processed
        if (buffer.size() >= minBatchSize) {
            // operation to handle data
            consumer.commitSync();
            buffer.clear();
        }
    }
}

The drawback of this approach is that the offset is committed only after every record in the batch has been processed: if the consumer fails after processing part of the batch but before commitSync() is called, the whole batch is consumed again on restart. To limit such duplicate consumption, a third approach can be used, committing offsets according to each partition's own consumption progress.

  • Manually committing partition offsets

     The following example shows how to manually commit the offset of each partition of a topic:

public void manualOffsetCommitOfPartition() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "binghe100:9092");
    props.put("group.id", "binghe");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("name1", "name2"));

    boolean running = true;
    try {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // process and commit each partition's records independently
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    logger.info(record.offset() + " : " + record.value());
                }
                // commit lastOffset + 1: the offset of the next record to read from this partition
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }
}
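
      Note that commitSync() here receives a map from each TopicPartition to an OffsetAndMetadata, and the committed value is lastOffset + 1: Kafka expects the offset of the next message the application will read, not the offset of the last message already processed.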

 
