以下所有都是基于Flink 1.12.0版本

Flink JDBCSink的使用

flink提供了JDBCSink方便我们写入数据库,以下是使用案例:

pom依赖

需要引入flink-connector-jdbc的依赖。另外,我这里是写入mysql,所以还引入了mysql的驱动包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.45</version>
</dependency>
案例代码
package com.upupfeng.sink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html
 *
 * @author mawf
 */
public class JDBCSinkDemo {

    public static void main(String[] args) throws Exception {

        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
	
        // 从配置文件中读取配置信息
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile("D:\\upupfeng\\my_code\\flink-learning\\resources\\application.properties");
        String url = parameterTool.get("url");
        String driver = parameterTool.get("driver");
        String user = parameterTool.get("user");
        String password = parameterTool.get("password");
		
        // sql语句,用问号做占位符
        String sql = "insert into tb_traffic_statistic_min(starttime, city_name, distinct_user_count, total_traffic) values(?, ?, ?, ?)";
        // 伪造数据
        Tuple4<String, String, Integer, Double> bjTp = Tuple4.of("2020-12-01 00:00:00", "北京", 10, 2.3d);

        env
                .fromElements(bjTp)
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT, Types.DOUBLE))
                // 添加JDBCSink
            	.addSink(
                        JdbcSink.sink(
                                sql, // sql语句
                            	// 设置占位符对应的字段值
                                (ps, tp) -> {
                                    ps.setString(1, tp.f0);
                                    ps.setString(2, tp.f1);
                                    ps.setInt(3, tp.f2);
                                    ps.setDouble(4, tp.f3);
                                },
                            	// 传递jdbc的连接属性
                                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                        .withDriverName(driver)
                                        .withUrl(url)
                                        .withUsername(user)
                                        .withPassword(password)
                                        .build()
                        )
                );
		
        // 执行
        env.execute();
    }
}

JDBCSink源码解析

先从JdbcSink.sink来看,

JdbcSink

这是一个类,提供了静态方法供我们创建JDBC的SinkFunction。

提供了两个重载的sink方法。

public static <T> SinkFunction<T> sink(String sql, JdbcStatementBuilder<T> statementBuilder, JdbcConnectionOptions connectionOptions);

public static <T> SinkFunction<T> sink(
		String sql,
		JdbcStatementBuilder<T> statementBuilder,
		JdbcExecutionOptions executionOptions,
		JdbcConnectionOptions connectionOptions);

其中一个方法是对另一个方法的executionOptions参数提供了默认实现。

我们就看一下参数最全的sink方法:

	/**
	 * 创建JDBC Sink
	 * @param sql              任意DML查询(例如插入,update,upsert)
	 * @param statementBuilder  根据每一个查询在java.sql.PreparedStatement上设置参数
	 * @param <T>               数据类型
	 * @param executionOptions  执行时配置的参数,如批大小、重试等
	 * @param connectionOptions 连接参数,如jdbc url等
	 */
	public static <T> SinkFunction<T> sink(
		String sql,
		JdbcStatementBuilder<T> statementBuilder,
		JdbcExecutionOptions executionOptions,
		JdbcConnectionOptions connectionOptions) {
        // 创建了一个GenericJdbcSinkFunction对象,这个类就是JDBC对应的SinkFunction实现类
        // JdbcBatchingOutputFormat类是批处理OutputFormat的一个实现,封装了攒批处理的逻辑
		return new GenericJdbcSinkFunction<>(new JdbcBatchingOutputFormat<>(
			// 简单的JDBC连接提供者,可以创建JDBC连接
            new SimpleJdbcConnectionProvider(connectionOptions),
			// 执行参数
            executionOptions,
            // 执行PreparedStatement的方式
			context -> {
				Preconditions.checkState(!context.getExecutionConfig().isObjectReuseEnabled(),
					"objects can not be reused with JDBC sink function");
				// 使用一个静态方法,创建了简单的JdbcBatchStatementExecutor类:SimpleBatchStatementExecutor
                return JdbcBatchStatementExecutor.simple(sql, statementBuilder, Function.identity());
			},
            // 记录的提取方式
			JdbcBatchingOutputFormat.RecordExtractor.identity()
		));
	}

