本文介绍KafkaConsumer,一个从kafka集群消费记录的java客户端。该客户端是非线程安全的,关于多线程的使用,参见文章的“Multi-threaded Processing”部分。本文介绍的内容来自于kafka官方文档,详情参见KafkaConsumer

kafka以数字形式的偏移量(a numerical offset)维护着每条消息在partition中的位置,offset是每条消息在partition中的唯一标志,同时也用于表示消费者的消费进度。当客户端调用poll方法从partition中获取消息后,offset会自动向前递进;当然offset也可以手动提交,即客户端可以决定何时提交该消费进度,详见“Manual Offset Control”。至于offset的存储位置,在现有版本中,默认情况下以topic(名为__consumer_offsets)的形式保存在本地。用户可以根据需要放弃这种内置的offset存储方式,而选择自己的方式,如存储在磁盘上,或者数据库中等。
kafka具有分组的概念,具有相同group.id的消费者属于同一个消费小组,kafka会动态的维护小组中的成员,当有消费者加入或者离开时,kafka为会剩余的活跃成员重新分配partition,位于同一小组的消费者它们会消费同一个topic的不同partition,通常kafka会自动的为每个客户端分配partition,而且是尽可能均匀的分配。当然,用户也可以拒绝这种自动的partition分配方式,而是每次只从既定的partition消费消息,详见“Manual Partition Assignment”。

示例代码:

package test.kafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestKafkaConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       
        props.put("group.id", "test");//消费者的组id
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //订阅主题列表topic
        consumer.subscribe(Arrays.asList("test01","mytopic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n");
        }
    }

}

相关参数说明(kafka全部参数详见consumerconfigs):
bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2;
group.id:消费者的组id
kafka使用消费者分组的概念来允许多个消费者共同消费和处理同一个topic中的消息。分组中消费者成员是动态维护的,如果一个消费者处理失败了,那么之前分配给它的partition将被重新分配给分组中其他消费者;同样,如果分组中加入了新的消费者,也将触发整个partition的重新分配,每个消费者将尽可能的分配到相同数目的partition,以达到新的均衡状态;
enable.auto.commit:用于配置是否自动的提交消费进度;
auto.commit.interval.ms:用于配置自动提交消费进度的时间;
session.timeout.ms:会话超时时长,客户端需要周期性的发送“心跳”到broker,这样broker端就可以判断消费者的状态,如果消费在会话周期时长内未发送心跳,那么该消费者将被判定为dead,那么它之前所消费的partition将会被重新的分配给其他存活的消费者;
key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

Manual Offset Control
上述的示例代码是自动提交消费进度(offset)的方式,下面给出手动控制offset的方式

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); //关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }   
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

用户可以控制消息何时被确认为消费完成,然后手动的提交offset,而不是像之前那样依赖于客户端周期性的提交offset。在上述的例子中,客户端从partition中拉取消息后,将他们加入本地的buffer中,等待buffer中的消息达到一定数量后,将它们插入到DB中,然后才向broker发送offset,以确认消息被真正的消费完成。在这种情况下,kafka可以确保每条消息至少被消费一次,因为如果commitSync方法提交失败了,那么将导致这部分消息被重复的消费。

Manual Partition Assignment
在前面的例子中,用户订阅相关的topic之后,kafka为同一小组中的活跃消费者动态的分配partition,下面介绍如何手动的订阅某个topic的partition,这在有些时候是非常有用的,例如:
1. 如果进程维护着消费者和partition的固定关系,那么每次启动时只能从指定的partition中消费数据;
2. 如果进程本身是高可用的,即进程失败了,它会自己重启(例如基于YARN,Mesos等集群管理框架),在这种情况下,不需要kafka来动态的为消费分配partition。
手动的订阅partition的方式如下:

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

一旦分配了partition,那么之后的消费方式与之前一样,用户通过循环的调用poll方法来从partition中获取消息。但是此时如果消费者失败了,kafka不会将你之前消费的partition重新分配给其他消费者,因此不会出现rebalance。同时用户也应该确保每个消费者消费的partition是不重合的。

