5、Flink 流处理API

  • Scala 编程中注意的隐式转换

  • import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api..scala._
    

5.1、Environment

  • getExecutionEnvironment
    • 创建一个执行环境,表示当前执行程序的上下文
    • 如果程序是独立调用的,则此方法返回本地执行环境;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • createLocalEnvironment
    • 返回本地执行环境,需要在调用时指定默认的并行度。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
  • createLocalEnvironmentWithWebUI(new Configuration())
    • 返回*本地执行环境和WebUI**,需要指定配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
  • createRemoteEnvironment
    • 返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//WordCount.jar");

5.2、Source

  • 基于文件
    • readTextFile(path) - 读取text文件的数据
    • readFile(fileInputFormat, path) - 通过自定义的读取方式,来读取文件的数据
//开始读取文件
DataStreamSource<String> dataStream1 = environment.readTextFile("data/flink.txt", "UTF-8");

//readFile()
DataStreamSource<String> dataStream2 = environment.readFile(new YjxxtFileInputFormat(), "data/flink.txt");
class YjxxtFileInputFormat extends FileInputFormat<String> {
    @Override
    public boolean reachedEnd() throws IOException {
        return false;
    }
    @Override
    public String nextRecord(String s) throws IOException {
        return null;
    }
}
  • 基于socket
    • socketTextStream 从socket端口中读取数据
  • 基于集合
    • fromCollection(Collection) - 从collection集合里读取数据,从而形成一个数据流,集合里的元素类型需要 一致
    • fromElements(T …) - 从数组里读取数据,从而形成一个数据流,集合里的元素类型需要一致。
    • generateSequence(from, to) - 创建一个数据流,数据源里的数据从from到to的数字。
//创建一个Collection
List<String> list = Arrays.asList("aa bb", "bb cc", "cc dd", "dd ee", "ee ff", " ff aa");
DataStreamSource<String> dataStream = environment.fromCollection(list);

//基于Element
DataStreamSource<String> dataStream2 = environment.fromElements("aa bb", "bb cc", "cc dd", "dd ee", "ee ff", " ff aa");

//generateSequence(from, to)
DataStreamSource<Long> dataStream3 = environment.generateSequence(1, 100);
  • 自定义source
    • addSource - 自定义一个数据源,比如FlinkKafkaConsumer,从kafka里读数据。
		<!-- Flink Kafka连接器的依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
//连接数据源
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
properties.setProperty("group.id", "yjx_kafka_flink");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("enable.auto.commit", "true");
DataStreamSource<String> dataStream = 
	environment.addSource(new FlinkKafkaConsumer011<String>("yjxflink", new SimpleStringSchema(), properties));

5.3、Transform (Operator)

在这里插入图片描述

  1. 基本转换算子
    • 基本转换算子会针对流中的每一个单独的事件做处理,也就是说每一个输入数据会产生 一个输出数据。单值转换,数据的分割,数据的过滤,都是基本转换操作的典型例子。
  2. 键控流转换算子
    • 很多流处理程序的一个基本要求就是要能对数据进行分组分组后的数据共享某一个相同的属性。DataStream API提供了一个叫做 KeyedStream 的抽象,此抽象会从逻辑上 对DataStream进行分区,分区后的数据拥有同样的 Key 值,分区后的流互不相关。
    • 针对KeyedStream的状态转换操作可以读取数据或者写入数据到当前事件Key所对应的状态中。这表明拥有同样Key的所有事件都可以访问同样的状态,也就是说所以这些事件可以一起处理。
  3. 多流转换算子
    • 许多应用需要摄入多个流并将流合并处理,还可能需要将一条流分割成多条流然后针对每一条流应用不同的业务逻辑。接下来,我们将讨论DataStream API中提供的能够处理多条输入流或者发送多条输出流的操作算子。
  4. 分布式转换算子
    • 分区操作对应于我们之前讲过的“数据交换策略”这一节。这些操作定义了事件如何分配到不同的任务中去。当我们使用DataStream API来编写程序时,系统将自动的选择数据分区策略,然后根据操作符的语义和设置的并行度将数据路由到正确的地方去。有些时候,我们需要在应用程序的层面控制分区策略,或者自定义分区策略。例如,如果我们 知道会发生数据倾斜,那么我们想要针对数据流做负载均衡,将数据流平均发送到接下 来的操作符中去。又或者,应用程序的业务逻辑可能需要一个算子所有的并行任务都需 要接收同样的数据。再或者,我们需要自定义分区策略的时候。
