A Custom Flink Catalog for Oracle

Overview

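A catalog provides metadata such as databases, tables, partitions, views, and the functions and information stored in a database or other external system. It offers a unified API for managing metadata and makes that metadata accessible from the Table API and SQL queries.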

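Since Flink 1.15 the official JDBC connector has supported Oracle tables defined through table DDL, but it does not provide an Oracle catalog. To meet the requirements of our big-data middleware, we extended the JDBC catalog with an Oracle implementation, using the official PostgreSQL catalog source code as a reference.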

JdbcCatalog lets users connect Flink to relational databases over the JDBC protocol.

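Implementation

Step 1: Create the OracleCatalog class

The OracleCatalog class extends the abstract class AbstractJdbcCatalog, so it reuses the common catalog functionality and overrides only the methods that need Oracle-specific behavior.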

public class OracleCatalog extends AbstractJdbcCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(OracleCatalog.class);

    private final JdbcDialectTypeMapper dialectTypeMapper;

    // System-level users (schemas) that should not be exposed through the catalog.
    private static final Set<String> builtinDatabases =
            new HashSet<>(
                    Arrays.asList(
                            "SCOTT",
                            "ANONYMOUS",
                            "XS$NULL",
                            "DIP",
                            "SPATIAL_WFS_ADMIN_USR",
                            "SPATIAL_CSW_ADMIN_USR",
                            "APEX_PUBLIC_USER",
                            "ORACLE_OCM",
                            "MDDATA"));

    public OracleCatalog(
            ClassLoader userClassLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl) {
        super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
        String driverVersion =
                Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
        String databaseVersion =
                Preconditions.checkNotNull(
                        getDatabaseVersion(), "Database version must not be null.");
        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
        this.dialectTypeMapper = new OracleTypeMapper(databaseVersion, driverVersion);
    }

    private String getDatabaseVersion() {
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(userClassLoader)) {
            try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
                return conn.getMetaData().getDatabaseProductVersion();
            } catch (Exception e) {
                throw new CatalogException(
                        String.format("Failed in getting Oracle version by %s.", defaultUrl), e);
            }
        }
    }

    private String getDriverVersion() {
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(userClassLoader)) {
            try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
                String driverVersion = conn.getMetaData().getDriverVersion();
                // Keep only the leading "major.minor.patch" part of the driver version.
                Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
                Matcher matcher = regexp.matcher(driverVersion);
                return matcher.find() ? matcher.group(0) : null;
            } catch (Exception e) {
                throw new CatalogException(
                        String.format("Failed in getting Oracle driver version by %s.", defaultUrl),
                        e);
            }
        }
    }

    @Override
    public List<String> listDatabases() throws CatalogException {
        // Each non-system Oracle user is exposed as one "database".
        return extractColumnValuesBySQL(
                this.defaultUrl,
                "select username from sys.dba_users "
                        + "where DEFAULT_TABLESPACE <> 'SYSTEM' and DEFAULT_TABLESPACE <> 'SYSAUX' "
                        + "order by username",
                1,
                dbName -> !builtinDatabases.contains(dbName));
    }

    @Override
    public List<String> listTables(String databaseName)
            throws DatabaseNotExistException, CatalogException {
        Preconditions.checkState(
                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
        // Note: databaseName here is the Oracle instance name.
        if (!databaseExists(databaseName)) {
            throw new DatabaseNotExistException(getName(), databaseName);
        }
        // Quote each user name so the list can be used in an IN clause.
        List<String> listDatabases =
                listDatabases().stream()
                        .map(username -> "'" + username + "'")
                        .collect(Collectors.toList());
        return extractColumnValuesBySQL(
                this.defaultUrl,
                "SELECT OWNER||'.'||TABLE_NAME AS schemaTableName FROM sys.all_tables "
                        + "WHERE OWNER IN (" + String.join(",", listDatabases) + ") "
                        + "ORDER BY OWNER, TABLE_NAME",
                1,
                null,
                null);
    }

    @Override
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        // Look up the single table directly instead of listing every table.
        String[] schemaTableNames = getSchemaTableName(tablePath).split("\\.");
        return !extractColumnValuesBySQL(
                        defaultUrl,
                        "SELECT table_name FROM sys.all_tables where OWNER = ? and table_name = ?",
                        1,
                        null,
                        schemaTableNames[0],
                        schemaTableNames[1])
                .isEmpty();
    }

    /** Converts Oracle type to Flink {@link DataType}. */
    @Override
    protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException {
        return dialectTypeMapper.mapping(tablePath, metadata, colIndex);
    }

    @Override
    protected String getTableName(ObjectPath tablePath) {
        // return tablePath.getObjectName();
        return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName();
    }

    @Override
    protected String getSchemaName(ObjectPath tablePath) {
        // return tablePath.getDatabaseName();
        return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();
    }

    @Override
    protected String getSchemaTableName(ObjectPath tablePath) {
        // return tablePath.getObjectName();
        return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath();
    }
}

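Points worth noting:

  • The names listed in builtinDatabases are generally system-level schemas and are not loaded into the catalog.
  • Database and table metadata is obtained through SQL statements; note that a fully qualified table name takes the form <user>.<table>.
  • tableExists is overridden to avoid calling listTables (the Postgres catalog does not avoid this). Flink calls this method before issuing a table query, so the direct lookup performs better.
  • getTableName, getSchemaName, and getSchemaTableName are overridden and delegate directly to the methods in PostgresTablePath.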
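The <user>.<table> convention and the quoted owner list used by listTables can be illustrated with a small standalone sketch. The class OracleTablePathSketch and both of its methods are hypothetical helpers written for this illustration; they are not part of Flink. Unlike PostgresTablePath, which falls back to a default schema, this sketch rejects names that have no owner part, and it also escapes single quotes, which the catalog code above does not do.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of Flink: illustrates the <user>.<table> convention. */
public class OracleTablePathSketch {

    /** Splits "TMGPS.TM_BUS" into {"TMGPS", "TM_BUS"}; rejects names without an owner. */
    public static String[] split(String flinkTableName) {
        String[] parts = flinkTableName.split("\\.", 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "Expected <user>.<table> but got: " + flinkTableName);
        }
        return parts;
    }

    /** Builds the quoted, comma-separated list placed inside "OWNER IN (...)". */
    public static String ownerInList(List<String> owners) {
        return owners.stream()
                // Double any single quote so the literal stays well-formed.
                .map(owner -> "'" + owner.replace("'", "''") + "'")
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        String[] parts = split("TMGPS.TM_BUS_PASSENGER_UPDOWN");
        System.out.println("owner=" + parts[0] + ", table=" + parts[1]);
        System.out.println("OWNER IN (" + ownerInList(Arrays.asList("TMGPS", "HR")) + ")");
    }
}
```

The owner and table parts are what tableExists binds to the two placeholders in its query against sys.all_tables.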

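Step 2: Create the OracleTypeMapper class

The OracleTypeMapper class implements the JdbcDialectTypeMapper interface and converts Oracle column types to Flink SQL types. This conversion is the most error-prone part and needs to be refined continuously as it is used.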

public class OracleTypeMapper implements JdbcDialectTypeMapper {

    private final String databaseVersion;

    private final String driverVersion;

    public OracleTypeMapper(String databaseVersion, String driverVersion) {
        this.databaseVersion = databaseVersion;
        this.driverVersion = driverVersion;
    }

    @Override
    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException {
        int jdbcType = metadata.getColumnType(colIndex);
        String columnName = metadata.getColumnName(colIndex);
        String oracleType = metadata.getColumnTypeName(colIndex).toUpperCase();
        int precision = metadata.getPrecision(colIndex);
        int scale = metadata.getScale(colIndex);

        switch (jdbcType) {
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.NCHAR:
            case Types.NVARCHAR:
            case Types.STRUCT:
            case Types.CLOB:
                return DataTypes.STRING();
            case Types.BLOB:
                return DataTypes.BYTES();
            case Types.INTEGER:
            case Types.SMALLINT:
            case Types.TINYINT:
                return DataTypes.INT();
            case Types.FLOAT:
            case Types.REAL:
            case OracleTypes.BINARY_FLOAT:
                return DataTypes.FLOAT();
            case Types.DOUBLE:
            case OracleTypes.BINARY_DOUBLE:
                return DataTypes.DOUBLE();
            case Types.NUMERIC:
            case Types.DECIMAL:
                // Precision 38 is valid in both Oracle and Flink, so the upper bound is inclusive.
                if (precision > 0 && precision <= DecimalType.MAX_PRECISION) {
                    return DataTypes.DECIMAL(precision, scale);
                }
                // No usable precision reported: fall back to a wide default.
                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
            case Types.DATE:
                return DataTypes.DATE();
            case Types.TIMESTAMP:
            case Types.TIMESTAMP_WITH_TIMEZONE:
            case OracleTypes.TIMESTAMPTZ:
            case OracleTypes.TIMESTAMPLTZ:
                return scale > 0 ? DataTypes.TIMESTAMP(scale) : DataTypes.TIMESTAMP();
            case OracleTypes.INTERVALYM:
                return DataTypes.INTERVAL(DataTypes.YEAR(), DataTypes.MONTH());
            case OracleTypes.INTERVALDS:
                return DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND());
            case Types.BOOLEAN:
                return DataTypes.BOOLEAN();
            default:
                throw new UnsupportedOperationException(
                        String.format(
                                "Doesn't support Oracle type '%s' on column '%s' in Oracle version %s, driver version %s yet.",
                                oracleType, columnName, databaseVersion, driverVersion));
        }
    }
}
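Because the NUMBER to DECIMAL branch is where most mapping problems tend to appear, the following dependency-free sketch isolates that decision. The class OracleDecimalSketch is hypothetical and returns type strings rather than Flink DataType objects. It assumes that the Oracle driver reports a precision of 0 for an unconstrained NUMBER column, and it adds one step the mapper above does not have: clamping the scale into the range Flink accepts (0 to precision), since Oracle permits scales outside that range.

```java
/** Hypothetical, dependency-free sketch of the NUMBER -> DECIMAL decision. */
public class OracleDecimalSketch {

    // Same value as Flink's DecimalType.MAX_PRECISION.
    static final int MAX_PRECISION = 38;

    /** Returns a Flink SQL type string for an Oracle NUMBER(precision, scale) column. */
    public static String toFlinkType(int precision, int scale) {
        // Assumption: an unconstrained NUMBER is reported with precision 0.
        if (precision <= 0 || precision > MAX_PRECISION) {
            return "DECIMAL(38, 18)";
        }
        // Flink requires 0 <= scale <= precision, so clamp out-of-range scales.
        int safeScale = Math.max(0, Math.min(scale, precision));
        return "DECIMAL(" + precision + ", " + safeScale + ")";
    }

    public static void main(String[] args) {
        System.out.println(toFlinkType(10, 2));   // ordinary NUMBER(10,2)
        System.out.println(toFlinkType(38, 0));   // widest integer column
        System.out.println(toFlinkType(0, -127)); // unconstrained NUMBER
        System.out.println(toFlinkType(5, -2));   // negative scale
    }
}
```

Clamping a negative scale to 0 keeps the column readable but changes the declared type, so whether that is acceptable depends on the data; it is a design choice for this sketch rather than established behavior.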

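Step 3: Modify OracleDialect

Recent Flink versions already ship this class, so why modify it? For two reasons:

  1. The official class is not declared public, so it cannot be referenced when creating the catalog object in step 4.
  2. Its getLimitClause method uses Oracle 12c syntax ("FETCH FIRST " + limit + " ROWS ONLY"), whereas our product lines still run many Oracle 11 databases, so it has to be changed to the more widely supported syntax ("WHERE ROWNUM <= " + limit).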
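The difference between the two limit syntaxes can be sketched as plain string building. OracleLimitSketch is a hypothetical class written for this comparison; it is not how the dialect itself is structured, because getLimitClause only returns a suffix. The 11g variant here wraps the query in a subselect, which is one common way to keep ROWNUM from colliding with a WHERE clause the query may already contain.

```java
/** Hypothetical sketch comparing Oracle 12c and 11g row-limiting SQL. */
public class OracleLimitSketch {

    /** Oracle 12c and later: the row-limiting clause is appended to the query. */
    public static String limit12c(String query, long limit) {
        return query + " FETCH FIRST " + limit + " ROWS ONLY";
    }

    /** Oracle 11g compatible: wrap the query, then filter on ROWNUM. */
    public static String limit11g(String query, long limit) {
        return "SELECT * FROM (" + query + ") WHERE ROWNUM <= " + limit;
    }

    public static void main(String[] args) {
        String query = "SELECT LINE_NO FROM TMGPS.TM_BUS_PASSENGER_UPDOWN WHERE LINE_NO > 10";
        System.out.println(limit12c(query, 1));
        System.out.println(limit11g(query, 1));
    }
}
```

Appending "WHERE ROWNUM <= n" directly, as the modified dialect does, produces valid SQL only when the generated query has no WHERE clause of its own.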

public class OracleDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // Define MAX/MIN precision of TIMESTAMP type according to Oracle docs:
    // https://www.techonthenet.com/oracle/datatypes.php
    private static final int MAX_TIMESTAMP_PRECISION = 9;
    private static final int MIN_TIMESTAMP_PRECISION = 1;

    // Define MAX/MIN precision of DECIMAL type according to Oracle docs:
    // https://www.techonthenet.com/oracle/datatypes.php
    private static final int MAX_DECIMAL_PRECISION = 38;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new OracleRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        // The FETCH FIRST syntax is only valid from Oracle Database 12c onward.
        // return "FETCH FIRST " + limit + " ROWS ONLY";
        // More widely supported Oracle syntax.
        // Caution: this clause is appended to the generated SELECT, so it is only
        // valid when that query has no WHERE clause of its own.
        return "WHERE ROWNUM <= " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("oracle.jdbc.OracleDriver");
    }

    @Override
    public String dialectName() {
        return "Oracle";
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // Do not quote: quoting would turn `schema.table` into "schema.table",
        // which Oracle treats as a single identifier and rejects.
        // return "\"" + identifier + "\"";
        return identifier;
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        // Named placeholders selected from DUAL form the source row of the MERGE.
        String sourceFields =
                Arrays.stream(fieldNames)
                        .map(f -> ":" + f + " " + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        String onClause =
                Arrays.stream(uniqueKeyFields)
                        .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
                        .collect(Collectors.joining(" and "));

        final Set<String> uniqueKeyFieldsSet =
                Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());

        // Key columns appear in the ON clause, so they are excluded from UPDATE SET.
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !uniqueKeyFieldsSet.contains(f))
                        .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        String insertFields =
                Arrays.stream(fieldNames)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));

        String valuesClause =
                Arrays.stream(fieldNames)
                        .map(f -> "s." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        // if we can't divide schema and table-name is risky to call quoteIdentifier(tableName)
        // for example [tbo].[sometable] is ok but [tbo.sometable] is not
        String mergeQuery =
                " MERGE INTO "
                        + tableName
                        + " t "
                        + " USING (SELECT "
                        + sourceFields
                        + " FROM DUAL) s "
                        + " ON ("
                        + onClause
                        + ") "
                        + " WHEN MATCHED THEN UPDATE SET "
                        + updateClause
                        + " WHEN NOT MATCHED THEN INSERT ("
                        + insertFields
                        + ")"
                        + " VALUES ("
                        + valuesClause
                        + ")";

        return Optional.of(mergeQuery);
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        // The data types used in Oracle are listed at:
        // https://www.techonthenet.com/oracle/datatypes.php
        return EnumSet.of(
                LogicalTypeRoot.CHAR,
                LogicalTypeRoot.VARCHAR,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.VARBINARY,
                LogicalTypeRoot.DECIMAL,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                LogicalTypeRoot.ARRAY);
    }
}
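To make the shape of the generated upsert easier to see, the sketch below rebuilds the same MERGE text outside Flink. OracleMergeSketch is a hypothetical class that mirrors the string assembly in getUpsertStatement with unquoted identifiers and single spaces; the table and column names are made up for the example. The ":NAME" tokens are named placeholders that are expected to be replaced with JDBC parameters before execution.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch: rebuilds the MERGE text produced by getUpsertStatement. */
public class OracleMergeSketch {

    public static String merge(String table, String[] fields, String[] keys) {
        Set<String> keySet = Arrays.stream(keys).collect(Collectors.toSet());
        // Source row: each named placeholder aliased to its column name.
        String source =
                Arrays.stream(fields).map(f -> ":" + f + " " + f).collect(Collectors.joining(", "));
        // Match target and source rows on the unique key columns.
        String on =
                Arrays.stream(keys).map(f -> "t." + f + "=s." + f).collect(Collectors.joining(" and "));
        // Only non-key columns are updated.
        String update =
                Arrays.stream(fields)
                        .filter(f -> !keySet.contains(f))
                        .map(f -> "t." + f + "=s." + f)
                        .collect(Collectors.joining(", "));
        String insert = String.join(", ", fields);
        String values =
                Arrays.stream(fields).map(f -> "s." + f).collect(Collectors.joining(", "));
        return "MERGE INTO " + table + " t USING (SELECT " + source + " FROM DUAL) s ON (" + on + ")"
                + " WHEN MATCHED THEN UPDATE SET " + update
                + " WHEN NOT MATCHED THEN INSERT (" + insert + ") VALUES (" + values + ")";
    }

    public static void main(String[] args) {
        System.out.println(
                merge("TMGPS.DEMO", new String[] {"ID", "NAME"}, new String[] {"ID"}));
    }
}
```

If every column is part of the unique key, the UPDATE SET list comes out empty and the statement is not valid SQL, which is worth checking for when sink tables consist only of key columns.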

Step 4: Modify JdbcCatalogUtils

This class is the key to creating catalogs. Modify its createCatalog method to add the code that creates an OracleCatalog object.

public class JdbcCatalogUtils {

    /** The base URL must contain exactly one slash group, i.e. no database name. */
    public static void validateJdbcUrl(String url) {
        String[] parts = url.trim().split("\\/+");
        checkArgument(parts.length == 2);
    }

    /** Create catalog instance from given information. */
    public static AbstractJdbcCatalog createCatalog(
            ClassLoader userClassLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl) {
        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, userClassLoader);

        if (dialect instanceof PostgresDialect) {
            return new PostgresCatalog(
                    userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
        } else if (dialect instanceof MySqlDialect) {
            return new MySqlCatalog(
                    userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
        } else if (dialect instanceof OracleDialect) {
            // Added: create the Oracle catalog.
            return new OracleCatalog(
                    userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
        } else {
            throw new UnsupportedOperationException(
                    String.format("Catalog for '%s' is not supported yet.", dialect));
        }
    }
}

Using OracleCatalog

OracleCatalog is used in much the same way as the official PostgreSQL catalog; see the JDBC connector page of the Apache Flink documentation.

Example:

CREATE CATALOG ORACLE_CATALOG WITH (
    'type' = 'jdbc',
    'default-database' = 'orclgps',  -- the database instance name
    'username' = '****',
    'password' = '****',
    'base-url' = 'jdbc:oracle:thin:@//ip:port'  -- do not append the database instance name
);

-- Note the format of the fully qualified table name.
SELECT `LINE_NO`, `UP_PASSENGER`, `DOWN_PASSENGER`, `SITE_TIME`, `STATION_NAME`
FROM ORACLE_CATALOG.orclgps.`TMGPS.TM_BUS_PASSENGER_UPDOWN` LIMIT 1;
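The reason the base URL must not carry the instance name can be sketched in a few lines. OracleUrlSketch is a hypothetical class: isValidBaseUrl repeats the check from validateJdbcUrl in step 4, and defaultUrl assumes that the base catalog class joins the base URL and the default database with a slash, as the PostgreSQL catalog appears to do. The host and port are placeholders.

```java
/** Hypothetical sketch of how base-url and default-database combine. */
public class OracleUrlSketch {

    /** Same rule as validateJdbcUrl: splitting on slashes must yield exactly two parts. */
    public static boolean isValidBaseUrl(String url) {
        return url.trim().split("/+").length == 2;
    }

    /** Assumption: the catalog connects to base URL + "/" + default database. */
    public static String defaultUrl(String baseUrl, String defaultDatabase) {
        String base = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        return base + defaultDatabase;
    }

    public static void main(String[] args) {
        String baseUrl = "jdbc:oracle:thin:@//10.0.0.1:1521"; // placeholder host and port
        System.out.println(isValidBaseUrl(baseUrl));
        System.out.println(isValidBaseUrl(baseUrl + "/orclgps"));
        System.out.println(defaultUrl(baseUrl, "orclgps"));
    }
}
```

Under that assumption, putting the instance name in 'default-database' yields a service-name style connection URL, while including it in 'base-url' fails validation.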

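Lessons learned

Using a catalog can replace explicit table definitions, which reduces column type conversion problems and, for tables with many columns, greatly cuts the amount of DDL. Tracing the source code for a query issued through JdbcCatalog shows that Flink always calls tableExists. In the default implementation this method in turn fetches the databases and the tables of each database, which costs performance when there are many tables (the OracleCatalog implementation optimizes this).

For batch jobs scheduled at short intervals (for example, within 10 minutes), and especially for ad hoc queries, prefer table definitions. For streaming jobs and jobs scheduled at long intervals, use catalog definitions. Where tables have many columns or need type conversion, the DDL can be generated automatically with tools provided by the big-data middleware.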