Multi-threaded Processing
kafka消费者是非线程安全的,所有的网络I/O都发生在应用发起调用的线程中。用户有责任确保多线程获取同一资源是同步的,非同步的方式将导致ConcurrentModificationException异常。
一种简单的处理方式是每个线程都创建自己的消费者实例。其优缺点如下:
优点:1.易于实现;2.因为没有线程间的协调,这种方式通常很快;3.它使得顺序的处理每个partition中的消息变得很容易。
缺点:1.更多的消费者实例意味着更多的客户端到服务端的TCP链接,这通常会增加少量的开销;2.多个客户端意味着更多的发送请求,以及每次批量数据的减少,这可能会导致I/O丢包;3.消费者线程的数目受限于partition数量,如果消费者的线程数多于partition的数量,必然会导致某些线程无法被分配partition进行消费。
另一种选择是使用一个或者多个消费者线程消费数据后存储到一个队列中,再使用另外的多线程从队列中取数据以完成真正的数据处理,也就是通过使用内存队列将消息的消费和处理分离开,以解决某些情况消费的速度要远快于消息处理速度的情况:
优点:使得消息的消费和处理变得独立,可以实现单线程消费消息,多线程处理消息,而不受partition数量的影响。
缺点:1.不能保证消息处理的顺序性,因为每个处理消息的线程时独立的;2.它使得手动的提交消费进度的offset变的困难,不同的消息被不同的线程独立处理,协调每个线程的处理情况是困难的。

相关API
KafkaConsumer实例化方法
KafkaConsumer对象实例化方法,可以使用map形式的键值对或者Properties对象来配置客户端的属性

/*
 *keyDeserializer:发送数据key值的反序列化方法,该方法实现了Deserializer接口
 *valueDeserializer:发送数据value值的反序列化方法,该方法实现了Deserializer接口
 */
public KafkaConsumer(Map<String,Object> configs);
public KafkaConsumer(Map<String,Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);
public KafkaConsumer(Properties properties);
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

subscribe()

/*
 *topics:订阅的topic集合
 *listener:当partition分配或者撤回时的回调函数
 *pattern:订阅符合特定模式的topic,并动态的获取partition
 */
public void subscribe(Collection<String> topics);
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener);
public void subscribe(Pattern pattern,ConsumerRebalanceListener listener);

订阅topic列表,并动态的获取分配的partition。topic订阅不是累加形式的,比如已经订阅了topic1,当再调用该接口,并传入列表[“topic2”,”topic3”]时,会覆盖之前的订阅列表,也就是说现在订阅的列表是[“topic2”,”topic3”],而不是[“topic1”,”topic2”,”topic3”]。消费者会跟踪消费分组中的消费者列表,以下情况是触发rebalance,即partition的重新分配等:1. 订阅的topic中任何一个topic的partition数量发生改变;2.Topic被创建或者删除了;3.消费者分组的已有成员死亡;4.消费者分组中加入了新的成员

unsubscribe()

//取消订阅之前使用subscribe接口订阅的topic列表
public void unsubscribe();

assign()

public void assign(Collection<TopicPartition> partitions)

手动的为消费者分配partition,现有的partition列表会覆盖之前已分配的partition列表(如果之前已存在partition列表的话),如果列表为空,其功能等同于unsubscribe。

poll()

/*
 *timeout:单位毫秒,即如果调用poll接口时,没有数据可供消费,那么会等待timeout时长
 */
public ConsumerRecords<K,V> poll(long timeout)

当消费者订阅topic之后,通过调用poll()方法自动的加入消费者分组。poll()方法被设计成可以确保消费者是存活的,只要客户端持续的调用该方法,消费者就可以留在分组中,继续的从之前分配给它的partition中接收消息。

commitSync()

public void commitSync(); 

自上次调用poll()方法后,提交所订阅的所有topic的partition消费进度

close()

public void close();

关闭消费者,并释放相关资源。消费者维护着客户端到broker端的TCP链接,如果不调用close方法,或者调用失败将使得这些链接资源发生泄漏。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