看下GenericJdbcSinkFunction类

GenericJdbcSinkFunction

JDBC的通用SinkFunction。

继承了RichSinkFunction接口,是一个标准的sink实现类。

// outputFormat内部封装了真正的处理逻辑
private final AbstractJdbcOutputFormat<T > outputFormat;

// 构造器只有AbstractJdbcOutputFormat一个参数
public GenericJdbcSinkFunction(@Nonnull AbstractJdbcOutputFormat<T> outputFormat) {
    this.outputFormat = Preconditions.checkNotNull(outputFormat);
}

// open方法
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    RuntimeContext ctx = getRuntimeContext();
    // 将context传递给outputFormat
    outputFormat.setRuntimeContext(ctx);
    // 调用outputFormat的open方法
    outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}

// 每条记录进来的时候调用outputFormat的writeRecord方法
@Override
public void invoke(T value, Context context) throws IOException {
    outputFormat.writeRecord(value);
}

// 状态相关的
@Override
public void initializeState(FunctionInitializationContext context) {
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    outputFormat.flush();
}

// clost方法中调用了outputFormat的close方法
public void close() {
    outputFormat.close();
}

从以上代码可以看出,基本上处理逻辑都封装了AbstractJdbcOutputFormat类中。

AbstractJdbcOutputFormat / JdbcBatchingOutputFormat

AbstractJdbcOutputFormat 是OutputFormat基础的抽象类,提供了一些方法的默认实现。

我们这里用到的是批提交对应的实现类JdbcBatchingOutputFormat,就直接看JdbcBatchingOutputFormat类了

JdbcBatchingOutputFormat源码:

