下面以Hadoop Catalog为例进行讲解

1. 表metadata API

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.io.{FileIO, LocationProvider}
import org.apache.iceberg.{PartitionSpec, Schema, Snapshot, Table}
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`


object flink_test {

  def main(args: Array[String]): Unit = {

    // =======初始化Hadoop Catalog=============
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)

    // =============加载一个已经存在的表=========
    // 参数分别是数据库名和表名
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)

    // ==============表metadata==================
    // 返回表的Schema
    val schema: Schema = table.schema()

    // 返回表的PartitionSpec
    val partitionSpec: PartitionSpec = table.spec()

    // 返回map形式的key:value属性,本示例返回结果为:{write.format.default=parquet, write.parquet.compression-codec=gzip}
    val properties: java.util.Map[String, String] = table.properties()

    // 返回表当前的Snapshot
    val currentSnapshot: Snapshot = table.currentSnapshot()

    // 根据snapshot id返回对应的Snapshot
    val defineSanpshot: Snapshot = table.snapshot(138573494821828246L)

    // 返回表的所有Snapshot
    val snapshots: Seq[Snapshot] = table.snapshots().toSeq

    // 返回表在HDFS上的路径,本示例结果为:hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user
    val location: String = table.location()

    // 将表更新到最新的version
    table.refresh()

    // 使用FileIO读写table files
    val fileIO: FileIO = table.io()

    // 使用LocationProvider,为data和metadata files创建path
    val locationProvider: LocationProvider = table.locationProvider()


  }
}

2. 表Scanning

my_user表的数据如下:

+------------------+--------------------+-------------------+------------------+
| my_user.user_id  | my_user.user_name  | my_user.birthday  | my_user.country  |
+------------------+--------------------+-------------------+------------------+
| 1                | zhang_san          | 2022-02-01        | china            |
| 2                | zhang_san          | 2022-02-01        | china            |
| 6                | zhang_san          | 2022-02-01        | china            |
| 5                | zhao_liu           | 2022-02-02        | japan            |
+------------------+--------------------+-------------------+------------------+

2.1 File Level

pom.xml添加依赖如下:

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.1</version>
        </dependency>

示例程序如下:

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog

import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`


object flink_test {

  def main(args: Array[String]): Unit = {

    // =======初始化Hadoop Catalog=============
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)

    // =============加载一个已经存在的表=========
    // 参数分别是数据库名和表名
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)


    // ==============表Scanning==================
    // TableScan是一个不可变的对象
    val tableScan: TableScan =
    table.newScan()
      .filter(Expressions.equal("user_id", 2))
      .select("user_id", "user_name")
    // .asOfTime(timestampMillis:Long)    // 从指定时间戳开始读取数据
    // .useSnapshot(snapshotId:Long)         // 从指定snapshot id开始读取数据

    // 返回files
    val fileScanTaskSeq: Seq[FileScanTask] = tableScan.planFiles().toSeq

    // 返回tasks
    val combinedScanTaskSeq: Seq[CombinedScanTask] = tableScan.planTasks().toSeq

    // 返回读projection
    val scanSchema: Schema = tableScan.schema()

  }
}

2.2 Row level

pom.xml添加如下依赖

       <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-data</artifactId>
            <version>0.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-parquet</artifactId>
            <version>0.13.1</version>
        </dependency>

示例程序如下:

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.IcebergGenerics.ScanBuilder
import org.apache.iceberg.data.{IcebergGenerics, Record}
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog

