Getting Started with Apache Avro and Its Use in Spark
1 Introduction to Avro
In the early days of the internet, most projects ran as monoliths, and Java's built-in serialization was good enough for the majority of scenarios. As business volume and traffic grew, architectures gradually migrated to microservices. Individual services may be written in different languages, and some of them have strict requirements on communication performance, which Java native serialization can no longer satisfy: 1) it does not work across languages, and 2) the serialized output is relatively large. A third-party serialization protocol therefore becomes necessary.
Apache Avro was created by Doug Cutting, the father of Hadoop. It is language-neutral and produces compact binary output, so it is widely used in the big data ecosystem. Avro schemas are usually written in JSON and the data is encoded in a binary format. The supported primitive types are null, boolean, int, long, float, double, bytes and string.
Complex types (record, enum, array, map, union and fixed) are also supported; see the official website https://avro.apache.org/ for details.
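As a quick illustration of how schemas and complex types fit together, here is a minimal, hedged Java sketch (the Contact record and its fields are invented purely for this example; only Avro 1.9.2 on the classpath is assumed). It parses a schema containing a union and an array and builds a generic record against it without any generated classes:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Arrays;

public class ComplexTypeDemo {
    public static void main(String[] args) {
        // A record schema combining a primitive field, a nullable union and an array.
        String json = "{"
                + "\"type\": \"record\", \"name\": \"Contact\", \"namespace\": \"com.dhhy.avro\","
                + "\"fields\": ["
                + "  {\"name\": \"name\", \"type\": \"string\"},"
                + "  {\"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\": null},"
                + "  {\"name\": \"phones\", \"type\": {\"type\": \"array\", \"items\": \"string\"}}"
                + "]}";
        Schema schema = new Schema.Parser().parse(json);

        // Build a record against the schema without any generated classes.
        GenericRecord contact = new GenericData.Record(schema);
        contact.put("name", "Jay");
        contact.put("email", null);
        contact.put("phones", Arrays.asList("18814123346"));
        System.out.println(contact);
    }
}
```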
2 Replacing Java Native Serialization with Avro
2.1 Defining the Avro Schema
Create an Avro schema file named user.avsc with the following content:
{
"namespace": "com.dhhy.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "id", "type": "int"},
{"name": "phonenum", "type": ["string", "null"]}
]
}
Here the field name is a string and id is an int, both of which are required; phonenum is a string that may also be null.
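Before generating code it can be handy to sanity-check the schema. A minimal sketch, assuming user.avsc sits in the working directory and Avro 1.9.2 is on the classpath (the class name SchemaCheck is made up for illustration):

```java
import org.apache.avro.Schema;

import java.io.File;
import java.io.IOException;

public class SchemaCheck {
    public static void main(String[] args) throws IOException {
        // Parse user.avsc without generating any code.
        Schema schema = new Schema.Parser().parse(new File("user.avsc"));
        Schema phonenum = schema.getField("phonenum").schema();
        System.out.println(schema.getFullName()); // com.dhhy.avro.User
        System.out.println(phonenum.getType());   // UNION
        System.out.println(phonenum.getTypes());  // ["string","null"] -> the field may be null
    }
}
```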
2.2 Compiling the Schema
Download avro-tools-1.9.2.jar and run the command: java -jar avro-tools-1.9.2.jar compile schema user.avsc ./
2.3 Creating the Maven Project
The project is named KafkaStudy, and its overall directory layout is as follows:
1) The pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dhhy</groupId>
<artifactId>kafkaStudy</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2) Copy the folder generated by compiling the schema, i.e. the User.java file, into the KafkaStudy project.
3) Create a Java class AvroApp.java with the following content:
package com.dhhy.avro;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import java.io.File;
import java.io.IOException;
/**
* Created by JayLai on 2020-02-24 16:44:18
*/
public class AvroApp {
/**
 * Create a User object
* @return
*/
public User createUser(){
User user = User.newBuilder()
.setName("Jay")
.setId(1024)
.setPhonenum("18814123346")
.build();
return user;
}
/**
 * Serialize the User object to disk
*/
public void serializeUser(User user, File file){
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
try {
dataFileWriter.create(user.getSchema(), file);
dataFileWriter.append(user);
} catch (IOException e) {
e.printStackTrace();
} finally{
try{
dataFileWriter.close();
}catch(IOException e){
e.printStackTrace();
}
}
}
/**
 * Deserialize the User object from disk back into memory
*/
public void deserializeUser(File file){
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = null;
try {
dataFileReader = new DataFileReader<User>(file, userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
System.out.println(user);
}
} catch (IOException e) {
e.printStackTrace();
} finally{
try{
if(dataFileReader != null){
dataFileReader.close();
}
}catch(IOException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
AvroApp app = new AvroApp();
File file = new File("users.avro");
app.serializeUser(app.createUser(), file);
app.deserializeUser(file);
}
}
2.4 Testing
Run AvroApp.java. The console prints {"name": "Jay", "id": 1024, "phonenum": "18814123346"} and a users.avro file appears on the local disk, which shows the run succeeded: the in-memory User object can be serialized to the local disk with Avro and deserialized back into memory again.
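A side note: the Avro data file written above embeds the writer schema, so it can be read back even without the generated User class. A minimal sketch, assuming users.avro is in the working directory:

```java
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class ReadUsersGenerically {
    public static void main(String[] args) throws IOException {
        // Avro data files carry the writer schema, so users.avro can be read back
        // with a GenericDatumReader even when the generated User class is absent.
        File file = new File("users.avro");
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<>())) {
            System.out.println("writer schema: " + reader.getSchema());
            for (GenericRecord record : reader) {
                System.out.println(record);
            }
        }
    }
}
```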
3 Encoding and Decoding Kafka Messages with Avro
Because the Kafka message queue offers high throughput, most real-time data exchange between upstream and downstream systems in the big data world goes through Kafka. Upstream producers may be written in different languages such as C++ or Java and use Avro to serialize data before writing it to the Kafka brokers; downstream clients, which may also be written in various languages, then use Avro to deserialize it, as shown in the figure.
Starting from the KafkaStudy project, delete the two files that are no longer needed, AvroApp.java and users.avro; the overall file structure is then as follows:
3.1 Creating the Topic
Log in to the Kafka host with Xshell, go to KAFKA_HOME, and create a topic named avro_topic:
hadoop@ubuntu:~/app/kafka$ ls
bin config kafka_log libs LICENSE logs NOTICE site-docs start.sh
hadoop@ubuntu:~/app/kafka$ bin/kafka-topics.sh --create --zookeeper 192.168.0.131:2181 --partitions 2 --replication-factor 1 --topic avro_topic
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "avro_topic".
3.2 The Serializer on the Producer Side
package com.dhhy.avro;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
/**
 * Serializer class
* Created by JayLai on 2020-03-24 22:17:40
*/
public class AvroSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, User data) {
if(data == null) {
return null;
}
DatumWriter<User> writer = new SpecificDatumWriter<>(data.getSchema());
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
try {
writer.write(data, encoder);
}catch (IOException e) {
throw new SerializationException(e.getMessage());
}
return out.toByteArray();
}
@Override
public void close() {
}
}
3.3 The Producer
package com.dhhy.avro;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Created by JayLai on 2020-02-24 19:55:29
*/
public class AvroProducer {
public static final String brokerList = "192.168.0.131:9092,192.168.0.132:9092,192.168.0.133:9092";
public static final String topic = "avro_topic";
static int count = 0;
/**
 * Create a User object
* @return
*/
public static User createUser(){
User user = User.newBuilder()
.setName("Jay")
.setId(++count)
.setPhonenum("18814123456")
.build();
return user;
}
public static void main(String[] args) {
User[] users = new User[10];
for (int i = 0; i < 10; i++){
users[i] = createUser();
}
Properties properties = new Properties();
properties.put("bootstrap.servers", brokerList);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "com.dhhy.avro.AvroSerializer");
//Configure the producer client parameters and create the KafkaProducer instance
KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
//Send the messages
Map<String, Object> map = new HashMap<>();
Gson gson= new Gson();
try {
for (User user : users) {
ProducerRecord<String, User> record = new ProducerRecord<>(topic, user);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null){
exception.printStackTrace();
}else{
map.put("topic", metadata.topic());
map.put("partition", metadata.partition());
map.put("offset", metadata.offset());
map.put("user", user);
System.out.println(gson.toJson(map));
}
}
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//Close the producer client instance
if(producer != null){
producer.close();
}
}
}
}
3.4 The Deserializer on the Consumer Side
package com.dhhy.avro;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;
/**
 * Deserializer class
* Created by JayLai on 2020-03-24 22:30:03
*/
public class AvroDeserializer implements Deserializer<User>{
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public void close() {
}
@Override
public User deserialize(String topic, byte[] data) {
if(data == null) {
return null;
}
User user = new User();
ByteArrayInputStream in = new ByteArrayInputStream(data);
DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.getClassSchema());
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
try {
user = userDatumReader.read(null, decoder);
} catch (IOException e) {
e.printStackTrace();
}
return user;
}
}
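Before wiring the two classes into Kafka, it can be useful to confirm locally that they are inverse operations. A minimal, hedged sketch (the class AvroCodecRoundTripTest is not part of the original project; it only assumes the generated User class and the two codec classes above are on the classpath):

```java
package com.dhhy.avro;

/**
 * A quick local check that AvroSerializer and AvroDeserializer are inverse
 * operations, run before involving Kafka at all.
 */
public class AvroCodecRoundTripTest {
    public static void main(String[] args) {
        User original = User.newBuilder()
                .setName("Jay")
                .setId(1024)
                .setPhonenum("18814123346")
                .build();

        AvroSerializer serializer = new AvroSerializer();
        AvroDeserializer deserializer = new AvroDeserializer();

        // The topic argument does not influence the Avro encoding; any string works here.
        byte[] payload = serializer.serialize("avro_topic", original);
        User restored = deserializer.deserialize("avro_topic", payload);

        System.out.println("payload size: " + payload.length + " bytes");
        System.out.println("round-trip ok: " + original.equals(restored));

        serializer.close();
        deserializer.close();
    }
}
```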
3.5 The Consumer
package com.dhhy.avro;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by JayLai on 2020-03-24 23:34:17
*/
public class AvroConsumer {
public static final String brokerList = "192.168.0.131:9092,192.168.0.132:9092,192.168.0.133:9092";
public static final String topic = "avro_topic";
public static final String groupId = "avro_group_001";
public static final AtomicBoolean isRunning =new AtomicBoolean(true);
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.dhhy.avro.AvroDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put ("group.id", groupId);
properties.put ("auto.offset.reset", "earliest");
//Create the consumer client instance
KafkaConsumer<String, User> consumer = new KafkaConsumer(properties);
//Subscribe to the topic
consumer.subscribe(Collections.singletonList(topic));
consumer.partitionsFor(topic);
//Poll for messages in a loop
while (isRunning.get()) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(5));
System.out.println(records.count());
for (ConsumerRecord<String, User> record :records ) {
System.out.println("{topic:" + record.topic() + " ,partition:" + record.partition()
+ " ,offset:" + record.offset() +
" ,key:" + record.topic() + " ,value:" +record.value().toString() + "}");
}
}
}
}
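Note that the loop above never flips isRunning, so the consumer only stops when the process is killed. A common pattern (not part of the original article) is to stop it cleanly with a shutdown hook and consumer.wakeup(). The hedged sketch below shows how the polling portion of main() could be rewritten, reusing the consumer and isRunning defined above:

```java
// Hedged sketch: a JVM shutdown hook flips the flag and wakes the consumer up,
// and the poll loop treats WakeupException as the normal shutdown signal.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    isRunning.set(false);
    consumer.wakeup(); // wakeup() is the one KafkaConsumer method safe to call from another thread
}));

try {
    while (isRunning.get()) {
        ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, User> record : records) {
            System.out.println(record.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    consumer.close();
}
```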
3.6 Testing
Start the producer class AvroProducer first, then run the consumer class AvroConsumer; the consumer's console prints the consumed records.
4 Consuming Avro-Encoded Kafka Messages with Spark
Kafka's downstream consumers can also be different big data frameworks such as Spark Streaming or Flink; Spark Streaming is used as the example here.
4.1 Creating the Maven Project
In practice, Spark applications are mostly developed in Scala, so a new Scala project named sparkstudy is created.
The pom file is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dhhy</groupId>
<artifactId>sparkstudy</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.0</spark.version>
</properties>
<dependencies>
<!--scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--SparkSQL-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
</dependency>
<!-- Spark ML-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--Spark Streaming整合kafka-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!--avro序列化 -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- mixed scala/java compile -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- for jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>com.dhhy.spark.streaming.kafkaConsumerApp</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>aliyunmaven</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
</project>
Also bring the two classes User.java and AvroDeserializer.java into sparkstudy.
4.2 The Spark Streaming Consumer
package com.dhhy.avro
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object AvroSparkConsumer {
def main(args: Array[String]): Unit = {
//Configuration
val conf = new SparkConf().setMaster("local[2]").setAppName("AvroSparkConsumer")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.0.131:9092,192.168.0.132:9092,192.168.0.133:9092", //kafka集群地址
"key.deserializer" -> classOf[StringDeserializer], //序列化类型,此处为字符类型
"value.deserializer" -> classOf[AvroDeserializer], //序列化类型,此处为AVRO
"group.id" -> "AvroConsumeGroup2", //Kafka消费组
"auto.offset.reset" -> "earliest" //读取最新offset
)
//Kafka topics to subscribe to
val topics = Array("avro_topic")
val stream: InputDStream[ConsumerRecord[String, User]] = KafkaUtils.createDirectStream[String, User](
ssc,
PreferConsistent,
Subscribe[String, User](topics, kafkaParams)
)
stream.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach(record =>
println("{" +
"topic:" + record.topic() +
" ,partition:" + record.partition() +
" ,offset:" + record.offset() +
" ,key:" + record.topic() +
" ,value:" + record.value().toString() +
"}")
)
)
)
ssc.start()
ssc.awaitTermination()
}
}
4.3 Project Directory Structure
4.4 Testing
Run AvroSparkConsumer.scala; the consumed records are printed to the console:
5 References
[1] Hadoop: The Definitive Guide, 4th Edition. Tsinghua University Press, 2017.
[2] Apache Avro official website: https://avro.apache.org/
[3] 深入理解Kafka:核心设计与实践原理 (Understanding Kafka: Core Design and Practice), 1st Edition. Publishing House of Electronics Industry, 2019.
[4] Apache Spark official website: http://spark.apache.org/