理解storm、spark streamming等流式计算的数据来源、理解JMS规范、理解Kafka核心组件、掌握Kakfa生产者API、掌握Kafka消费者API。对流式计算的生态环境有深入的了解,具备流式计算项目架构的能力。所以学习kafka要掌握以下几点:

1、  kafka是什么?

2、  JMS规范是什么?
3、  为什么需要消息队列?
4、  Kafka核心组件
5、  Kafka安装部署
6、  Kafka生产者Java API
7、  Kafka消费者Java API
8、  Kafka整体结构图
9、  Consumer与topic关系
10、  Kafka  Producer消息分发
11、  Consumer 的负载均衡
12、  Kafka文件存储机制
13、 Kafka自定义partition

(一)kafka简介      

1、简介

        Kafka是一种分布式的、基于发布/订阅的消息系统。在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算(KAFKA + STORM+REDIS)。 Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。它最初是由LinkedIn开发,并于2011年初开源。2012年10月从ApacheIncubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

• 特点:

    – 消息持久化:通过O(1)的磁盘数据结构提供数据的持久化
    – 高吞吐量:每秒百万级的消息读写
    – 分布式:扩展能力强
    – 多客户端支持:java、php、python、c++ ……
    – 实时性:生产者生产的message立即被消费者可见
    -Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
    - Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为
    -无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性

2、JMS是什么

         JMS是什么:JMS是Java提供的一套技术规范

JMS干什么用:用来异构系统 集成通信,缓解系统瓶颈,提高系统的伸缩性增强系统用户体验,使得系统模块化和组件化变得可行并更加灵活

通过什么方式:生产消费者模式(生产者、服务器、消费者)

jdk,kafka,activemq……

2.2、JMS消息传输模型

l 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

l 发布/订阅模式(一对多,数据生产后,推送给所有订阅者)

发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即时当前订阅者不可用,处于离线状态

2.4、常见的类JMS消息服务器

2.4.1、JMS消息服务器 ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的。

主要特点:

l 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP
l 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
l 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
l 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
l 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
l 支持通过JDBC和journal提供高速的消息持久化
l 从设计上保证了高性能的集群,客户端-服务器,点对点
l 支持Ajax
l 支持与Axis的整合

l 可以很容易得调用内嵌JMS provider,进行测试

2.4.2、分布式消息中间件 Metamorphosis

 Metamorphosis(MetaQ) 是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在淘宝和支付宝有着广泛的应用,现已开源。

主要特点:

l 生产者、服务器和消费者都可分布
l 消息存储顺序写
l 性能极高,吞吐量大
l 支持消息顺序
l 支持本地和XA事务
l 客户端pull,随机读,利用sendfile系统调用,zero-copy ,批量拉数据
l 支持消费端事务
l 支持消息广播模式
l 支持异步发送消息
l 支持http协议
l 支持消息重试和recover
l 数据迁移、扩容对用户透明
l 消费状态保存在客户端
l 支持同步和异步复制两种HA

l 支持group commit

2.4.3、分布式消息中间件 RocketMQ

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

l 能够保证严格的消息顺序
l 提供丰富的消息拉取模式
l 高效的订阅者水平扩展能力
l 实时的消息订阅机制
l 亿级消息堆积能力

l Metaq3.0 版本改名,产品名称改为RocketMQ

2.4.4、其他MQ

l .NET消息中间件DotNetMQ
l 基于HBase的消息队列 HQueue
l Go 的 MQ 框架 KiteQ
l AMQP消息服务器RabbitMQ

l MemcacheQ 是一个基于MemcacheDB 的消息队列服务器。

3、为什么需要消息队列(重要)

消息系统的核心作用就是三点:解耦,异步和并行

以用户注册的案列来说明消息系统的作用

3.1、用户注册的一般流程

问题:随着后端流程越来越多,每步流程都需要额外的耗费很多时间,从而会导致用户更长的等待延迟。

3.2、用户注册的并行执行

问题:系统并行的发起了4个请求,4个请求中,如果某一个环节执行1分钟,其他环节再快,用户也需要等待1分钟。如果其中一个环节异常之后,整个服务挂掉了。