import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`


object flink_test {

  def main(args: Array[String]): Unit = {

    // =======初始化Hadoop Catalog=============
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)

    // =============加载一个已经存在的表=========
    // 参数分别是数据库名和表名
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)


    // ==============表Scanning==================
    val scanBuilder: ScanBuilder = IcebergGenerics.read(table)

    val recordSeq: Seq[Record] =
      scanBuilder.where(Expressions.equal("user_id", 2))
        .select("user_id", "user_name")
        .build()
        .toSeq

  }
}

3. 表update操作

表update操作返回的类,都是PendingUpdate[T]的子类,最后都需要调用PendingUpdate.commit()进行update操作提交

示例程序如下:

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types


object flink_test {

  def main(args: Array[String]): Unit = {

    // =======初始化Hadoop Catalog=============
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)

    // =============加载一个已经存在的表=========
    // 参数分别是数据库名和表名
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)


    // ==============表update操作==================

    // 更新表的schema
    table.updateSchema()
      .addColumn("age", Types.IntegerType.get())
    // .commit()

    // 更新表的properties属性
    val updateProperties: UpdateProperties = table.updateProperties()

    // 更新表的根路径
    val updateLocation: UpdateLocation = table.updateLocation()

    // 添加data files到表
    val appendFiles: AppendFiles = table.newAppend()

    // 添加data files到表, 但不会compact metadata
    val fastAppendFiles: AppendFiles = table.newFastAppend()

    // 添加data files到表, 且删除被覆盖的files
    val overwriteFiles: OverwriteFiles = table.newOverwrite()

    // 删除data files
    val deleteFiles: DeleteFiles = table.newDelete()

    // rewrite data files, 用new versions替换已经存在的files
    val rewriteFiles: RewriteFiles = table.newRewrite()

    // 创建一个新的表级别事务
    val transaction: Transaction = table.newTransaction()

    // 为了更快的scan planning,用clustering files重写manifest
    val rewriteManifests: RewriteManifests = table.rewriteManifests()

    // 对表snapshot进行管理,比如将表state回退到某个snapshot id
    val manageSnapshots: ManageSnapshots = table.manageSnapshots()

  }
}

4. Transactions

作用:在一个原子性的操作中,对一个表的多个改变进行commit

示例程序如下:

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog


object flink_test {

  def main(args: Array[String]): Unit = {

    // =======初始化Hadoop Catalog=============
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)

    // =============加载一个已经存在的表=========
    // 参数分别是数据库名和表名
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)


    // ==============Transactions==================
    val transaction: Transaction = table.newTransaction()

    // 提交一个delete操作到Transaction
    transaction
      .newDelete()
      .deleteFromRowFilter(Expressions.equal("user_id", 2))
    // .commit()

    // transaction.newAppend().appendFile(DataFile).commit()

    // 提交所有操作到表
    // transaction.commitTransaction()

  }
}

5. Types数据类型

5.1 基础数据类型

	import org.apache.iceberg.types.Types
	import org.apache.iceberg.types.Types.{DecimalType, IntegerType}

    // 没有参数的使用get()
    val integerType:IntegerType = Types.IntegerType.get()

    // 有参数的使用of(params...)
    val decimalType:DecimalType = Types.DecimalType.of(5, 2)    // 第一个参数表示精度,第二个参数表示小数位数

5.2 集合数据类型

注意:集合类型的嵌套字段必须指定唯一字段ID,且嵌套字段可以为optional可选的

    import org.apache.iceberg.types.Types
    import org.apache.iceberg.types.Types.{ListType, MapType, StructType}
    
    // struct<1 id: int, 2 data: optional string>
    val structType:StructType = StructType.of(
      Types.NestedField.required(1, "id", Types.IntegerType.get(), "人员ID"),
      Types.NestedField.optional(2, "name", Types.StringType.get())
    )

    // map<1 key: int, 2 value: optional string>
    val mapType:MapType = MapType.ofOptional(
      1, 2,
      Types.StringType.get(),
      Types.IntegerType.get()
    )


    // array<1 element: int>
    val listType:ListType = ListType.ofRequired(
      1, Types.IntegerType.get()
    )

6. Expressions表达式

Expressions表达式用于表数据Scans

Expression创建后是unbound状态,之后会和Expression对应的字段ID进行绑定,并将表达式的字段数据值,转换为字段对应的数据类型。比如Expressions.equal(“user_name”, “zhang_san”),会和user_name字段的字段ID进行绑定,并将zhang_san转换为user_name对应的字段类型,即字符串类型

示例程序如下:

    import org.apache.iceberg.expressions.{Expressions, False, True}

    // 返回数据类型为org.apache.iceberg.expressions.UnboundPredicate[T],
    // 是org.apache.iceberg.expressions.Expression的子类
    Expressions.isNull("user_name")
    Expressions.notNull("user_name")
    Expressions.equal("user_name", "zhang_san")
    Expressions.notEqual("user_name", "zhang_san")
    Expressions.lessThan("user_id", 3)
    Expressions.lessThanOrEqual("user_id", 3)
    Expressions.greaterThan("user_id", 3)
    Expressions.greaterThanOrEqual("user_id", 3)
    Expressions.in("user_id", 1, 2)
    Expressions.notIn("user_id", 1, 2)
    Expressions.startsWith("user_name", "zhang")
    Expressions.notStartsWith("user_name", "zhang")

    // 与或非, 返回数据类型为org.apache.iceberg.expressions.Expression
    Expressions.and(
      Expressions.isNull("user_name"),
      Expressions.greaterThan("user_id", 1)
    )
    Expressions.or(
      Expressions.isNull("user_name"),
      Expressions.greaterThan("user_id", 1)
    )
    Expressions.not(
      Expressions.isNull("user_name")
    )

    // 返回数据类型为org.apache.iceberg.expressions.Expression的子类
    val alwaysTrue:True = Expressions.alwaysTrue()
    val alwaysFalse:False = Expressions.alwaysFalse()

7. Iceberg各模块说明

Iceberg table相关模块如下:

  • iceberg-common:为其它模块提供实用的classes
  • iceberg-api:包含公共的Iceberg API,比如expressions, types, tables, and operations
  • iceberg-arrow:Iceberg tables使用Apache Arrow作为内存中的数据format,而iceberg-arrow模块实现了Iceberg type系统,以便能读写Iceberg tables中的数据
  • iceberg-core:包含Iceberg API的实现,和对Avro data files的支持。是数据计算引擎依赖的模块
  • iceberg-parquet:可选的,当表使用parquet格式需要用到
  • iceberg-orc:可选的,当表使用orc格式需要用到
  • iceberg-hive-metastore:对Iceberg tables使用Hive作为Catalog的实现

Iceberg数据计算引擎和相关工具模块如下:

  • iceberg-spark:Spark的Iceberg数据源API实现,底层依赖iceberg-spark-runtime-3.1
  • iceberg-flink:Flink的Table和DataStream关于Iceberg的API实现,底层依赖iceberg-flink-runtime
  • iceberg-hive3:对Hive3特殊的序列化解序列化的实现,特殊的序列化解序列化包括Timestamp、TimestampWithZone、Date object inspectors。底层依赖iceberg-hive-runtime
  • iceberg-mr:对MapReduce、Hive InputFormats和SerDes的Iceberg实现。当使用hive时,底层依赖iceberg-hive-runtime
  • iceberg-data:JVM应用客户端读Iceberg的实现
  • iceberg-runtime:生成一个runtime jar包,为Spark集成Iceberg tables提供支持
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