1 Avro简介

在互联网发展早期,很多项目都是运行在单体架构上,使用Java原生序列化机制能满足大部分场景需求。后面随着业务和访问量的增大,项目架构慢慢迁移到微服务架构。每个微服务可能采用不同的开发语言,而且部分服务对通信性能要求比较高,这时候Java原生的序列化就不能满足要求。因为Java原生序列化机制存在1)不支持跨语言2)序列化后的体积比较大等问题,所以采第三方的序列化协议就显得很有必要。
Apache Avro是Hadoop之父Doug Cutting创建,具有跨语言,序列化后空间小等优点,被广泛应用在大数据领域。Avro模式通常用JSON来写,使用二进制格式编码,支持的简单类型如下:
Avro 支持的简单类型
也支持复杂类型,详见官网https://avro.apache.org/。

2 使用Avro替代Java原生序列化机制

2.1 定义Avro的模式(Schema)

创建Avro的Schema文件user.avsc,写入以下内容:

{
    "namespace": "com.dhhy.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "id", "type": "int"},
        {"name": "phonenum", "type": ["string", "null"]}
    ]
}

这里的变量名name是字符串类型,id是整数类型,都为非空;phonenum是字符串类型,可以为空。

2.2 编译模式

下载依赖avro-tools-1.9.2.jar,执行命令java -jar avro-tools-1.9.2.jar compile schema user.avsc ./
在这里插入图片描述

2.3 创建maven工程

项目名为KafkaStudy,整体文件目录结构如下:
在这里插入图片描述

1)pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dhhy</groupId>
    <artifactId>kafkaStudy</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
        	<groupId>org.apache.avro</groupId>
        	<artifactId>avro</artifactId>
        	<version>1.9.2</version>
		</dependency>

		<dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
		</dependency>
	</dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2)将模式编译生成的文件夹复制到KafkaStudy工程下,即User.java文件
3)创建一个Java类AvroApp.java,内容如下:

package com.dhhy.avro;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import java.io.File;
import java.io.IOException;

/**
 * Created by JayLai on 2020-02-24 16:44:18
 */
public class AvroApp {

    /**
     * 创建User对象
     * @return
     */
    public User createUser(){
        User user = User.newBuilder()
                .setName("Jay")
                .setId(1024)
                .setPhonenum("18814123346")
                .build();

        return user;
    }

    /**
     * 将User对象序列化到磁盘
     */
    public void serializeUser(User user, File file){
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
        DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);

        try {
            dataFileWriter.create(user.getSchema(), file);
            dataFileWriter.append(user);
        } catch (IOException e) {
            e.printStackTrace();
        } finally{
            try{
                dataFileWriter.close();
            }catch(IOException e){
                e.printStackTrace();
            }
        }
    }



    /**
     * 将User对象从磁盘序列化到内存
     */
    public void deserializeUser(File file){
        DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
        DataFileReader<User> dataFileReader = null;


        try {
            dataFileReader = new DataFileReader<User>(file, userDatumReader);
            User user = null;
            while (dataFileReader.hasNext()) {
                user = dataFileReader.next(user);
                System.out.println(user);
            }
            dataFileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally{
            try{
                if(dataFileReader != null){
                    dataFileReader.close();
                }
            }catch(IOException e){
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {
        AvroApp app = new AvroApp();
        File file = new File("users.avro");
        app.serializeUser(app.createUser(), file);
        app.deserializeUser(file);
    }


}

2.4 测试

运行AvroApp.java,可以看到控制台输出{“name”: “Jay”, “id”: 1024, “phonenum”: “18814123346”},同时本地磁盘多了一个users.avro的文件,说明执行成功,内存中的User对象能通过Avro序列化到本地磁盘,同时也能通过Avro反序列化到内存中。
在这里插入图片描述

3 在Kafka中使用Avro进行编解码

由于消息队列Kafka具有高吞吐等优点,在大数据领域,上下游数据的实时交互大部分是通过Kafka实现。上游可能使用不同开发语言,如C++,Java等,使用Avro将数据进行序列化后写入Kafka的broker中;下游的客户端同样也存在多种开发语言,再使用Avro进行反序列化,如图所示。

在这里插入图片描述

在KafkaStudy工程的基础上,删掉无关的2个文件AvroApp.java和user.varo,整体文件结构如下:
在这里插入图片描述

3.1 创建Topic

使用Xshell登陆KAFKA_HOME,创建名为avro_topic的Topic

hadoop@ubuntu:~/app/kafka$ ls
bin  config  kafka_log  libs  LICENSE  logs  NOTICE  site-docs  start.sh
hadoop@ubuntu:~/app/kafka$:bin/kafka-topics.sh --create --zookeeper 192.168.0.131:2181 --partitions 2 --replication-factor 1 --topic avro_topic
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "avro_topic".

3.2 生产者端的话单序列化类

package com.dhhy.avro;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

/**
 * 序列化类
 * Created by JayLai on 2020-03-24 22:17:40
 */
public class AvroSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, User data) {
        if(data == null) {
            return null;
        }
        DatumWriter<User> writer = new SpecificDatumWriter<>(data.getSchema());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            writer.write(data, encoder);
        }catch (IOException e) {
            throw new SerializationException(e.getMessage());
        }
        return out.toByteArray();
    }

    @Override
    public void close() {

    }
}