3.3、用户注册的最终一致

1、  保证主流程的正常执行、执行成功之后,发送MQ消息出去。

2、  需要这个destination的其他系统通过消费数据再执行,最终一致。

(二)Kafka基本组件

• Broker:每一台机器叫一个Broker

• Producer:日志消息生产者,用来写数据

• Consumer:消息的消费者,用来读数据

• Topic:不同消费者去指定的Topic中读,不同的生产者往不同的Topic中写
• Partition:在Topic基础上做了进一步区分分层

• Kafka内部是分布式的、一个Kafka集群通常包括多个Broker
• 负载均衡:将Topic分成多个分区,每个Broker存储一个或多个Partition
• 多个Producer和Consumer同时生产和消费消息

2.1 topic

• 一个Topic是一个用于发布消息的分类或feed名,kafka集群使用分区的日志,每个分区都是有顺序且不变的消息序列。

• commit的log可以不断追加。消息在每个分区中都分配了一个叫offset的id序列来唯一识别分区中的消息。

• 举例:若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹,如图

• 无论发布的消息是否被消费,kafka都会持久化一定时间(可配置)。
• 在每个消费者都持久化这个offset在日志中。通常消费者读消息时会使offset值线性的增长,但实际上其位置是由消费者控制,它可以按任意顺序来消费消息。比如复位到老的offset来重新处理。

• 每个分区代表一个并行单元。

2.2 message

• message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer
• DFS的基本单位:block
• Flume的基本单位:event
• Hadoop task进程
• Spark task/partition 线程
messageformat(磁盘上的存储格式)
    –  message length :4 bytes (value: 1+4+n)
    –  "magic"value : 1 byte
    –  crc : 4 bytes

    –  payload : n bytes

2.3 Producer

• 生产者可以发布数据到它指定的topic中,并可以指定在topic里哪些消息分配到哪些分区(比如简单的轮流分发各个分区或通过指定分区语义分配key到对应分区)key%3(partition)
•  生产者直接把消息发送给对应分区的broker,而不需要任何路由层。 

•  批处理发送,当message积累到一定数量或等待一定时间后进行发送。

 

生产者常用API:

2.4 Consumer

• 一种更抽象的消费方式:消费组(consumer group)
• 该方式包含了传统的queue和发布订阅方式
    – 首先消费者标记自己一个消费组名。消息将投递到每个消费组中的某一个消费者实例上。
    – 如果所有的消费者实例都有相同的消费组,这样就像传统的queue方式。
    – 如果所有的消费者实例都有不同的消费组,这样就像传统的发布订阅方式。
    – 消费组就好比是个逻辑的订阅者,每个订阅者由许多消费者实例构成(用于扩展或容错)。
• 相对于传统的消息系统,kafka拥有更强壮的顺序保证。

• 由于topic采用了分区,可在多Consumer进程操作时保证顺序性和负载均衡。

消费者常用API:

(三)kafka core

3.1 持久化

• Kafka存储布局简单:Topic的每个Partition对应一个逻辑日志(一个日志为相同大小的一组分段文件)
• 每次生产者发布消息到一个分区,代理就将消息追加到最后一个段文件中。当发布的消息数量达到设定值或者经过一定的时间后,段文件真正flush磁盘中。写入完成后,消息公开给消费者。
• 与传统的消息系统不同,Kafka系统中存储的消息没有明确的消息Id。
• 消息通过日志中的逻辑偏移量来公开。

3.2 传输效率

 

• 生产者提交一批消息作为一个请求。消费者虽然利用api遍历消息是一个一个的,但背后也是一次请求获取一批数据,从而减少网络请求数量。

 

• Kafka层采用无缓存设计,而是依赖于底层的文件系统页缓存。这有助于避免双重缓存,及即消息只缓存了一份在页缓存中。同时这在kafka重启后保持缓存warm也有额外的优势。因kafka根本不缓存消息在进程中,故gc开销也就很小
• zero-copy:kafka为了减少字节拷贝,采用了大多数系统都会提供的
sendfile系统调用
• 太多小的IO操作(message)以及过多的字节拷贝

 

    --sendfile:实现页缓存和