5.3.1. 基本转换算子
  1. map
    • map()基本是一对一服务,即输入一个元素输出一个元素。
  2. flatmap
    • 扁平化操作,1对多;
  3. filter
    • 上对一个布尔条件进行求值来过滤掉一些元素,然后将剩下 的元素继续发送。
5.3.2. 键控转换算子
  1. keyby
    • 基于不同的key,流中的事件将被分配 到不同的分区中去。
  2. 滚动聚合
    • sum():min():max():minBy():maxBy():
  3. Reduce() 聚合
    • 按照同一个Key分组的数据流上生效,两两合一地进行汇总操作,生成一个同类型的新元素。
5.3.6. 多流转换算子
  • 事件合流的方式为FIFO方式。

  • union

    • 将两条或者多条DataStream合并成一条具有与输入流相同类型的输出 DataStream。
    • 不去重,合流顺序为先进先出
    • 合并的流的类型必须是相同的
  • connect

    • 可以合并2个数据流
    • connect只能连接两个数据流
    • 流的数据类型可以不一致
5.3.9. 分布式转换算子
  1. .shuffle()

    • 随机数据交换
  2. rebalance()

    • Round-Robin负载均衡算法将输入流平均分配到所有的随后的并行运行的任务中去
  3. rescale()

    • 轻量级的Round-Robin负载均衡策略。
    • 对每一个任务和下游算子的一部分子并行任务之间建立通信通道。
  4. broadcast()

    • 所有数据复制并发送到下游算子的所有并行任务中去
  5. global()

    • 所有流数据都发送到下游算子的第一个并行任务中去。
  6. partitionCustom() 方法来自定义分区策略。

5.4、支持的数据类型

  • Primitives(原始数据类型) 四类八种
  • Java和Scala的Tuples(元组)
  • Scala的样例类
  • POJO类型 -》必须是公共类
  • 一些特殊的类型
DataStream[Long] numbers = env.fromElements(1L, 2L, 3L, 4L);
numbers.map(n -> n + 1);

//PoJo
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23)
);
//key的定义方式
persons.keyBy("f2")

5.5、UDF函数

  • Flink 暴露了所有udf函数的接口(实现方式为接口或者抽象类)。
    • MapFunction
    • FilterFunction
    • FlatmapFunction
    • ProcessFunction 自定义高级函数

5.6、富函数

  • 在函数处理数据之前,需要做一些初始化的工作;

  • 需要在处理数据时可以获得函数执行上下文 的一些信息;

  • 以及在处理完数据时做一些清理工作。

  • DataStream API提供的所有转换操作函数,都拥有它们的“富”版本,

    1. RichMapFunction
    2. RichFlatMapFunction
    3. RichFilterFunction
  • 使用富函数时可以实现额外的方法:

    1. open()方法是rich function的初始化方法
    2. close()方法是生命周期中的最后一个调用的方法
    3. getRuntimeContext()方法提供了函数的RuntimeContext的一些信息

5.7、Sink

  • writeAsText() // 将计算结果输出成text文件
  • writeAsCsv(…) // 将计算结果输出成csv文件
  • print() // 将计算结果打印到控制台
  • writeUsingOutputFormat() //自定义输出方式。
  • writeToSocket // 将计算结果输出到某台机器的端口上。
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