3.3 生产者

package com.dhhy.avro;

import com.google.gson.Gson;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by JayLai on 2020-02-24 19:55:29
 */
public class AvroProducer {
    public static final String brokerList = "192.168.0.131:9092,192.168.0.132:9092,192.168.0.133:9092";
    public static final String topic = "avro_topic";
    static int count = 0;


    /**
     * 创建User对象
     * @return
     */
    public static User createUser(){
        User user = User.newBuilder()
                .setName("Jay")
                .setId(++count)
                .setPhonenum("18814123456")
                .build();

        return user;
    }

    public static void main(String[] args) {
        User[] users = new User[10];
        for (int i = 0; i < 10; i++){
            users[i] = createUser();
        }


        Properties properties =  new Properties();
        properties.put("bootstrap.servers", brokerList);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.dhhy.avro.AvroSerializer");


        //自己直生产者客户端参数并创建KafkaProducer实例
        KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
        //发送消息
        Map<String, Object> map = new HashMap<>();
        Gson gson= new Gson();
        try {
            for (User user : users) {
                ProducerRecord<String, User> record = new ProducerRecord<>(topic, user);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null){
                            exception.printStackTrace();
                        }else{
                            map.put("topic", metadata.topic());
                            map.put("partition", metadata.partition());
                            map.put("offset", metadata.offset());
                            map.put("user", user);
                            System.out.println(gson.toJson(map));
                        }
                    }
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //关闭生产着客户端实例
            if(producer != null){
                producer.close();
            }
        }

    }
}

3.4 消费者端的话单反序列化类

package com.dhhy.avro;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;

/**
 * 反序列化类
 * Created by JayLai on 2020-03-24 22:30:03
 */
public class AvroDeserializer implements Deserializer<User>{
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }


    @Override
    public void close() {

    }

    @Override
    public User deserialize(String topic, byte[] data) {
        if(data == null) {
            return null;
        }
        User user = new User();
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.getClassSchema());
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
        try {
            user = userDatumReader.read(null, decoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return user;
    }
}
3.4 消费者
package com.dhhy.avro;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Created by JayLai on 2020-03-24 23:34:17
 */
public class AvroConsumer {
    public static final String brokerList = "192.168.0.131:9092,192.168.0.132:9092,192.168.0.133:9092";
    public static final String topic = "avro_topic";
    public static final String groupId = "avro_group_001";
    public static final AtomicBoolean isRunning =new AtomicBoolean(true);