传统方式:1、操作系统将数据从磁盘读到内核空间的页缓存
                2、应用(application)将数据从内核空间读到用户空间的页缓存
                3、application将数据用户空间写到 socket buffer中
                4、操作系统将数据从socket缓存写到网卡缓存中
Zero copy:1、操作系统将数据从磁盘读到内核空间的页缓存
                  2、read buffer (内核页缓存)socket buffer
                  3、socket buffer缓存到NIC buffer (网卡缓存)

 

页缓存和块缓存的区别:(内核为块设备提供的两个通用的方案)为了加快到后端设备的IO效率   

 

3.3无状态的Broker

• Kafka代理是无状态的:意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,代理完全不管。这种设计非常微妙,它本身包含了创新
– 从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间SLA协议应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。

– 这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。

3.4 交付保证

• Kafka默认采用at least once的消息投递策略。即在消费者端的处理顺序是获得消息->处理消息->保存位置。这可能导致一旦客户端挂掉,新的客户端接管时处理前面客户端已处理过的消息。
• 三种保证策略:
    – At most once 消息可能会丢,但绝不会重复传输
    – At least one 消息绝不会丢,但可能会重复传输

    – Exactly once 每条消息肯定会被传输一次且仅传输一次

3.5 副本管理

• kafka将日志复制到指定多个服务器上。
• 复本的单元是partition。在正常情况下,每个分区有一个leader和0到多个follower。
• leader处理对应分区上所有的读写请求。分区可以多于broker数,leader也是分布式的。

• follower的日志和leader的日志是相同的, follower被动的复制leader。如果leader挂了,其中一个follower会自动变成新的leader.

• 和其他分布式系统一样,节点“活着” 定义在于我们能否处理一些失败情况。kafka需要两个条件保证是“活着”
    – 节点在zookeeper注册的session还在且可维护(基于zookeeper心跳机制)
    – 如果是slave则能够紧随leader的更新不至于落得太远。
• kafka采用in sync来代替“活着”
    – 如果follower挂掉或卡住或落得很远,则leader会移除同步列表中的in sync。至于落了多远才叫远由replica.lag.max.messages配置,而表示复本“卡住”由replica.lag.time.max.ms配置
• 所谓一条消息是“提交”的,意味着所有in sync的复本也持久化到了他们的log中。这意味着消费者无需担心leader挂掉导致数据丢失。另一方面,生产者可以选择是否等待消息“提交”。

• kafka动态的维护了一组in-sync(ISR)的复本,表示已追上了leader,只有处于该状态的成员组才是能被选择为leader。这些ISR组会在发生变化时被持久化到zookeeper中。通过ISR模型和f+1复本,可以让kafka的topic支持最多f个节点挂掉而不会导致提交的数据丢失。

3.6 分布式协调

• 由于kafka中一个topic中的不同分区只能被消费组中的一个消费者消费,就避免了多个消费者消费相同的分区时会导致额外的开销(如要协调哪个消费者消费哪个消息,还有锁及状态的开销)。kafka中消费进程只需要在代理和同组消费者有变化时时进行一次协调(这种协调不是经常性的,故可以忽略开销)。
• kafka使用zookeeper做以下事情:
    – 探测broker和consumer的添加或移除
    – 当1发生时触发每个消费者进程的重新负载。

    – 维护消费关系和追踪消费者在分区消费的消息的offset。

3.7 与zookkeeper的使用

• Broker Node Registry
    • /brokers/ids/[0...N] --> host:port (ephemeral node)
        – broker启动时在/brokers/ids下创建一个znode,把broker id写进去。
        – 因为broker把自己注册到zookeeper中实用的是瞬时节点,所以这个注册是动态的,如果broker宕机或者没有响应该节点就会被删除。
• Broker Topic Registry
    • /brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
        – 每个broker把自己存储和维护的partion信息注册到该路径下。
