Flink JDBCOutputFormat
从数据库查询,或者插入到数据库,使用自带的JDBCOutputFormat。import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.java.DataSet;import
·
一段时间不用Flink/Spark,就容易忘记算子和很多connector的参数,还经常把Flink的算子与Spark的算子混淆。打算花点时间把每个重要的算子和常用的connector都实际练习一遍。本文使用的Flink版本为1.8.1。
从数据库查询使用自带的JDBCInputFormat,插入到数据库使用自带的JDBCOutputFormat,两者都只能以Row作为记录类型。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;;import java.sql.SQLType;
import java.sql.Types;
/**
 * Demo of the Flink DataSet JDBC connector: writes two rows into the MySQL
 * table {@code test} with {@link JDBCOutputFormat}, then reads the table back
 * with {@link JDBCInputFormat} and prints the rows as {@code Tuple2<Integer, String>}.
 *
 * <p>Both JDBC formats exchange records as {@link Row}, so the input strings are
 * converted to {@code Row} before writing and the query results are converted
 * from {@code Row} after reading.
 */
public class print {

    // Connection settings shared by the insert and the query.
    private static final String DB_URL = "jdbc:mysql://10.203.3.97/test";
    private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "internal";

    /**
     * Runs the insert job, then the query job.
     *
     * @param args unused
     * @throws Exception if either Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> ds = env.fromElements("100,100", "200,100");

        // Convert each "a,b" string to a two-field Row: the JDBC formats only
        // work with Row, for inserts as well as for queries.
        DataSet<Row> rowsToInsert = ds.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String s) throws Exception {
                String[] ss = s.split(",");
                Row row = new Row(2);
                row.setField(0, ss[0]);
                row.setField(1, ss[1]);
                return row;
            }
        });

        // Insert the rows into MySQL. Both fields are bound as VARCHAR.
        rowsToInsert.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDBUrl(DB_URL)
                .setDrivername(DRIVER_NAME)
                .setUsername(USERNAME)
                .setPassword(PASSWORD)
                .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR})
                .setQuery("insert into test values(?,?)")
                .finish());

        // Run the insert as its own job before the query is defined. Without
        // this, print() below would submit the insert sink and the query in a
        // single job with no ordering between them, so the SELECT could miss
        // the rows being inserted.
        env.execute("insert into test");

        // Query MySQL; the declared RowTypeInfo must match the selected columns.
        DataSet<Row> ds3 = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDBUrl(DB_URL)
                .setDrivername(DRIVER_NAME)
                .setUsername(USERNAME)
                .setPassword(PASSWORD)
                .setQuery("select id,name from test")
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
                .finish());

        // Convert each Row to a typed Tuple2 for further processing.
        DataSet<Tuple2<Integer, String>> ds4 = ds3.map(new MapFunction<Row, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> map(Row row) throws Exception {
                int id = (Integer) row.getField(0);
                String name = (String) row.getField(1);
                return new Tuple2<Integer, String>(id, name);
            }
        });

        // print() triggers execution of the query job itself, so no trailing
        // env.execute() is needed here.
        ds4.print();
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)