    public static void main(String[] args) {
        Properties properties =  new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "com.dhhy.avro.AvroDeserializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put ("group.id", groupId);
        properties.put ("auto.offset.reset", "earliest");


        //创建一个消费者客户端实例
        KafkaConsumer<String, User> consumer = new KafkaConsumer(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList(topic));
        consumer.partitionsFor(topic);

        //循环消货消息
        while (isRunning.get()) {
            ConsumerRecords<String, User> records =
                    consumer.poll(5000);

            System.out.println(records.count());

            for (ConsumerRecord<String, User> record :records  ) {
                System.out.println("{topic:"  + record.topic() + " ,partition:" + record.partition()
                        +  " ,offset:" + record.offset() +
                        " ,key:" + record.topic() + " ,value:" +record.value().toString() + "}");
            }

        }

    }
}

3.5 测试

先启动生产者AvroProducer类,再运行消费者AvroConsumer类,可以看到消费者的控制台打印了消费记录。
在这里插入图片描述

4 Spark消费以Avro编码的kafka消息

Kafk下游的消费者也可以是不同的大数据组件,如Spark Streaming, Flink等,在这里以Spark Streaming为例子。
在这里插入图片描述

4.1 创建maven工程

在实际工作中,Spark的应用主要以Scala开发为主,因此需要重新创建一个Scala工程,名称为sparkstudy。
Pom文件的内容如下:


```markup

```yaml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>


  <groupId>com.dhhy</groupId>
  <artifactId>sparkstudy</artifactId>
  <version>1.0-SNAPSHOT</version>

    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
    </properties>

    <dependencies>

        <!--scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!--SparkSQL-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.spark-project.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1.spark2</version>
        </dependency>


        <!-- Spark ML-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <!-- Spark Streaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <!--Spark Streaming整合kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>


        <!--avro序列化 -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>



    </dependencies>


    <build>
        <plugins>
            <!-- mixed scala/java compile -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <id>compile</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <phase>compile</phase>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                        <phase>test-compile</phase>
                    </execution>
                    <execution>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- for jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.dhhy.spark.streaming.kafkaConsumerApp</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>

    </repositories>
</project>

并将User.java和AvroDeserializer.java两个类引入到sparkstudy中。

4.2 Spark应用的消费类

package com.dhhy.avro

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AvroSparkConsumer {

  def main(args: Array[String]): Unit = {

    //配置项
    val conf = new SparkConf().setMaster("local[2]").setAppName("AvroSparkConsumer")

    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.0.131:9092,192.168.0.132:9092,192.168.0.133:9092", //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer], //序列化类型,此处为字符类型
      "value.deserializer" -> classOf[AvroDeserializer], //序列化类型,此处为AVRO
      "group.id" -> "AvroConsumeGroup2", //Kafka消费组
      "auto.offset.reset" -> "earliest" //读取最新offset
    )


    //kafka的话题
    val topics = Array("avro_topic")
    val stream: InputDStream[ConsumerRecord[String, User]] = KafkaUtils.createDirectStream[String, User](
      ssc,
      PreferConsistent,
      Subscribe[String, User](topics, kafkaParams)
    )

    stream.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach(record =>
          println("{" +
            "topic:" + record.topic() +
            " ,partition:" + record.partition() +
            " ,offset:" + record.offset() +
            " ,key:" + record.topic() +
            " ,value:" + record.value().toString() +
            "}")
        )
      )

    )


    ssc.start()
    ssc.awaitTermination()
  }

}

4.3 工程的目录结构

在这里插入图片描述

4.4 测试

运行AvroSparkConsumer.scala,消费的结果打印在控制台上:
在这里插入图片描述

5 参考文献

[1] Hadoop权威指南 第4版. 2017, 清华大学出版社.
[2] Avro官网 https://avro.apache.org/
[3] 深入理解Kafka:核心设计与实践原理 第1版. 2019, 电子工业出版社.
[4] Spark官网 http://spark.apache.org/

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