• Consumers and Consumer Groups
    – consumers也把它们自己注册到zookeeper上,用以保持消费负载平衡和offset记录。
    – group id相同的多个consumer构成一个消费租,共同消费一个topic,同一个组的consumer会尽量均匀的消费,其中的一个consumer只会消费一个partion的数据。
• Consumer Id Registry
    • /consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ...,"topicN": #streams} (ephemeral node)
        – 每个consumer在/consumers/[group_id]/ids下创建一个瞬时的唯一的consumer_id,用来描述当前该group下有哪些consumer是alive的,如果消费进程挂掉对应的consumer_id就会从该节点删除。
• Consumer Offset Tracking
    • /consumers/[group_id]/offsets/[topic]/[partition_id] -->offset_counter_value ((persistent node)
        – consumer把每个partition的消费offset记录保存在该节点下。
• Partition Owner registry
    • /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] -->consumer_node_id (ephemeral node)

– 该节点维护着partion与consumer之间的对应关系。

3.8 Kafka对比其他消息服务

• LinkedIn团队做了个实验研究,对比Kafka与Apache ActiveMQ V5.4和RabbitMQ V2.4的性能。他们使用ActiveMQ默认的消息持久化库Kahadb。LinkedIn在两台Linux机器上运行他们的实验,每台机器的配置为8核2GHz、16GB内存,6个磁盘使用RAID10。两台机器通过1GB网络连接。一台机器作为代理,另一台作为生产者或者消费者。

• 生产者测试:

    – 对每个系统,运行一个生产者,总共发布1000万条消息,每条消息200字节。Kafka生产者以1和50批量方式发送消息。ActiveMQ和RabbitMQ似乎没有简单的办法来批量发送消息,LinkedIn假定它的批量值为1。结果如下图所示:

Kafka 性能要好很多的主要原因包括:
1. Kafka不等待代理的确认,以代理能处理的最快速度发送消息。

2. Kafka有更高效的存储格式。平均而言,Kafka每条消息有9字节的开销,而ActiveMQ有144字节。其原因是JMS所需的沉重消息头,以及维护各种索引结构的开销。LinkedIn注意到ActiveMQ一个最忙的线程大部分时间都在存取B-Tree以维护消息元数据和状态。

• 消费者测试:

    – 为了做消费者测试,LinkedIn使用一个消费者获取总共1000万条消息。LinkedIn让所有系统每次读取请求都预获取大约相同数量的数据,最多1000条消息或者200KB。对ActiveMQ和RabbitMQ,LinkedIn设置消费者确认模型为自动。结果如图所示。

Kafka 性能要好很多的主要原因包括:

1. Kafka有更高效的存储格式,在Kafka中,从代理传输到消费者的字节更少。

2. ActiveMQ和RabbitMQ两个容器中的代理必须维护每个消息的传输状态。LinkedIn团队注意到其中一个ActiveMQ线程在测试过程中,一直在将KahaDB页写入磁盘。与此相反,Kafka代理没有磁盘写入动作。最后,Kafka通过使用sendfileAPI降低了传输开销

(四)Kafka整体结构图

l  Producer :消息生产者,就是向kafka broker发消息的客户端。
l  Consumer :消息消费者,向kafka broker取消息的客户端
l  Topic :咋们可以理解为一个队列。
l  Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
l  Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
l  Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

l  Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka

(五)Consumer与topic关系

本质上kafka只支持Topic;
l  每个group中可以有多个consumer,每个consumer属于一个consumergroup;
        通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
l  对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;
        那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
l  在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻)
        一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。
l  kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。

(六)Kafka消息的分发

6.1、Producer客户端负责消息的分发

l  kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息;
l  当producer获取到metadata信息之后,producer将会和Topic下所有partition leader保持socket连接;
l  消息由producer直接通过socket发送到broker,中间不会经过任何"路由层",事实上,消息被路由到哪个partition上由producer客户端决定;
比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。

l  在producer端的配置文件中,开发者可以指定partition路由的方式。

6.2、Producer消息发送的应答机制

设置发送数据是否需要服务端的反馈,有三个值0,1,-1

0: producer不会等待broker发送ack
1: 当leader接收到消息之后发送ack

