Oracle 迁移到PostgreSQL

  1. 使用Spring JDBC Template访问source和target数据库,可以查询、插入、更新数据。为了保证数据一致性,可以使用JdbcTemplate提供的事务支持。
  2. 监测ddl语句的变化,可以使用PostgreSQL的触发器功能来实现。具体地,可以在source数据库中创建一个ddl触发器,当有ddl语句被执行时,触发器将记录这个事件,并将相关信息写入一个专门用于记录ddl变更的表中。
  3. 可以使用定时任务来监测ddl表的变化。当有新的记录出现时,可以读取这个记录,解析其中的信息(如表名、字段名称、数据类型等),并通过Spring JDBC Template更新target数据库中的表结构。
  4. 监测数据变化可以使用PostgreSQL的LISTEN/NOTIFY机制。可以在source数据库中创建一个LISTEN,并在应用程序中注册对这个监听事件的处理函数。当source数据库的数据发生变化时,应用程序将接收到一个事件通知,然后可以使用JdbcTemplate提供的数据查询、插入、更新操作将变化同步到target数据库中。
  5. 为了应对异常情况,可以在JDBC Template的操作中添加异常处理机制。当出现异常时,可以使用JdbcTemplate提供的事务回滚功能,保证数据的一致性。

由于代码量比较大,这里就不一一列出,将核心的代码列举出来,项目的源码以放在github上,地址:https://github.com/yanghaiji/DataGrip 也欢迎大家一起维护与开发

数据源配置

spring:
  profiles:
    include: datagrip
  datasource:
    source:
      driver-class-name: oracle.jdbc.OracleDriver
      jdbcUrl: jdbc:oracle:thin:@localhost:1521/ORCLPDB1
      username: oracle
      password: oracle
      minimumIdle: 10
      maximumPoolSize: 150
      autoCommit: true
      idleTimeout: 30000
      poolName: source
      maxLifetime: 60000
      connectionTimeout: 60000
      validationTimeout: 5000
      readOnly: false
      loginTimeout: 5
    target:
      driverClassName: org.postgresql.Driver
      jdbcUrl: jdbc:postgresql://localhost:5432/target?rewriteBatchedStatements=true
      username: postgres
      password: postgres
      minimumIdle: 10
      maximumPoolSize: 30
      autoCommit: true
      idleTimeout: 30000
      poolName: target
      maxLifetime: 60000
      connectionTimeout: 60000
      validationTimeout: 5000
      readOnly: false
      loginTimeout: 5

数据库表结构同步

@Override
protected void createTable(String tableName) {
    StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
    sql.append(tableName).append("(");
    // 查询 Oracle 数据库中表的字段信息
    List<Map<String, Object>> columns = oracleJdbcTemplate.queryForList("SELECT * FROM USER_TAB_COLUMNS WHERE TABLE_NAME = ?", tableName);
    for (Map<String, Object> column : columns) {
        String columnName = (String) column.get("COLUMN_NAME");
        String dataType = (String) column.get("DATA_TYPE");
        int dataLength = ((Number) column.get("DATA_LENGTH")).intValue();
        // 根据 Oracle 数据库的数据类型转换为 Postgres 数据库的数据类型
        String pgDataType;
        if (dataType.equalsIgnoreCase(DbTypes.VARCHAR2.name())
                || dataType.equalsIgnoreCase(DbTypes.NVARCHAR2.name())
                || dataType.equalsIgnoreCase(DbTypes.CHAR.name())) {
            pgDataType = DbTypes.VARCHAR.name().concat("(" + dataLength + ")");
        } else if (dataType.equalsIgnoreCase(DbTypes.NUMBER.name())
                || dataType.equalsIgnoreCase(DbTypes.FLOAT.name())
                || dataType.equalsIgnoreCase(DbTypes.DECIMAL.name())) {
            Object data_precision = column.get("DATA_PRECISION");
            Object data_scale = column.get("DATA_SCALE");
            if (Objects.nonNull(data_scale) && Objects.nonNull(data_precision)) {
                int precision = ((Number) data_precision).intValue();
                int scale = ((Number) data_scale).intValue();
                if (scale <= 0) {
                    pgDataType = "NUMERIC(" + precision + ")";
                } else {
                    pgDataType = "NUMERIC(" + precision + "," + scale + ")";
                }
            } else {
                pgDataType = DbTypes.NUMERIC.name();
            }
        } else if (dataType.equalsIgnoreCase(DbTypes.DATE.name())
                || dataType.equalsIgnoreCase(DbTypes.TIMESTAMP.name())
                || dataType.contains(DbTypes.TIMESTAMP.name())) {
            pgDataType = DbTypes.TIMESTAMP.name();
        } else {
            throw new RuntimeException("Unsupported data type: " + dataType);
        }
        sql.append(columnName).append(" ").append(pgDataType).append(",");
    }
    // 去除最后一个逗号
    sql.deleteCharAt(sql.length() - 1);
    sql.append(")");
    // 执行创建 PostgreSQL 表的 SQL 语句
    pgsqlJdbcTemplate.execute(sql.toString());
}

数据迁移

    @Override
    protected void migrateData(String tableName) {
        int page = 0;
        boolean hasNextPage = true;
        // 假设每页查询1000条数据
        int pageSize = dataGripProperties.getOracle().getPageSize();
        while (hasNextPage) {
            // 分页查询 Oracle 数据库中的数据
            List<Map<String, Object>> data = oracleJdbcTemplate.queryForList(getPagingSql(tableName, page, pageSize));
            if (data.isEmpty()) {
                hasNextPage = false;
            } else {
                // 排除 RN : 由于 oracle 查询是带出来的行号,但是在真是的数据传输时不需要这个行号,需要剔除
                Set<String> fields = data.get(0).keySet();
                String insertSql = "INSERT INTO " + tableName + " (" +
                        StringUtils.join(fields.stream().filter(o -> !"RN".equals(o)).collect(Collectors.toList()), ", ") + ")" +
                        " VALUES (" + StringUtils.repeat("?", ", ", fields.size() - 1) + ")";
                pgsqlJdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        Map<String, Object> row = data.get(i);
                        row.remove("RN");
                        int idx = 1;
                        for (Object value : row.values()) {
                            ps.setObject(idx++, value);
                        }
                    }

                    @Override
                    public int getBatchSize() {
                        return data.size();
                    }
                });

            }
            page++;
        }
    }

目前的实现还是建议版本,无法保证速度,后期会修改为多线程去执行,

项目的源码以放在github上,地址:https://github.com/yanghaiji/DataGrip 也欢迎大家一起参与开发与优化

更多推荐