1. Asynchronous send API

(1) Create the project in IDEA


(2) Producer asynchronous send (the topic has 4 partitions; the default partitioner spreads records across them, modulo the key hash for keyed records, so consumption order across partitions appears shuffled)

package com.cevent.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Kafka producer: asynchronous send without a callback.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 10:02
 */
public class ProducerWithoutCallBack {

    public static void main(String[] args) {

        //1. KafkaProducer takes a Properties object (map-like) holding its configuration
        Properties props=new Properties();
        //1.1 Bootstrap servers: the entry point for discovering the cluster
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        //1.2 acks=all: the leader acknowledges only after every in-sync replica has the record.
        //    Tolerating n broker failures during leader election requires n+1 replicas. Safest, but highest latency.
        props.put("acks","all");
        //1.3 Retry once if a send fails (e.g. a broker goes down)
        props.put("retries",1);
        //1.4 The sender ships a batch once it has accumulated batch.size bytes
        props.put("batch.size",16384);
        //1.5 If a batch does not fill up, the sender waits at most linger.ms before sending anyway
        props.put("linger.ms",1);
        //1.6 RecordAccumulator buffer size
        /**
         * Asynchronous send model:
         * two threads are involved, the main thread and the sender thread,
         * plus one shared structure, the RecordAccumulator.
         * The main thread appends records to the RecordAccumulator; the sender thread
         * continuously pulls batches from it and sends them to the Kafka brokers.
         * buffer.memory sets the size of the RecordAccumulator buffer.
         */
        props.put("buffer.memory",33554432);
        //1.7 Serialize the <key,value> pair into the bytes Kafka expects; both are Strings here, so StringSerializer
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the producer from the properties
        Producer<String,String> producer= new KafkaProducer<String,String>(props);

        //3. Send 11 messages (i = 0..10)
        for (int i=0;i<11;i++){
            //producer.send: topic, (optional key,) value
            //send() returns a Future<RecordMetadata>: asynchronous, no callback here
            //Future<RecordMetadata> future= producer.send(new ProducerRecord<String, String>("cevent_first","message"+i));
            producer.send(new ProducerRecord<String, String>("cevent_first","cevent-message-"+i));
        }

        producer.close();
    }
}
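
The "4 partitions, out of order" note above is worth unpacking: with the 0.11 client, unkeyed records are spread round-robin over the topic's partitions, and Kafka only guarantees ordering within a partition, which is why the consumer output further down shows the 11 messages grouped roughly as 0/4/8, 1/5/9, 2/6/10 and 3/7 rather than in send order. A simplified sketch of that assignment logic (illustrative only, not the actual DefaultPartitioner source, which hashes keys with murmur2):

import java.util.concurrent.atomic.AtomicInteger;

/** Simplified sketch of default partition assignment (illustrative, not Kafka source). */
public class PartitionSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // Unkeyed records: round-robin a counter over the partitions
            return COUNTER.getAndIncrement() % numPartitions;
        }
        // Keyed records: hash modulo partition count (Kafka itself uses murmur2, not hashCode)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 11; i++) {
            System.out.println("cevent-message-" + i + " -> partition " + partitionFor(null, 4));
        }
    }
}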

(3) Run the class directly


(4) Start DFS, YARN, zkServer and the Kafka brokers (kafka-server-start.sh -daemon), then produce and consume on the topic

hadoop207




 
  
[cevent@hadoop207 hadoop-2.7.2]$ sbin/start-dfs.sh
[cevent@hadoop207 hadoop-2.7.2]$ sbin/start-yarn.sh
[cevent@hadoop207 hadoop-2.7.2]$ jps
5036 Jps
4102 NameNode
4597 ResourceManager
4715 NodeManager
4428 SecondaryNameNode
4218 DataNode

[cevent@hadoop207 zookeeper-3.4.10]$ bin/zkServer.sh start    # start ZooKeeper
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[cevent@hadoop207 kafka_2.11-0.11.0.0]$ bin/kafka-topics.sh --zookeeper hadoop207.cevent.com:2181 --list    # list Kafka topics
__consumer_offsets
cevent_first
First

[cevent@hadoop207 kafka_2.11-0.11.0.0]$ bin/kafka-console-producer.sh --broker-list hadoop207.cevent.com:9092 --topic cevent_first    # start the console producer
>kakaxi
>luban
>cevent
>echo
>lu


hadoop208



[cevent@hadoop208 zookeeper-3.4.10]$ bin/zkServer.sh start    # start ZooKeeper
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[cevent@hadoop208 zookeeper-3.4.10]$ jps
3824 DataNode
3949 NodeManager
4126 QuorumPeerMain
4157 Jps

[cevent@hadoop208 kafka_2.11-0.11.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop207.cevent.com:9092 --topic cevent_first --from-beginning    # start a console consumer to receive the messages
luban
hha
cevent
s
kakaxi
lu
cck
echo
kaka
cevent-message-3
cevent-message-7
cevent-message-0
cevent-message-4
cevent-message-8
cevent-message-1
cevent-message-5
cevent-message-9
cevent-message-2
cevent-message-6
cevent-message-10

 

hadoop209




 
  
[cevent@hadoop209 zookeeper-3.4.10]$ bin/zkServer.sh start    # start ZooKeeper
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[cevent@hadoop209 zookeeper-3.4.10]$ jps
3899 NodeManager
4121 Jps
4092 QuorumPeerMain

[cevent@hadoop209 kafka_2.11-0.11.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop207.cevent.com:9092 --topic cevent_first --from-beginning    # start a second console consumer
luban
kaka
hha
cevent
s
kakaxi
lu
cck
echo
cevent-message-3
cevent-message-7
cevent-message-0
cevent-message-4
cevent-message-8
cevent-message-1
cevent-message-5
cevent-message-9
cevent-message-2
cevent-message-6
cevent-message-10


(5) Add log4j: configure the POM




 
  
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cevent</groupId>
    <artifactId>kafka_asnyc_api</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.0</version>
        </dependency>
    </dependencies>
</project>
   
  
 


(6) log4j2.xml configuration. Note: the Logger element needs name="test" (not type="test") and level="info" (not lebel), and the pattern needs %p, not %P; those three mistakes are what produce the CLASS_NOT_FOUND and "Unrecognized format specifier" errors in the console output below. The corrected attributes are used here.




 
  
<?xml version="1.0" encoding="UTF-8"?>

<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!--1. Appender of type Console; the name attribute is required-->
        <Appender type="Console" name="STDOUT">
            <!--PatternLayout output style:
                [INFO] [2020-06-19 10:52:00] [org.test.Console] I'm here
            -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}] [%c{10}]%m%n"
            />
        </Appender>
    </Appenders>

    <!--Logger configuration-->
    <Loggers>
        <!--additivity=false stops events from also reaching the root logger-->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT"/>
        </Logger>
        <!--root LoggerConfig-->
        <Root level="info">
            <AppenderRef ref="STDOUT"/>
        </Root>
    </Loggers>
</Configuration>
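
A quick way to confirm that the slf4j binding from the POM and this configuration are wired together is to log one line through slf4j before touching Kafka. A minimal check (class name is illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Minimal check that slf4j resolves to the log4j2 binding configured above. */
public class LogCheck {
    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(LogCheck.class);
        // With the corrected pattern this prints e.g. [INFO] [2020-06-19 10:52:00] [LogCheck]binding OK
        log.info("binding OK");
    }
}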
   
  
 


(7) Console output (from a run with the original, uncorrected config, hence the ERROR lines and the literal [%P] prefixes)




 
  
  C:\JAVA\JDK\bin\java
  "-javaagent:C:\DevTools\IntelliJ IDEA 2017.1.4\lib\idea_rt.jar=55347:C:\DevTools\IntelliJ
  IDEA 2017.1.4\bin" -Dfile.encoding=UTF-8 -classpath
  C:\JAVA\JDK\jre\lib\charsets.jar;C:\JAVA\JDK\jre\lib\deploy.jar;C:\JAVA\JDK\jre\lib\ext\access-bridge-64.jar;C:\JAVA\JDK\jre\lib\ext\cldrdata.jar;C:\JAVA\JDK\jre\lib\ext\dnsns.jar;C:\JAVA\JDK\jre\lib\ext\jaccess.jar;C:\JAVA\JDK\jre\lib\ext\jfxrt.jar;C:\JAVA\JDK\jre\lib\ext\localedata.jar;C:\JAVA\JDK\jre\lib\ext\nashorn.jar;C:\JAVA\JDK\jre\lib\ext\sunec.jar;C:\JAVA\JDK\jre\lib\ext\sunjce_provider.jar;C:\JAVA\JDK\jre\lib\ext\sunmscapi.jar;C:\JAVA\JDK\jre\lib\ext\sunpkcs11.jar;C:\JAVA\JDK\jre\lib\ext\zipfs.jar;C:\JAVA\JDK\jre\lib\javaws.jar;C:\JAVA\JDK\jre\lib\jce.jar;C:\JAVA\JDK\jre\lib\jfr.jar;C:\JAVA\JDK\jre\lib\jfxswt.jar;C:\JAVA\JDK\jre\lib\jsse.jar;C:\JAVA\JDK\jre\lib\management-agent.jar;C:\JAVA\JDK\jre\lib\plugin.jar;C:\JAVA\JDK\jre\lib\resources.jar;C:\JAVA\JDK\jre\lib\rt.jar;D:\DEV_CODE\Intelligy_idead_code\kafka\kafka_asnyc_api\target\classes;C:\Users\asus\.m2\repository\org\apache\kafka\kafka-clients\0.11.0.0\kafka-clients-0.11.0.0.jar;C:\Users\asus\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\asus\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\asus\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\asus\.m2\repository\org\apache\logging\log4j\log4j-slf4j-impl\2.12.0\log4j-slf4j-impl-2.12.0.jar;C:\Users\asus\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.0\log4j-api-2.12.0.jar;C:\Users\asus\.m2\repository\org\apache\logging\log4j\log4j-core\2.12.0\log4j-core-2.12.0.jar
  com.cevent.kafka.producer.ProducerWithoutCallBack
2020-06-19 11:48:54,561 main ERROR Error processing element test ([Loggers: null]): CLASS_NOT_FOUND
2020-06-19 11:48:54,583 main ERROR Unrecognized format specifier [P]
2020-06-19 11:48:54,583 main ERROR Unrecognized conversion specifier [P] starting at position 3 in conversion pattern.
[%P] [2020-06-19 11:48:54] [org.apache.kafka.clients.producer.ProducerConfig]ProducerConfig values:
        acks = all
        batch.size = 16384
        bootstrap.servers = [hadoop207.cevent.com:9092]
        buffer.memory = 33554432
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 1
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 1
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[%P] [2020-06-19 11:48:54] [org.apache.kafka.common.utils.AppInfoParser]Kafka version : 0.11.0.0
[%P] [2020-06-19 11:48:54] [org.apache.kafka.common.utils.AppInfoParser]Kafka commitId : cb8625948210849f
[%P] [2020-06-19 11:48:54] [org.apache.kafka.clients.producer.KafkaProducer]Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0
   
  
 


2. Asynchronous send with a callback: the main thread hands records over, the sender thread ships them, and the callback receives the ack metadata




 
  
package com.cevent.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Kafka producer: asynchronous send with a callback.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 10:02
 */
public class ProducerWithoutCallBackACK {

    public static void main(String[] args) {

        //1. Producer configuration (see section 1 for the meaning of each property)
        Properties props=new Properties();
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        //acks=all: wait for all in-sync replicas; safest, highest latency
        props.put("acks","all");
        //retry once on failure
        props.put("retries",1);
        props.put("batch.size",16384);
        props.put("linger.ms",1);
        //buffer.memory sets the RecordAccumulator buffer shared by the main and sender threads
        props.put("buffer.memory",33554432);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the producer
        Producer<String,String> producer= new KafkaProducer<String,String>(props);

        //3. Send 11 messages, each with a callback invoked once the ack arrives
        for (int i=0;i<11;i++){
            producer.send(new ProducerRecord<String, String>("cevent_first", "cevent-message-callback-" + i), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    //on failure recordMetadata may be null, so check the exception first
                    if (e != null) {
                        e.printStackTrace();
                        return;
                    }
                    System.out.println(recordMetadata.toString()+" sent successfully, partition --> "+recordMetadata.partition()+" , offset --> "+recordMetadata.offset());
                }
            });
        }

        producer.close();
    }
}
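
Callbacks for records headed to the same partition fire in send order, and they run on the producer's I/O thread, so they should stay lightweight. When the goal is simply to block until everything buffered so far is acknowledged, without closing the producer, flush() does that. A minimal sketch, assuming the same broker and topic as above (class name is illustrative):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/** Sketch: asynchronous sends followed by flush(), which blocks until all buffered records complete. */
public class ProducerFlushSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop207.cevent.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 11; i++) {
            producer.send(new ProducerRecord<String, String>("cevent_first", "flushed-" + i));
        }
        producer.flush();   // blocks until every record sent so far is acknowledged
        System.out.println("all sends acknowledged");
        producer.close();
    }
}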
   
  
 


3. Synchronous send




 
  
package com.cevent.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Kafka producer: synchronous send, blocking on each Future before sending the next record.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 13:31
 */
public class ProduceSYNC {
    public static void main(String[] args) throws ExecutionException,InterruptedException{

        //1. Producer configuration (see section 1 for details)
        Properties props=new Properties();
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        props.put("acks","all");
        props.put("retries",1);
        props.put("batch.size",16384);
        props.put("linger.ms",1);
        props.put("buffer.memory",33554432);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the producer
        Producer<String,String> producer= new KafkaProducer<String,String>(props);

        //3. Send 11 messages synchronously
        for (int i=0;i<11;i++){
            //send() itself is asynchronous; calling get() on the returned Future
            //blocks until the broker acknowledges, which makes the loop synchronous
            Future<RecordMetadata> future= producer.send(new ProducerRecord<String, String>
                    ("cevent_first", "cevent-message-sync-" + i));
            RecordMetadata recordMetadata=future.get();

            System.out.println("synchronize:"+recordMetadata.offset());
        }

        System.out.println("synchronous send finished!");

        producer.close();
    }
}
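
future.get() above blocks indefinitely. The bounded overload get(timeout, unit) throws TimeoutException if the broker does not acknowledge in time, which lets the sender fail fast. A minimal sketch with an arbitrary 10-second bound (class name is illustrative):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Sketch: synchronous send with a bounded wait instead of an unbounded get(). */
public class ProduceSyncWithTimeout {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop207.cevent.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        // get(timeout, unit) throws TimeoutException if no ack arrives in time
        RecordMetadata metadata = producer.send(
                new ProducerRecord<String, String>("cevent_first", "sync-with-timeout"))
                .get(10, TimeUnit.SECONDS);
        System.out.println("acked at offset " + metadata.offset());

        producer.close();
    }
}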
   
  
 



5. Consumer API




 
  
package com.cevent.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer with automatic offset commit.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 13:46
 */
public class AutoCommitConsumer {
    public static void main(String[] args) {
        Properties props=new Properties();
        //1.1 Bootstrap servers: the entry point for discovering the cluster
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        //1.2 Consumer group id
        props.put("group.id","consumer_group");
        //1.3 Enable automatic offset commit
        props.put("enable.auto.commit","true");
        //1.4 Auto-commit interval
        props.put("auto.commit.interval.ms","1000");
        //1.5 Deserialize the <key,value> bytes back into Strings
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        //2. Create the consumer
        KafkaConsumer<String,String> kafkaConsumer= new KafkaConsumer<String,String>(props);

        //Subscribe to the topic
        kafkaConsumer.subscribe(Collections.singleton("cevent_first"));

        //3. Consume in a poll loop
        while (true){
            ConsumerRecords<String,String> consumerRecords= kafkaConsumer.poll(1000);
            for (ConsumerRecord<String,String> record:consumerRecords){
                System.out.println("ConsumerRecord: "+record.value());
            }
        }
    }
}
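
The while (true) loop above never exits and never closes the consumer. The supported way to stop it is consumer.wakeup() from another thread (here a JVM shutdown hook): the blocked poll() then throws WakeupException and the loop can clean up. A sketch of that pattern, reusing the configuration above (class name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Collections;
import java.util.Properties;

/** Sketch: the same poll loop with a clean shutdown path via wakeup(). */
public class ShutdownAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop207.cevent.com:9092");
        props.put("group.id", "consumer_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singleton("cevent_first"));

        // wakeup() is the only consumer method safe to call from another thread;
        // it forces the blocked poll() to throw WakeupException
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                consumer.wakeup();
                try { mainThread.join(); } catch (InterruptedException ignored) { }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("ConsumerRecord: " + record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown
        } finally {
            consumer.close();
        }
    }
}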
   
  
 


6. Manually committing offsets (synchronous)




 
  
package com.cevent.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Synchronous manual offset commit.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 14:10
 */
public class SYNCCommitConsumer {
    public static void main(String[] args) {
        Properties props=new Properties();
        //1.1 Bootstrap servers
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        //1.2 Consumer group id
        props.put("group.id","consumer_group");
        //1.3 Disable automatic commit; offsets are committed manually below
        props.put("enable.auto.commit","false");
        //1.4 Deserializers
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        //2. Create the consumer
        KafkaConsumer<String,String> kafkaConsumer= new KafkaConsumer<String,String>(props);

        //Subscribe to the topic
        kafkaConsumer.subscribe(Collections.singleton("cevent_first"));

        //3. Consume in a poll loop, committing after each batch is processed
        while (true){
            ConsumerRecords<String,String> consumerRecords= kafkaConsumer.poll(1000);
            for (ConsumerRecord<String,String> record:consumerRecords){
                System.out.println("sync-commit ConsumerRecord: "+record.value());
            }

            //blocks until the commit succeeds (or fails with an unrecoverable error)
            kafkaConsumer.commitSync();
        }
    }
}
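
The no-argument commitSync() above commits the highest offsets returned by the last poll for all partitions at once. There is also an overload taking a Map<TopicPartition, OffsetAndMetadata> for committing partition by partition as each slice is processed; note the committed value is the next offset to read, so last offset plus one. A sketch (class name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Sketch: commit each partition's offset explicitly after processing its records. */
public class PerPartitionCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop207.cevent.com:9092");
        props.put("group.id", "consumer_group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singleton("cevent_first"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(partition + " : " + record.value());
                }
                // The committed offset is the NEXT offset to read, hence last offset + 1
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(
                        partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}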
   
  
 


7. Manually committing offsets (asynchronous)




 
  
package com.cevent.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/**
 * Asynchronous manual offset commit.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 14:10
 */
public class DESYNCCommitConsumer {
    public static void main(String[] args) {
        Properties props=new Properties();
        //1.1 Bootstrap servers
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        //1.2 Consumer group id
        props.put("group.id","consumer_group");
        //1.3 Disable automatic commit; offsets are committed manually below
        props.put("enable.auto.commit","false");
        //1.4 Deserializers
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        //2. Create the consumer
        KafkaConsumer<String,String> kafkaConsumer= new KafkaConsumer<String,String>(props);

        //Subscribe to the topic
        kafkaConsumer.subscribe(Collections.singleton("cevent_first"));

        //3. Consume in a poll loop, committing asynchronously after each batch
        while (true){
            ConsumerRecords<String,String> consumerRecords= kafkaConsumer.poll(1000);
            for (ConsumerRecord<String,String> record:consumerRecords){
                System.out.println("async-commit ConsumerRecord: "+record.value());
            }

            //returns immediately; the callback fires when the broker responds
            kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if (e != null) {
                        System.err.println("async commit failed: " + e.getMessage());
                    } else {
                        System.out.println("async commit completed!");
                    }
                }
            });
        }
    }
}
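
commitAsync() deliberately does not retry, since a delayed retry could overwrite a newer commit. The common pattern is therefore commitAsync() in the hot loop for throughput plus a final commitSync() on the way out for reliability. A sketch, assuming some external signal flips the running flag (class name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/** Sketch: fast async commits while running, one reliable sync commit on the way out. */
public class AsyncThenSyncCommitConsumer {
    private static volatile boolean running = true;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop207.cevent.com:9092");
        props.put("group.id", "consumer_group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singleton("cevent_first"));

        try {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("record: " + record.value());
                }
                consumer.commitAsync();   // cheap, non-blocking, no retries
            }
        } finally {
            try {
                consumer.commitSync();    // retries until success: the last word on offsets
            } finally {
                consumer.close();
            }
        }
    }
}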
   
  
 


8. Storing offsets in custom storage

package com.cevent.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.io.*;
import java.util.*;

/**
 * Storing offsets in custom storage (a local file here).
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 14:43
 */
public class DefinedOffSetConsumer {

    //Offsets consumed so far: for each partition, the NEXT offset to read
    private static Map<TopicPartition,Long> offset=new HashMap<TopicPartition, Long>();

    //Commit and restore must use the same file, otherwise nothing is ever restored
    private static final String OFFSET_FILE="D:/DEV_CODE/Intelligy_idead_code/kafka/commitOffset";

    public static void main(String[] args) {
        Properties props=new Properties();
        //1.1 Bootstrap servers
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        //1.2 Consumer group id
        props.put("group.id","consumer_group");
        //1.3 Disable automatic commit: offsets live in our own storage
        props.put("enable.auto.commit","false");
        //1.4 Deserializers
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        //2. Create the consumer
        final Consumer<String,String> consumer=new KafkaConsumer<String, String>(props);

        //3. Subscribe with a rebalance listener
        consumer.subscribe(Collections.singleton("cevent_first"), new ConsumerRebalanceListener() {
            //3.1 Called before a rebalance takes our partitions away:
            //flush the offsets consumed so far to custom storage
            public void onPartitionsRevoked(Collection<TopicPartition> partitionsRevoked) {
                commitOffset(offset);
            }

            //3.2 Called after the rebalance assigned us (possibly different) partitions
            public void onPartitionsAssigned(Collection<TopicPartition> partitionsAssigned) {
                //3.3 Reload the offset map from custom storage
                getOffset(offset);

                //3.4 Position the consumer on each newly assigned partition
                for(TopicPartition partition: partitionsAssigned){
                    Long offsetOBJ=offset.get(partition);
                    if(offsetOBJ==null){
                        //Partition never consumed before: start from 0
                        offsetOBJ=0L;
                    }
                    consumer.seek(partition,offsetOBJ);
                }
            }
        });

        //4. Consume
        while (true){
            ConsumerRecords<String,String> consumerRecords=consumer.poll(100);

            //Processing + commit should ideally be atomic to consume each record exactly once
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("consuming... " + consumerRecord.value());
                //Store offset+1, the next offset to read, which is what seek() expects
                offset.put(new TopicPartition(consumerRecord.topic(),consumerRecord.partition()),
                        consumerRecord.offset()+1);
            }

            commitOffset(offset);
        }
    }

    //Persist the current offset map to custom storage (the file need not exist beforehand)
    private static void commitOffset(Map<TopicPartition,Long> offset){
        ObjectOutputStream objectOutputStream=null;
        try {
            objectOutputStream= new ObjectOutputStream(new FileOutputStream(OFFSET_FILE));
            objectOutputStream.writeObject(offset);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(objectOutputStream!=null){
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //Reload the offset map from custom storage into the passed-in map
    private static void getOffset(Map<TopicPartition,Long> offset){
        ObjectInputStream objectInputStream=null;
        try {
            objectInputStream= new ObjectInputStream(new FileInputStream(OFFSET_FILE));
            //Copy into the caller's map; reassigning the parameter would lose the data
            @SuppressWarnings("unchecked")
            Map<TopicPartition, Long> saved=(Map<TopicPartition, Long>) objectInputStream.readObject();
            offset.clear();
            offset.putAll(saved);
        } catch (FileNotFoundException e) {
            //First run: nothing saved yet, every partition starts from 0
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            if(objectInputStream!=null){
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
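
To inspect what was persisted, the offset file can simply be deserialized back (same file-path assumption as in the listing above; class name is illustrative):

import org.apache.kafka.common.TopicPartition;

import java.io.FileInputStream;
import java.io.ObjectInputStream;
import java.util.Map;

/** Sketch: dump the offsets persisted by DefinedOffSetConsumer. */
public class DumpSavedOffsets {
    public static void main(String[] args) throws Exception {
        ObjectInputStream in = new ObjectInputStream(
                new FileInputStream("D:/DEV_CODE/Intelligy_idead_code/kafka/commitOffset"));
        try {
            @SuppressWarnings("unchecked")
            Map<TopicPartition, Long> saved = (Map<TopicPartition, Long>) in.readObject();
            for (Map.Entry<TopicPartition, Long> entry : saved.entrySet()) {
                System.out.println(entry.getKey() + " -> next offset " + entry.getValue());
            }
        } finally {
            in.close();
        }
    }
}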

 

9. Verification




 
  
  C:\JAVA\JDK\bin\java
  "-javaagent:C:\DevTools\IntelliJ IDEA 2017.1.4\lib\idea_rt.jar=50577:C:\DevTools\IntelliJ
  IDEA 2017.1.4\bin" -Dfile.encoding=UTF-8 -classpath
  C:\JAVA\JDK\jre\lib\charsets.jar;C:\JAVA\JDK\jre\lib\deploy.jar;C:\JAVA\JDK\jre\lib\ext\access-bridge-64.jar;C:\JAVA\JDK\jre\lib\ext\cldrdata.jar;C:\JAVA\JDK\jre\lib\ext\dnsns.jar;C:\JAVA\JDK\jre\lib\ext\jaccess.jar;C:\JAVA\JDK\jre\lib\ext\jfxrt.jar;C:\JAVA\JDK\jre\lib\ext\localedata.jar;C:\JAVA\JDK\jre\lib\ext\nashorn.jar;C:\JAVA\JDK\jre\lib\ext\sunec.jar;C:\JAVA\JDK\jre\lib\ext\sunjce_provider.jar;C:\JAVA\JDK\jre\lib\ext\sunmscapi.jar;C:\JAVA\JDK\jre\lib\ext\sunpkcs11.jar;C:\JAVA\JDK\jre\lib\ext\zipfs.jar;C:\JAVA\JDK\jre\lib\javaws.jar;C:\JAVA\JDK\jre\lib\jce.jar;C:\JAVA\JDK\jre\lib\jfr.jar;C:\JAVA\JDK\jre\lib\jfxswt.jar;C:\JAVA\JDK\jre\lib\jsse.jar;C:\JAVA\JDK\jre\lib\management-agent.jar;C:\JAVA\JDK\jre\lib\plugin.jar;C:\JAVA\JDK\jre\lib\resources.jar;C:\JAVA\JDK\jre\lib\rt.jar;D:\DEV_CODE\Intelligy_idead_code\kafka\kafka_asnyc_api\target\classes;C:\Users\asus\.m2\repository\org\apache\kafka\kafka-clients\0.11.0.0\kafka-clients-0.11.0.0.jar;C:\Users\asus\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\asus\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\asus\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\asus\.m2\repository\org\apache\logging\log4j\log4j-slf4j-impl\2.12.0\log4j-slf4j-impl-2.12.0.jar;C:\Users\asus\.m2\repository\org\apache\logging\log4j\log4j-api\2.12.0\log4j-api-2.12.0.jar;C:\Users\asus\.m2\repository\org\apache\logging\log4j\log4j-core\2.12.0\log4j-core-2.12.0.jar
  com.cevent.kafka.consumer.DefinedOffSetConsumer
2020-06-19 16:11:52,572 main ERROR Error processing element test ([Loggers: null]): CLASS_NOT_FOUND
2020-06-19 16:11:52,590 main ERROR Unrecognized format specifier [P]
2020-06-19 16:11:52,590 main ERROR Unrecognized conversion specifier [P] starting at position 3 in conversion pattern.
[%P] [2020-06-19 16:11:52] [org.apache.kafka.clients.consumer.ConsumerConfig]ConsumerConfig values:
        auto.commit.interval.ms = 1000
        auto.offset.reset = latest
        bootstrap.servers = [hadoop207.cevent.com:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1

[%P] [2020-06-19 16:11:52] [org.apache.kafka.common.utils.AppInfoParser]Kafka version : 0.11.0.0
[%P] [2020-06-19 16:11:52] [org.apache.kafka.common.utils.AppInfoParser]Kafka commitId : cb8625948210849f
[%P] [2020-06-19 16:11:52] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Discovered coordinator hadoop207.cevent.com:9092 (id: 2147483640 rack: null) for group consumer_group.
[%P] [2020-06-19 16:11:52] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Revoking previously assigned partitions [] for group consumer_group
[%P] [2020-06-19 16:11:52] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator](Re-)joining group consumer_group
[%P] [2020-06-19 16:11:57] [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]Successfully joined group consumer_group with generation 10
[%P] [2020-06-19 16:11:57] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Setting newly assigned partitions [cevent_first-3, cevent_first-2, cevent_first-1, cevent_first-0] for group consumer_group
   
  
 



10. Custom Interceptor (message pre-processing)

(1) InterceptorCounterConsumer




 
  
package com.cevent.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Producer interceptor that counts successful and failed sends.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 16:18
 */
public class InterceptorCounterConsumer implements ProducerInterceptor<String,String>{

    private long success=0;
    private long fail=0;

    //1. Chance to modify the record just before it is sent; this counter passes it through unchanged
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    //2. Called when the ack (or error) comes back; usable for bookkeeping
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if(e==null){
            success++;
        }else{
            fail++;
        }
    }

    //3. Called when the producer closes: report the totals
    public void close() {
        System.out.println("Interceptor: "+success+" records sent successfully");
        System.out.println("Interceptor: "+fail+" records failed");
    }

    //4. Configuration hook (unused here)
    public void configure(Map<String, ?> map) {

    }
}
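
onAcknowledgement() runs on the producer's I/O thread. With one KafkaProducer driven from a single application thread that is fine, but if several threads share the producer, the plain long counters above can race; AtomicLong avoids that. An illustrative fragment showing only the fields and the update that would change:

import java.util.concurrent.atomic.AtomicLong;

/** Illustrative fragment: thread-safe counters for the interceptor above. */
public class CounterFields {
    private final AtomicLong success = new AtomicLong();
    private final AtomicLong fail = new AtomicLong();

    //the body onAcknowledgement() would use instead of success++/fail++
    void onAck(Exception e) {
        if (e == null) {
            success.incrementAndGet();
        } else {
            fail.incrementAndGet();
        }
    }
}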
   
  
 


(2) InterceptorTimeConsumer




 
  
package com.cevent.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Producer interceptor that prepends a timestamp to each message value.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 16:44
 */
public class InterceptorTimeConsumer implements ProducerInterceptor<String,String> {
    //1. Before the send: rewrite the outgoing record
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {

        //Take the original value and prepend the current timestamp
        String value=producerRecord.value();
        value=System.currentTimeMillis()+" , "+value;
        //ProducerRecord is effectively read-only, so build a new one
        ProducerRecord<String,String> newRecord= new ProducerRecord<String,String>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                value,
                producerRecord.headers());

        return newRecord;
    }

    //2. After the send, when the ack comes back (unused here)
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    //3. Called when the producer closes (unused here)
    public void close() {

    }

    //4. Configuration hook (unused here)
    public void configure(Map<String, ?> map) {

    }
}
   
  
 


(3) InterceptorProducer




 
  
package com.cevent.kafka.interceptor;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Producer wired with the interceptor chain defined above.
 * Created by Cevent on 2020/6/19.
 * @author cevent
 * @date 2020/6/19 17:05
 */
public class InterceptorProducer {

    public static void main(String[] args) {
        //1. Producer configuration (see section 1 for details)
        Properties props=new Properties();
        props.put("bootstrap.servers","hadoop207.cevent.com:9092");
        props.put("acks","all");
        props.put("retries",1);
        props.put("batch.size",16384);
        props.put("linger.ms",1);
        props.put("buffer.memory",33554432);

        //Interceptor chain: onSend runs in list order (timestamp first, then counter)
        List<String> interceptors=new ArrayList<String>();
        interceptors.add("com.cevent.kafka.interceptor.InterceptorTimeConsumer");
        interceptors.add("com.cevent.kafka.interceptor.InterceptorCounterConsumer");

        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the producer
        Producer<String,String> producer= new KafkaProducer<String,String>(props);

        //3. Send 11 messages through the interceptor chain
        for (int i=0;i<11;i++){
            producer.send(new ProducerRecord<String, String>("cevent_first_interceptor","cevent-interceptor-"+i));
        }

        //close() also closes the interceptors, which triggers the counter report
        producer.close();
    }
}
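
The producer writes to a new topic, cevent_first_interceptor. Brokers with auto-topic-creation enabled will create it with default settings on first use; to create it explicitly, the 0.11 client also ships an AdminClient. A sketch, where the 4-partition / 3-replica numbers are assumptions chosen to match the three-broker cluster (class name is illustrative):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

/** Sketch: create the interceptor demo topic explicitly with the 0.11 AdminClient. */
public class CreateInterceptorTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop207.cevent.com:9092");

        AdminClient admin = AdminClient.create(props);
        try {
            // 4 partitions / replication factor 3: assumed values for the 3-broker cluster
            admin.createTopics(Collections.singleton(
                    new NewTopic("cevent_first_interceptor", 4, (short) 3)))
                 .all().get();   // block until the brokers confirm
        } finally {
            admin.close();
        }
    }
}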
   
  
 


(4) Verification


11. Integrating Flume with Kafka

1. Write the conf file



[cevent@hadoop207 hadoop-2.7.2]$ cd /opt/module/apache-flume-1.7.0/

[cevent@hadoop207 apache-flume-1.7.0]$ vim job/flume-file-kafka.conf




 
  
### Flume-to-Kafka agent configuration
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop207.cevent.com:9092,hadoop208.cevent.com:9092,hadoop209.cevent.com:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  
 


2. Start the agent and append test data



[cevent@hadoop207 apache-flume-1.7.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/flume-file-kafka.conf -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /opt/module/apache-flume-1.7.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop-2.7.2/bin/hadoop) for HDFS access

[cevent@hadoop207 datas]$ cat >> /opt/module/datas/flume.log
kaka
hehe
lala
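
Lines appended to flume.log should now arrive on the Kafka topic first. The quickest check is the console consumer used earlier, pointed at --topic first; staying in Java, a minimal consumer sketch works too (group id and class name are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/** Sketch: tail the "first" topic to verify the Flume-to-Kafka pipe. */
public class FlumeTopicVerifier {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop207.cevent.com:9092");
        props.put("group.id", "flume_verifier");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singleton("first"));

        // Each line appended to flume.log should show up here (kaka / hehe / lala above)
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("from flume: " + record.value());
            }
        }
    }
}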