-1: 当所有的follower都同步消息成功后发送ack

 request.required.acks=0

(七)Consumer的负载均衡

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力,步骤如下:
1、   假如topic1,具有如下partitions: P0,P1,P2,P3
2、   加入group中,有如下consumer: C1,C2
3、   首先根据partition索引号对partitions排序: P0,P1,P2,P3
4、   根据consumer.id排序: C0,C1
5、   计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

6、   然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i *M),P((i + 1) * M -1)]

 

(八)Kafka文件存储机制

 

8.1、Kafka文件存储基本结构

l  在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。

 

l  每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segmentfile消息数量不一定相等,这种特性方便oldsegment file快速被删除。默认保留7天的数据。

l  每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。(什么时候创建,什么时候删除)

数据有序的讨论?
         一个partition的数据是否是有序的?          间隔性有序,不连续
         针对一个topic里面的数据,只能做到partition内部有序,不能做到全局有序。
         特别加入消费者的场景后,如何保证消费者消费的数据全局有序的?伪命题。

        只有一种情况下才能保证全局有序?就是只有一个partition。

 

8.2、Kafka Partition Segment

Segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件。
Segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

 

索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。

3,497:当前log文件中的第几条信息,存放在磁盘上的那个地方

上述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。

 

其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。

segment data file由许多message组成, qq物理结构如下:

8.3、Kafka 查找message

读取offset=368776的message,需要通过下面2个步骤查找。

8.3.1、查找segment file

00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0
00000000000000368769.index的消息量起始偏移量为368770= 368769 + 1
00000000000000737337.index的起始偏移量为737338=737337+ 1
其他后续文件依次类推。

 

以起始偏移量命名并排序这些文件,只要根据offset**二分查找**文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index和对应log文件。

8.3.2、通过segment file查找message

当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址

然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

(九)、Kafka自定义Partition

package cn.itcast;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;


public class MyKafkaProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list","mini1:9092");

        // 默认的序列化为byte改为string
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        /**
         * 自定义parition的基本步骤
         * 1、实现partition类
         * 2、加一个构造器,MyPartitioner(VerifiableProperties properties)
         * 3、将自定义的parititoner加入到properties中
         *    properties.put("partitioner.class","cn.itcast.MyPartitioner")
         * 4、producer.send方法中必须指定一个paritionKey
         */
        properties.put("partitioner.class","cn.itcast.MyPartitioner");
        Producer producer = new Producer(new ProducerConfig(properties));
        while (true){
            producer.send(new KeyedMessage("order4","zhang","我爱我的祖国"));
//            producer.send(new KeyedMessage("order","我爱我的祖国"));
        }
    }
}
package cn.itcast;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;


public class MyPartitioner implements Partitioner {
    public MyPartitioner(VerifiableProperties properties) {
    }
    public int partition(Object key, int numPartitions) {
        return 2;
    }
}

(十一)、Kafka常用操作命令

• 查看当前服务器中的所有topic

bin/kafka-topics.sh --list --zookeeper  zk01:2181

• 创建topic

./kafka-topics.sh --create --zookeeper mini1:2181--replication-factor 1 --partitions 3 --topic first

• 删除topic

sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

    – 注意:如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion
    – 此时你若想真正删除它,可以登录zookeeper客户端,进入终端后,删除相应节点

• 通过shell命令发送消息

kafka-console-producer.sh --broker-list kafka01:9092--topic itheima

• 通过shell消费消息

sh bin/kafka-console-consumer.sh --zookeeper zk01:2181--from-beginning --topic test1

• 查看消费位置

sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeperzk01:2181 --group testGroup

•  查看某个Topic的详情

sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181

 12、Flume&Kafka结合应用

• 启动FLume

–  ./bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf -name producer -Dflume.root.logger=DEBUG,console

• 启动Zookeeper
    – 略

• 启动Kafka Server

     ./bin/kafka-server-start.sh config/server.properties
• 发送消息
     # echo 'XXXX' | nc -u master 8285
• 启动Consumer进行数据监控
    –  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