// JDBC outputFormat支持在将记录写入数据库之前批处理记录
// JdbcBatchingOutputFormat继承了AbstractJdbcOutputFormat类。
public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>> extends AbstractJdbcOutputFormat<In> {
    
    // 从给定参数中提取值的接口。
    public interface RecordExtractor<F, T> extends Function<F, T>, Serializable {
        // 直接原样返回
		static <T> RecordExtractor<T, T> identity() {
			return x -> x;
		}
	}
    
    // 创建Statement执行类的工厂
    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>> extends Function<RuntimeContext, T>, Serializable {
	}
    
    // 执行参数
    private final JdbcExecutionOptions executionOptions;
	// 创建执行类的工厂
    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
    // 记录提取
	private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
	// 执行statement的类
	private transient JdbcExec jdbcStatementExecutor;
    // 批大小
	private transient int batchCount = 0;
	private transient volatile boolean closed = false;
	// 用于定时提交的定时器
	private transient ScheduledExecutorService scheduler;
	private transient ScheduledFuture<?> scheduledFuture;
	private transient volatile Exception flushException;
    
    // 构造函数
    public JdbcBatchingOutputFormat(
        	// JDBC连接提供者
			@Nonnull JdbcConnectionProvider connectionProvider,
			// 执行参数
        	@Nonnull JdbcExecutionOptions executionOptions,
			// 创建statement执行类的工厂
        	@Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
        	// 记录提取
			@Nonnull RecordExtractor<In, JdbcIn> recordExtractor) {
		super(connectionProvider);
		this.executionOptions = checkNotNull(executionOptions);
		this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
		this.jdbcRecordExtractor = checkNotNull(recordExtractor);
	}
    
	/**
	 * 连接到目标数据库,并初始化准备好的语句
	 *
	 * @param taskNumber 并行实例数
	 */
	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		super.open(taskNumber, numTasks);
        // 根据给定的statement执行类工厂来创建 statement执行类
		jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
        // 设置一个定时器,来定时提交数据到数据库。
        // 相当于就是有两种方式可以触发提交:一种是到达批大小,一种是到了定时的时间。
		if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
            // 创建一个调度线程池
			this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
            // 注册定时调度
			this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
				synchronized (JdbcBatchingOutputFormat.this) {
					if (!closed) {
						try {
                            // 调用flush方法,将数据刷入数据库
							flush();
						} catch (Exception e) {
							flushException = e;
						}
					}
				}
			}, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
		}
	}    
    
	// 根据给定的工厂类创建对应的statement执行类
	private JdbcExec createAndOpenStatementExecutor(StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
        // 创建一个执行类
        // 对于batch output format来说是创建了一个SimpleBatchStatementExecutor类,我们稍后看
		JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
		try {
            // 从连接中创建一个statement
			exec.prepareStatements(connection);
		} catch (SQLException e) {
			throw new IOException("unable to open JDBC writer", e);
		}
		return exec;
	}    
    
    // 写每条记录。这个方法就是在GenericJdbcSinkFunction的invoke方法中被调用,处理每一条记录
	@Override
	public final synchronized void writeRecord(In record) throws IOException {
		checkFlushException();

		try {
            // 将数据添加到batch中
			addToBatch(record, jdbcRecordExtractor.apply(record));
			// batch数量增加
            batchCount++;
            // 如果批大小够了,触发flush方法提交数据
			if (executionOptions.getBatchSize() > 0 && batchCount >= executionOptions.getBatchSize()) {
				flush();
			}
		} catch (Exception e) {
			throw new IOException("Writing records to JDBC failed.", e);
		}
	}    
    
    // 添加记录到batch中
	protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
        // 调用了statement执行类的addToBatch方法,我们稍后看
		jdbcStatementExecutor.addToBatch(extracted);
	}    
    
    // 提交方法
	@Override
	public synchronized void flush() throws IOException {
		checkFlushException();

        // 根据重试次数循环。如果失败,会重试。如果成功,就直接break了
		for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
			try {
                // 执行提交
				attemptFlush();
                // batch count置为0 
				batchCount = 0;
				break;
			} catch (SQLException e) {
				LOG.error("JDBC executeBatch error, retry times = {}", i, e);
				if (i >= executionOptions.getMaxRetries()) {
					throw new IOException(e);
				}
				try {
                    // 如果是因为连接失效导致的。则重新获取连接
					if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
						// 重新获取连接
                        connection = connectionProvider.reestablishConnection();
                        // 关闭执行器的statement
						jdbcStatementExecutor.closeStatements();
						// 根据连接创建statement
                        jdbcStatementExecutor.prepareStatements(connection);
					}
				} catch (Exception excpetion) {
					LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
					throw new IOException("Reestablish JDBC connection failed", excpetion);
				}
				try {
					Thread.sleep(1000 * i);
				} catch (InterruptedException ex) {
					Thread.currentThread().interrupt();
					throw new IOException("unable to flush; interrupted while doing another attempt", e);
				}
			}
		}
	}

    // 执行提交
    protected void attemptFlush() throws SQLException {
		jdbcStatementExecutor.executeBatch();
	}

    
    // close方法
    public synchronized void close() {
		if (!closed) {
			closed = true;

			if (this.scheduledFuture != null) {
                // 关闭定时器
				scheduledFuture.cancel(false);
				this.scheduler.shutdown();
			}
			// 如果batch中还有数据,则提交
			if (batchCount > 0) {
				try {
					flush();
				} catch (Exception e) {
					LOG.warn("Writing records to JDBC failed.", e);
					throw new RuntimeException("Writing records to JDBC failed.", e);
				}
			}
			//	关闭statement
			try {
				if (jdbcStatementExecutor != null) {
					jdbcStatementExecutor.closeStatements();
				}
			} catch (SQLException e) {
				LOG.warn("Close JDBC writer failed.", e);
			}
		}
		super.close();
		checkFlushException();
	}
    
    ......
}

