Flink读写Doris操作介绍

​ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。可以将 Doris 表映射为 DataStream 或者 Table。

  • Flink操作Doris修改和删除只支持在 Unique Key 模型上

1. 准备开发环境

  • pom.xml加入依赖
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
  • 创建测试库测试表
-- Switch to the test database
use test_db;

-- Create the test table flinktest.
-- The table uses AGGREGATE KEY(siteid, citycode, username); pv is declared with the SUM aggregation type.
CREATE TABLE flinktest
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- Insert sample rows
insert into flinktest values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);

-- Check the table contents
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+
  • Doris 和 Flink 列类型映射关系
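| Doris Type | Flink Type |
| --- | --- |
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| DECIMALV2 | DECIMAL |
| TIME | DOUBLE |
| HLL | Unsupported datatype |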

2. Flink-DataStream读Doris

代码示例:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Reads the Doris table {@code test_db.flinktest} through a {@code DorisSourceFunction}
 * and prints every record it emits.
 */
public class Flink_stream_read_doris {
    public static void main(String[] args) {
        // Execution environment: rest.port set to 2000, parallelism 1.
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(1);

        // Source -> print sink.
        DorisStreamOptions sourceOptions = new DorisStreamOptions(dorisProperties());
        env.addSource(new DorisSourceFunction(sourceOptions, new SimpleListDeserializationSchema()))
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Builds the FE address, credentials and table identifier passed to the Doris source. */
    private static Properties dorisProperties() {
        Properties settings = new Properties();
        settings.setProperty("table.identifier", "test_db.flinktest");
        settings.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        settings.setProperty("username", "root");
        settings.setProperty("password", "123456");
        return settings;
    }
}

/*
  代码控制台输出:
[4, 3, bush, 3]
[2, 1, grace, 2]
[1, 1, jim, 2]
[5, 3, helen, 3]
[3, 2, tom, 2]
 */

3. Flink写Doris

Flink 读写 Doris 数据主要有两种方式

  • DataStream
  • SQL

3.1 Flink-DataStream以 JSON 数据 写到Doris

代码示例:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Uses Flink to write a JSON record into the Doris table {@code test_db.flinktest}.
 */
public class Flink_stream_write_doris_json {
    public static void main(String[] args) {
        // Execution environment: rest.port set to 2000, parallelism 1.
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(1);

        // Stream-load properties handed to the sink: the payload is JSON.
        Properties streamLoadProps = new Properties();
        streamLoadProps.setProperty("strip_outer_array", "true");
        streamLoadProps.setProperty("format", "json");

        DorisExecutionOptions executionOptions = new DorisExecutionOptions.Builder()
                .setBatchIntervalMs(2000L)
                .setEnableDelete(false)
                .setMaxRetries(3)
                .setStreamLoadProp(streamLoadProps)
                .build();

        DorisOptions dorisOptions = new DorisOptions.Builder()
                .setFenodes("hdt-dmcp-ops01:8130")
                .setUsername("root")
                .setPassword("123456")
                .setTableIdentifier("test_db.flinktest")
                .build();

        // One JSON element -> Doris sink.
        env.fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
                .addSink(DorisSink.sink(executionOptions, dorisOptions));

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/*
    代码执行前: 5 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+

    代码执行后: 6 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */

3.2 Flink-DataStream以 RowData 数据 写Doris

代码示例:

package com.zenitera.bigdata.doris;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;


/**
 * Parses a JSON record into a {@code GenericRowData} and writes it into the Doris table
 * {@code test_db.flinktest}.
 */
public class Flink_stream_write_doris_rowdata {
    public static void main(String[] args) {
        // Execution environment: rest.port set to 2000, parallelism 1.
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(1);

        // Column names and their logical types, in the same order as the row fields.
        String[] columnNames = {"siteid", "citycode", "username", "pv"};
        LogicalType[] columnTypes = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};

        DorisExecutionOptions executionOptions = new DorisExecutionOptions.Builder()
                .setBatchIntervalMs(2000L)
                .setEnableDelete(false)
                .setMaxRetries(3)
                .build();

        DorisOptions dorisOptions = new DorisOptions.Builder()
                .setFenodes("hdt-dmcp-ops01:8130")
                .setUsername("root")
                .setPassword("123456")
                .setTableIdentifier("test_db.flinktest")
                .build();

        env.fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")
                .map(json -> toRowData(json))
                .addSink(DorisSink.sink(columnNames, columnTypes, executionOptions, dorisOptions));

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Converts one JSON document into a four-field row: siteid, citycode, username, pv. */
    private static GenericRowData toRowData(String json) {
        JSONObject parsed = JSON.parseObject(json);
        GenericRowData row = new GenericRowData(4);
        row.setField(0, parsed.getIntValue("siteid"));
        row.setField(1, parsed.getShortValue("citycode"));
        // The string column is stored as Flink's internal StringData.
        row.setField(2, StringData.fromString(parsed.getString("username")));
        row.setField(3, parsed.getLongValue("pv"));
        return row;
    }
}

/*
    代码执行前: 6 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+

    代码执行后: 7 rows
 select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|     10 |     1001 | ww       |  100 |
|    100 |     1002 | wang     |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */

3.3 Flink-SQL 方式写Doris

Doris测试表:

-- Switch to the test database
use test_db;

-- Remove all existing rows from flinktest
truncate table flinktest;

-- Load three fresh sample rows
insert into flinktest values
(1,1,'aaa',1),
(2,2,'bbb',2),
(3,3,'ccc',3);

-- Check the table contents
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      3 |        3 | ccc      |    3 |
+--------+----------+----------+------+
3 rows in set (0.01 sec)

Flink-SQL代码示例:

package com.zenitera.bigdata.doris;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Maps the Doris table {@code test_db.flinktest} to the Flink SQL table {@code flink_0518}
 * and inserts one row into it.
 */
public class Flink_SQL_doris {
    /**
     * Creates the table mapping, submits the INSERT and waits for the job to finish.
     *
     * @param args unused
     * @throws IllegalStateException if the insert job fails or the wait is interrupted
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // NOTE(review): citycode is declared as int here, while the Doris DDL and the
        // Flink SQL read example declare it as SMALLINT — confirm which one is intended.
        tEnv.executeSql("create table flink_0518(" +
                " siteid int, " +
                " citycode int, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")");

        try {
            // executeSql() only submits the INSERT job. await() blocks until the job has
            // finished, so main() cannot return before the row is written and a failed
            // job is reported instead of being lost.
            tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ")
                    .await();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever runs this thread.
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Insert into flink_0518 (Doris table test_db.flinktest) failed", e);
        }

    }

    /** Bean with the same four columns as flink_0518; it is not referenced by main(). */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Flink_0518 {
        private Integer siteid;
        private Integer citycode;
        private String username;
        private Long pv;
    }
}

执行代码,执行完成后查看Doris对应表数据进行验证:

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      3 |        3 | ccc      |    3 |
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      4 |        4 | wangting |    4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)

3.4 Flink-SQL 方式读Doris

package com.zenitera.bigdata.doris;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Maps the Doris table {@code test_db.flinktest} to the Flink SQL table {@code flink_0520}
 * and prints the result of a full-table query.
 */
public class Flink_SQL_doris_read {
    public static void main(String[] args) {
        // Execution environment: rest.port set to 2000, parallelism 1.
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DDL that binds flink_0520 to the Doris table through the 'doris' connector.
        String createTableDdl = "create table flink_0520(" +
                " siteid int, " +
                " citycode SMALLINT, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")";
        tableEnv.executeSql(createTableDdl);

        // Run the query and print its rows to the console.
        tableEnv.sqlQuery("select * from flink_0520")
                .execute()
                .print();
    }
}

/*
   控制台输出信息:
+----+-------------+----------+---------------+---------+
| op |      siteid | citycode |      username |      pv |
+----+-------------+----------+---------------+---------+
| +I |           1 |        1 |           aaa |       1 |
| +I |           3 |        3 |           ccc |       3 |
| +I |           2 |        2 |           bbb |       2 |
| +I |           4 |        4 |      wangting |       4 |
+----+-------------+----------+---------------+---------+
4 rows in set
*/
Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