flink读取kafka数据存储iceberg
使用flink实时的读取kafka的数据,并且实时的存储到iceberg中。
1、说明
使用flink实时的读取kafka的数据,并且实时的存储到iceberg中。好处是可以一边存数据,一边查询数据。当然使用clickhouse也可以实现数据的既存既取。而hive数据既存既读则会有问题。iceberg中数据读写数据都是从快照中开始的,读和写对应的不同快照,所以读写互不影响。而hive中写的时候数据就不能读。
下面是使用flink读取kafka数据存储到iceberg的例子。本案例,可以直接在本地直接运行,无需搭建hadoop,hive集群。其中遇到的问题及解决思路。用到kafka,可以直接使用docker,来搞一个,跑起来。
2、实现步骤
1)确保flink和iceberg的版本对应
这里使用的是(flink:1.13.5,iceberg: 0.12.1 )
2) 创建流式执行环境
-
使用getExecutionEnvironment()的静态方法可以自动识别是本地环境还是集群服务环境。当然也可以使用createLocalEnvironment()方法创建本地环境。
该环境变量类似于上下文,可以配置一些基本的信息,如并行度(默认是CPU数)、检查时间间隔(默认不检查)。
-
这里设置检查为5000毫秒,到检查时间的时候 ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据(必须设置checkpoint)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
3)指定kafka数据源创建数据流
-
设置读取kafka的一些参数,值的序列化使用的是字符串序列化模式(SimpleStringSchema);开始读取kafka数据,偏移量设置,则从最新的数据开始读取(latest)。
-
从指定的
source
(源)中创建一个数据流。这里的source
可能是Kafka
读取数据,当然还可以从其他外部源如socket
、Kinesis
或其他数据源读取的数据。WatermarkStrategy.noWatermarks()
,这是一个水印策略。用于处理事件时间(event-time)和处理有延迟或乱序的数据。WatermarkStrategy.noWatermarks()
表示我们不想为这个数据流生成任何水印。这意味着我们可能是在做纯粹的基于处理时间(processing-time)的流处理,或者我们不关心事件时间的顺序。"kafka_source"
,这是给这个数据源分配的名称,主要用于日志记录和调试。
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("node1:9094,node2:9094")
.setTopics("topic_users")
.setGroupId("flink-test-1")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();
DataStreamSource<String> kafkaDs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");
4)流式数据转换器
- 上一步获取到的数据流,将数据流中的json字符串转为
RowData
. 当然输入的数据格式可能是其它格式,根据实际情况进行修改。 - 注意
GenericRowData
中字符串格式必须使用StringData.fromString(session.userId)
来转换一下,否则无法存储到iceberg表中。
SingleOutputStreamOperator<RowData> dataStream = kafkaDs.map((MapFunction<String, RowData>) value -> {
Gson gson = new Gson();
Sessions session = gson.fromJson(value, Sessions.class);
GenericRowData row = new GenericRowData(9);
row.setField(0, session.version);
row.setField(1, StringData.fromString(session.userId));
row.setField(2, StringData.fromString(session.appType));
row.setField(3, session.loginTime);
row.setField(4, StringData.fromString(session.clientIp));
row.setField(5, StringData.fromString(session.service));
row.setField(6, session.status);
row.setField(7, StringData.fromString(session.channel));
row.setField(8, StringData.fromString(timeStamp2DateStr(session.loginTime)));
return row;
});
5)创建表(创建Catalog、表Id,表Schema、表分区、表文件存储格式等)
这里内容看起来比较多,但也就是一件事,建表。
-
创建 Hadoop 配置: 加载系统默认配置(core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)
-
创建 Iceberg 的 Hadoop Catalog: 用来管理表创建、删除,以及元数据存储位置。这里使用hadoop存储元数据和实际数据。
-
指定表标识符: 对应数据库名称和表的名称。对应的数据也会存在
iceberg_db/flink_iceberg_tbl
目录下。 -
定义表的 Schema:
这部分代码定义了表的结构,指定了每个字段的名称和类型。 -
定义分区:
PartitionSpec.builderFor(schema).identity("hour_p").build();
按照hour_p
字段来分区数据. 如果不见分许则使用PartitionSpec.unpartitioned()
PartitionSpec spec = PartitionSpec.builderFor(schema).identity("hour_p").build();
-
设置表属性: 设置了默认的文件格式为
PARQUET
。 -
检查表是否存在,如果不存在则创建
-
创建 TableLoader:
TableLoader
是用于加载Iceberg表的工具,这个目录是本地的数据存储的目录,以及数据库、数据表对应的名称。如果使用hdfs,则改为相应目录结构即可。
TableLoader tableLoader = TableLoader.fromHadoopTable("data/flinkwarehouse/iceberg_db/flink_iceberg_tbl");
// 创建默认的配置类,会自动加载hadoop相关的配置文件
Configuration hadoopConf = new Configuration();
// 设置Catalog的存储位置
Catalog catalog = new HadoopCatalog(hadoopConf, "data/flinkwarehouse");
// iceberg 数据库名称,数据表名称
TableIdentifier name = TableIdentifier.of("iceberg_db", "flink_iceberg_tbl");
// 数据表明模式,以及字段名称,字段类型
Schema schema = new Schema(
Types.NestedField.required(1, "version", Types.IntegerType.get()),
Types.NestedField.required(2, "userId", Types.StringType.get()),
Types.NestedField.required(3, "appType", Types.StringType.get()),
Types.NestedField.required(4, "loginTime", Types.LongType.get()),
Types.NestedField.required(5, "clientIp", Types.StringType.get()),
Types.NestedField.required(6, "service", Types.StringType.get()),
Types.NestedField.required(7, "status", Types.IntegerType.get()),
Types.NestedField.required(8, "channel", Types.StringType.get()),
Types.NestedField.required(9, "hour_p", Types.StringType.get())
);
// 设置分区 PartitionSpec spec = PartitionSpec.unpartitioned();
PartitionSpec spec = PartitionSpec.builderFor(schema).identity("hour_p").build();
// 设置 默认文件存储格式 parquet格式
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
Table table = null;
if (!catalog.tableExists(name)) {
table = catalog.createTable(name, schema, spec, props);
} else {
table = catalog.loadTable(name);
}
TableLoader tableLoader = TableLoader.fromHadoopTable("data/flinkwarehouse/iceberg_db/flink_iceberg_tbl");
6)创建FlinkSink读取流式数据存储到表中
- 接收一个数据流
- 指定相应的表
- 数据写入加载器中
7)执行环境开始执行任务
这里给任务一个简短的描述。
env.execute("iceberg api and flink ");
3、全部代码如下
1)build.gradle依赖如下配置
def flink = [
version: '1.13.5'
]
def hadoop = [
version: '3.2.2'
]
def iceberg = [
version: '0.12.1'
]
dependencies {
implementation 'com.alibaba.ververica:ververica-connector-iceberg:1.13-vvr-4.0.7'
implementation "org.apache.iceberg:iceberg-flink-runtime:${iceberg.version}"
implementation "org.apache.flink:flink-java:${flink.version}"
implementation "org.apache.flink:flink-streaming-java_2.11:${flink.version}"
implementation "org.apache.flink:flink-clients_2.11:${flink.version}"
implementation "org.apache.flink:flink-streaming-scala_2.11:${flink.version}"
implementation "org.apache.flink:flink-connector-kafka_2.11:${flink.version}"
implementation "org.apache.flink:flink-connector-base:${flink.version}"
implementation "org.apache.hadoop:hadoop-client:${hadoop.version}"
implementation "org.apache.flink:flink-table-runtime-blink_2.11:${flink.version}"
implementation "org.apache.flink:flink-table:${flink.version}"
implementation "org.apache.flink:flink-table-common:${flink.version}"
implementation "org.apache.flink:flink-table-api-java:${flink.version}"
implementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flink.version}"
implementation "org.apache.flink:flink-table-planner_2.11:${flink.version}"
implementation "org.apache.flink:flink-table-planner-blink_2.11:${flink.version}"
testImplementation 'junit:junit:4.11'
// log4j and slf4j dependencies
testImplementation 'org.slf4j:slf4j-log4j12:1.7.25'
testImplementation 'log4j:log4j:1.2.17'
implementation 'org.slf4j:slf4j-api:1.7.25'
testImplementation 'org.slf4j:slf4j-nop:1.7.25'
testImplementation 'org.slf4j:slf4j-simple:1.7.5'
implementation 'com.google.code.gson:gson:2.3.1'
// 没有这个会出现报错,使用compile
compile 'com.google.guava:guava:28.2-jre'
}
2)案例代码
package com.subao.flink;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
public class FlinkIceberg {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据(必须设置checkpoint)
env.enableCheckpointing(5000);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("124.70.194.33:9094,124.71.180.217:9094")
.setTopics("sessions")
.setGroupId("flink-test-1")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();
DataStreamSource<String> kafkaDs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");
SingleOutputStreamOperator<RowData> dataStream = kafkaDs.map((MapFunction<String, RowData>) value -> {
Gson gson = new Gson();
Sessions session = gson.fromJson(value, Sessions.class);
GenericRowData row = new GenericRowData(9);
row.setField(0, session.version);
row.setField(1, StringData.fromString(session.userId));
row.setField(2, StringData.fromString(session.appType));
row.setField(3, session.loginTime);
row.setField(4, StringData.fromString(session.clientIp));
row.setField(5, StringData.fromString(session.service));
row.setField(6, session.status);
row.setField(7, StringData.fromString(session.channel));
row.setField(8, StringData.fromString(timeStamp2DateStr(session.loginTime)));
System.out.println(row);
return row;
});
// 创建默认的配置类,会自动加载hadoop相关的配置文件
Configuration hadoopConf = new Configuration();
// 设置Catalog的存储位置
Catalog catalog = new HadoopCatalog(hadoopConf, "data/flinkwarehouse");
// iceberg 数据库名称,数据表名称
TableIdentifier name = TableIdentifier.of("iceberg_db", "flink_iceberg_tbl");
// 数据表明模式,以及字段名称,字段类型
Schema schema = new Schema(
Types.NestedField.required(1, "version", Types.IntegerType.get()),
Types.NestedField.required(2, "userId", Types.StringType.get()),
Types.NestedField.required(3, "appType", Types.StringType.get()),
Types.NestedField.required(4, "loginTime", Types.LongType.get()),
Types.NestedField.required(5, "clientIp", Types.StringType.get()),
Types.NestedField.required(6, "service", Types.StringType.get()),
Types.NestedField.required(7, "status", Types.IntegerType.get()),
Types.NestedField.required(8, "channel", Types.StringType.get()),
Types.NestedField.required(9, "hour_p", Types.StringType.get())
);
// 设置分区 PartitionSpec spec = PartitionSpec.unpartitioned();
PartitionSpec spec = PartitionSpec.builderFor(schema).identity("hour_p").build();
// 设置 默认文件存储格式 parquet格式
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
Table table = null;
if (!catalog.tableExists(name)) {
table = catalog.createTable(name, schema, spec, props);
} else {
table = catalog.loadTable(name);
}
TableLoader tableLoader = TableLoader.fromHadoopTable("data/flinkwarehouse/iceberg_db/flink_iceberg_tbl");
FlinkSink
.forRowData(dataStream)
.table(table)
.tableLoader(tableLoader)
.overwrite(false)
.build();
env.execute("iceberg api and flink ");
}
private static String timeStamp2DateStr(long timestamp) {
// 将时间戳转为LocalDateTime对象
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp * 1000), ZoneId.systemDefault());
// 定义日期时间格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHH");
// 格式化日期时间
return dateTime.format(formatter);
}
class Sessions {
private int version;
private String userId;
private String appType;
private long loginTime;
private String clientIp;
private String service;
private int status;
private String channel;
public Sessions(int version, String userId, String appType, long loginTime, String clientIp, String service, int status, String channel) {
this.version = version;
this.userId = userId;
this.appType = appType;
this.loginTime = loginTime;
this.clientIp = clientIp;
this.service = service;
this.status = status;
this.channel = channel;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getAppType() {
return appType;
}
public void setAppType(String appType) {
this.appType = appType;
}
public long getLoginTime() {
return loginTime;
}
public void setLoginTime(long loginTime) {
this.loginTime = loginTime;
}
public String getClientIp() {
return clientIp;
}
public void setClientIp(String clientIp) {
this.clientIp = clientIp;
}
public String getService() {
return service;
}
public void setService(String service) {
this.service = service;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
@Override
public String toString() {
return "Sessions{" +
"version=" + version +
", userId='" + userId + '\'' +
", appType='" + appType + '\'' +
", loginTime=" + loginTime +
", clientIp='" + clientIp + '\'' +
", service='" + service + '\'' +
", status=" + status +
", channel='" + channel + '\'' +
'}';
}
}
}
4、遇到的问题
1)找不到方法 java.lang.NoSuchMethodError:com.google.common.base.Preconditions.checkState。
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V
这个主要是相应的com.google.guava的版本不兼容,在build.gradle中加入如下即可
compile 'com.google.guava:guava:28.2-jre'
排查思路:
a) 看到这个我就想应该是相应的包没有正常添加到依赖中。
可以百度一下看看是哪个包,发现是com.google.guava包。
b) 检查现有依赖中是否包含该包
因为使用的是gradle,所以使用gradle的命令:gradle dependencies
,从结果中查看,发现使用的hadoop
中有多个包,版本不同,最大的是27.0的包。
c) 尝试使用高版本的guava的包
compile 'com.google.guava:guava:28.2-jre'
经测试是可以的。但这里主要要使用compile
关键字。
compile
和implementation
都表示依赖文件在引入项目的编译期、运行期和测试期使用。但是compile
标识的包可被其它依赖包使用,implementation
则不可以。
compile
关键字会将依赖项传递到项目的所有模块和依赖项中。这意味着如果模块 A 依赖于模块 B,并且模块 B 使用了compile
关键字引入了一些依赖项,那么这些依赖项也会传递给模块 A。
implementation
关键字只将依赖项传递到当前模块中,不会传递给依赖模块。这意味着如果模块 A 依赖于模块 B,并且模块 B 使用了implementation
关键字引入了一些依赖项,那么这些依赖项不会传递给模块 A。由于
compile
关键字会引入依赖的传递性,可能导致不可预期的副作用和冲突。为了解决这个问题,从 Gradle 3.4 版本开始,建议使用implementation
关键字代替compile
关键字,以减少依赖项传递引起的问题。
2) 数据文件为空
flink能够读取kafka的数据,但是不能将数据写到文件中。怎么办?于是问了问GPT,它说看看日志。我没有配置log4j的日志配置,当然看不到错误日志了。于是我就加上log4j.properties
的配置文件。然后发现了如下错误:
2023-08-08 19:38:16 DEBUG JobMaster:658 - Archive local failure causing attempt 8d8beb11aa845d873ebbc317b21bf435 to fail: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.table.data.StringData
at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221)
at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:453)
原来是数据格式不兼容的问题,对于Flink中的RowData中使用的是StringData表示字符串,因此在存入的时候需要把String类型转为StringData即可。如:
row.setField(2, StringData.fromString(session.appType));
总结:flink读取kafak数据用户的应该是比较多的,但是保存到iceberg中,这个相对用的比较少。flink中比较常用flink-sql来处理和数据表相关的数据。
更多推荐
所有评论(0)