以上就是outputFormat的代码,提供了攒批、按批提交、定时提交的方法。

内部真正执行statement,还是调用JdbcBatchStatementExecutor来实现的。接下来看看JdbcBatchStatementExecutor

JdbcBatchStatementExecutor

这个接口是用于批量执行给定的JDBC语句以获取累积的记录。就是按批提交。

我们看下他的实现类SimpleBatchStatementExecutor

SimpleBatchStatementExecutor的源码:

class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T> {
	
    // sql
    private final String sql;
	// statement的build,用于将参数设置到占位符上
    private final JdbcStatementBuilder<V> parameterSetter;
    // 值转换
	private final Function<T, V> valueTransformer;
	// 使用List来存储批
    private final List<V> batch;
    
    // 构造器
    SimpleBatchStatementExecutor(String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
		this.sql = sql;
		this.parameterSetter = statementBuilder;
		this.valueTransformer = valueTransformer;
		this.batch = new ArrayList<>();
	}
    
    // 调用prepareStatement预编译sql
	@Override
	public void prepareStatements(Connection connection) throws SQLException {
		this.st = connection.prepareStatement(sql);
	}
    
    // 将记录添加到批中。在outputFormat中调用这个方法攒批
    @Override
	public void addToBatch(T record) {
		batch.add(valueTransformer.apply(record));
	}

    // 执行批、在outputFormat中调用,执行批提交
	@Override
	public void executeBatch() throws SQLException {
		if (!batch.isEmpty()) {
            // 遍历批
			for (V r : batch) {
                // 这里是用statement的引用,将值赋到statment上
				parameterSetter.accept(st, r);
				// 调用statament的addBatch方法
                st.addBatch();
			}
            // 执行提交
			st.executeBatch();
			// 清空批
            batch.clear();
		}
	}

    // 关闭statement
    @Override
	public void closeStatements() throws SQLException {
		if (st != null) {
			st.close();
			st = null;
		}
	}
}

从上面的所有代码,就可以了解JdbcSink的提交原理了。

接下来在看下两个配置的类:JdbcExecutionOptions和JdbcConnectionOptions。一个是执行的配置,一个是连接的配置。

JdbcConnectionOptions

有url、driverName、username、password四个配置

内部提供了builder类用于创建JdbcConnectionOptions

public class JdbcConnectionOptions implements Serializable {
	
    protected final String url;
	protected final String driverName;
	@Nullable
	protected final String username;
	@Nullable
	protected final String password;

    public static class JdbcConnectionOptionsBuilder {
        public JdbcConnectionOptions build() {
			return new JdbcConnectionOptions(url, driverName, username, password);
		}

    }
}

JdbcExecutionOptions

执行配置。有batchIntervalMs、batchSize、maxRetries三个配置。

内部也提供了builder来创建JdbcExecutionOptions

public class JdbcExecutionOptions implements Serializable {
	public static final int DEFAULT_MAX_RETRY_TIMES = 3;
	private static final int DEFAULT_INTERVAL_MILLIS = 0;
	public static final int DEFAULT_SIZE = 5000;

	// 自动提交批的时间间隔。毫秒值。默认是0,默认不会定时提交
	private final long batchIntervalMs;
	// 批大小。默认的批大小为5000
	private final int batchSize;
	// 重试次数。默认为3
	private final int maxRetries;
	
	public static final class Builder {
		public JdbcExecutionOptions build() {
			return new JdbcExecutionOptions(intervalMs, size, maxRetries);
		}
	}
}	

总结

以上就是JdbcSink的在Stream API中的使用和部分JdbcSink的源码。

这个JdbcSink写的很好,可以覆盖一部分场景。

但是有时候并不是很满足我们的要求,我们可以参考这个JdbcSink进行改造、二次开发。

参考

官方JDBC Connector文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

flink-connector-jdbc_2.12-1.12.0.jar源代码

Logo

更多推荐