Flink 三:Flink 流处理 API

主要内容结构:

Flink流处理API

1.流处理相关概念

  • 数据的时效性

    • 对网站的实时监控

    • 对异常日志的监控

  • 流式计算和批量计算

    image-20210202165040261

    • Batch Analytics 批量计算:统一收集数据-》存储到DB-》对数据进行批量处理,就是传统意义上使用类似于Map Reduce、Hive、Spark Batch等,对作业进行分析、处理、生成离线报表

    • Streaming Analytics流式计算:对数据流进行处理,如果用流式分析引擎如Storm,Flink实时处理分析数据,应用较多的场景如实时大屏,实时报表

image-20210202171201530

它们的主要区别是:

  • 与批量计算,慢慢积累数据不同,流计算立刻计算,数据持续流动,完成之后就丢弃;

  • 批量计算是维护一张表,对表进行实施各种计算逻辑。流式计算相反,是必须先定义好逻辑,提交到流式计算系统,这对计算作业逻辑在整个运行期间是不可更改的;

  • 计算结果上,批量计算对全部数据进行计算后传输结果,流式计算是每次小批量计算后,结果可以立刻实时化展现

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

image-20210202171858036

2. DataStream

  • 有边界流(bounded stream):有定义流的开始,也有定义流的结束。有界可以在摄取虽有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常称为批处理

  • 无界流(unbounded stream):有定义流的开始,也有定义流的结束。他们会无休止地产生数据。无休止地数据必须持续处理,即数据被摄取后需要立刻处理。不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件例如事件发生地顺序,以便能够推断结果地完整性。

image-20210202172728319

image-20210202173159897

DataStream数据流有5个子类,截图如下:

image-20210202173402667

3.API和编程模型

和批处理类似,Flink的流处理也支持多个层次的api并包含三个方面:

Source/Transformation/Sink

  • step1、Obtain an execution environment

  • step2、Load/create the initial data

  • step3、Specify transformations on this data

  • step4、Specify where to put the results of your computations

  • step5、Trigger the program execution

image-20210202173715606

3.1词频统计(java语言)

使用java语言编写从TCP Socket读取数据,进行词频统计WordCount,结果打印至控制台。

 package cn.itcast.flink.start;
 ​
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
 ​
 /**
  * 基于 Flink 流计算引擎:从TCP Socket消费数据,实时词频统计WordCount
  */
 public class StreamWordCount {
     public static void main(String[] args) throws Exception {
         // 1. 执行环境-env:流计算执行环
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         // 2. 数据源-source:Socket接收数据
         DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
         // 3. 转换处理-transformation:调用DataSet函数,处理数据
         SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
                 //a.过滤数据
                 .filter(new FilterFunction<String>() {
                     @Override
                     public boolean filter(String line) throws Exception {
                         return null != line && line.trim().length() > 0;
                     }
                 })
                 //b.分割单词
                 .flatMap(new FlatMapFunction<String, String>() {
                     @Override
                     public void flatMap(String line, Collector<String> out) throws Exception {
                         String[] words = line.trim().toLowerCase().split("\\W+");
                         for (String word : words) {
                             out.collect(word);
                         }
                     }
                 })
                 //c.转换二元组,表示每个单词出现一次
                 .map(new MapFunction<String, Tuple2<String, Integer>>() {
                     @Override
                     public Tuple2<String, Integer> map(String word) throws Exception {
                         return Tuple2.of(word, 1);
                     }
                 })
                 //d.按照单词分组及对组内聚合操作
                 .keyBy(0).sum(1);
 ​
                 //d.数据终端-sink;结果数据打印在控制台
                 resultDataStream.print();
                 env.execute(StreamWordCount.class.getSimpleName());
     }
 }

测试端口:nc -l node1.itcast.cn 9999

3.2 词频统计(Scala 语言)

使用Scala语言编写从TCP Socket读取数据,进行词频统计WordCount,结果打印至控制台。

 import org.apache.flink.streaming.api.scala._
 /**
  * 使用Scala语言编程实现Flink实时词频统计WordCount,从TCP Socket读取数据,分析结果打印控制台。
  */
 object FlinkWordCount {
   def main(args: Array[String]): Unit = {
     //1.执行环境-env
     //val env :
     //val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
     env.setParallelism(2)
 ​
     //2-数据源-source
     val inputDataStream: DataStream[String] = env.socketTextStream("node1.itcast.cn", 9999)
 ​
     //3数据转换-transformation
     val resultDataStream: DataStream[(String, Int)] = inputDataStream
       // a. 过滤数据,如空字符串
       .filter(line => null != line && line.trim.length > 0)
       // b. 将每行数据分割为字符
       .flatMap(line => line.trim.toLowerCase().split("\\W+"))
       // c. 转换为二元组,表示每个单词出现一次
       .map(word => (word, 1))
       // d. 按照单词分组和组内聚合计数
       .keyBy(0).sum(1)
     // 4. 数据终端-sink
     resultDataStream.printToErr()
     // 5. 触发执行-execute
     env.execute(FlinkWordCount.getClass.getSimpleName.stripSuffix("$"))
   }
 }

4. Source 数据源

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#data-sources

image-20210206100955675

4.1 基于Socket的Source

一般用于学习测试

  • 需求:

    1.在node1上使用 nc -lk 9999 向指定端口发送数据 nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 如果没有该命令可以下安装:yum install -y nc 2.使用Flink编写流处理应用程序实时统计单词数量

  • 代码实现:3.1可见

4.2 基于集合的Source

一般用于学习测试,和批处理的API类似,不再演示。

 package cn.itcast.flink.source.basic;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import java.util.Arrays;
 /**
 * Flink 流计算数据源:基于集合的Source,分别为可变参数、集合和自动生成数据
 */
 public class StreamSourceCollectionDemo {
     public static void main(String[] args) throws Exception {
     // 1. 执行环境-env
     StreamExecutionEnvironment env =                StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(2) ;
     // 2. 数据源
     DataStreamSource<String> dataStream01 = env.fromElements("spark", "flink", "mapreduce");
     dataStream01.printToErr();
     DataStreamSource<String> dataStream02 =             env.fromCollection(Arrays.asList("spark", "flink", "mapreduce"));
     dataStream02.print();
     DataStreamSource<Long> dataStream03 = env.generateSequence(1, 10);
     dataStream03.printToErr();
     // 5. 触发执行-execute
     env.execute(StreamSourceCollectionDemo.class.getSimpleName());
 }
 }

4.3 基于文件的Source

一般用于学习测试,和批处理的API类似,不再演示

 package cn.itcast.flink.source.basic;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 /**
 * Flink 流计算数据源:基于文件的Source
 */
 public class StreamSourceFileDemo {
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(2) ;
 // 2. 数据源
 DataStreamSource<String> dataStream = env.readTextFile("datas/wordcount.data");
 dataStream.printToErr();
 // 5. 触发执行-execute
 env.execute(StreamSourceFileDemo.class.getSimpleName());
 }
 }
 ​

4.4 自定义Source:随机生成数据

  • API

    一般用于学习测试,模拟生成一些数据

    Flink提供数据源接口,可以实现自定义数据源,不同的接口有不同的功能,分类如下:

    SourceFunction:非并行数据源(并行度parallelism=1) RichSourceFunction:多功能非并行数据源(并行度parallelism=1) ParallelSourceFunction:并行数据源(并行度parallelism>=1) RichParallelSourceFunction:多功能并行数据源(parallelism>=1),Kafka数据源使用该接口

  • 需求

    每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)

  • 要求:

    • 随机生成订单ID:UUID ·

    • 随机生成用户ID:0-2

    • 随机生成订单金额:0-100

    • 时间戳为当前系统时间:current_timestamp

  • 代码实现

 package cn.itcast.flink.source.customer;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 ​
 ​

4.5 自定义Source:MySQL

  • 需求

    实际开发中没经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据

    需求:从MySQL中实时加载数据,要求MySQL中的数据有变化,也能被实时加载出来

  • 准备数据

     CREATE DATABASE IF NOT EXISTS db_flink ;
     USE db_flink ;
     CREATE TABLE `t_student` (
     `id` int(11) NOT NULL AUTO_INCREMENT,
     `name` varchar(255) DEFAULT NULL,
     `age` int(11) DEFAULT NULL,
     PRIMARY KEY (`id`)
     ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
     INSERT INTO `t_student` VALUES ('1', 'jack', 18);
     INSERT INTO `t_student` VALUES ('2', 'tom', 19);
     INSERT INTO `t_student` VALUES ('3', 'rose', 20);
     INSERT INTO `t_student` VALUES ('4', 'tom', 19);
     INSERT INTO `t_student` VALUES ('5', 'jack', 18);
     INSERT INTO `t_student` VALUES ('6', 'rose', 20);
     INSERT INTO `t_student` VALUES ('9', 'zhangsan2', 19);
     INSERT INTO `t_student` VALUES ('10', 'lisi2', 21);
  • 代码实现

 package cn.itcast.flink.source;
 ​
 ​
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 ​
 ​
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.concurrent.TimeUnit;
 ​
 /**
  * 从MySQL中实时加载数据:要求MySQL中的数据有变化,也能被实时加载出来
  */
 public class StreamSourceMySQLDemo {
     @Data
     @NoArgsConstructor
     @AllArgsConstructor
     public static class Student {
         private Integer id;
         private String name;
         private Integer age;
     }
 /**
  * 自定义数据源,实时从MySQL表获取数据,实现接口RichParallelSourceFunction
  */
 public static class MySQLSource extends RichParallelSourceFunction<Student>{
     // 标识符,是否实时接收数据
     private boolean isRunning = true ;
     private Connection conn = null;
     private PreparedStatement pstmt = null;
     private ResultSet result = null ;
     private Integer whereId = 0 ;
 ​
     @Override
     public void open(Configuration parameters) throws Exception {
         //1.加载驱动
         Class.forName("com.mysql.jdbc.Driver");
         //2创建连接
         conn= DriverManager.getConnection(
                 "jdbc:mysql://node1.itcast.cn:3306/?useUnicode=true&characterEncoding=utf-8&useSSL=false",
                 "root",
                 "123456"
         );
         //3.创建PreparedStatement
         pstmt = conn.prepareStatement("select id,name,age from db_flink.t_student WHERE id > ?");
 ​
 ​
     }
 ​
     @Override
     public void run(SourceContext<Student> ctx) throws Exception {
         while(isRunning){
             //1、执行查询
             pstmt.setInt(1,whereId);
             result = pstmt.executeQuery();
             //2. 遍历查询结果,收集数据
             while(result.next()){
                 Integer id = result.getInt("id");
                 String name = result.getString("name") ;
                 Integer age = result.getInt("age") ;
                 // 输出
                 ctx.collect(new Student(id, name, age));
                 whereId = id ;
             }
             // 每隔3秒查询一次
             TimeUnit.SECONDS.sleep(3);
         }
     }
 ​
     @Override
     public void close() throws Exception {
         if (null!=result) result.close();
         if (null!=pstmt) pstmt.close();
         if (null!=conn) conn.close();
     }
 ​
     @Override
     public void cancel() {
         isRunning = false ;
     }
 }
 ​
     public static void main(String[] args) throws Exception {
         // 1. 执行环境-env
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         // 2. 数据源-source
         DataStreamSource<Student> studentDataStream = env.addSource(new MySQLSource());
         // 3. 数据终端-sink
         studentDataStream.printToErr();
         // 5. 应用执行-execute
         env.execute(StreamSourceMySQLDemo.class.getSimpleName());
     }
 }

4.6 Kafka Source

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/index.html

image-20210206114123277

4.6.1 API及其版本

Flink 里已经提供了一些绑定的 Connector,例如 Kafka Source 和 Sink,Elasticsearch Sink等。读写 Kafka、ES、RabbitMQ 时可以直接使用相应 connector 的 API 即可,虽然该部分是Flink 项目源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

image-20210206114316363

4.6.2 参数设置

image-20210206114343353

以下参数都必须/建议设置

1.订阅的主题:topic

2.反序列化规则:deserialization 3.消费者属性-集群地址:bootstrap.servers 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理):groupId 5.消费者属性-offset重置规则,如earliest/latest...:offset

6.动态分区检测:dynamic partition detection

image-20210206114445193

4.6.3 Kafka命令

启动Kafka和Zookeeper命令,针对讲师提供虚拟机: zookeeper-daemon.sh start kafka-daemon.sh start ● 查看当前服务器中的所有topic /export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092 ● 创建topic /export/server/kafka/bin/kafka-topics.sh --create --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 --replication-factor 1 --partitions 3 ● 查看某个Topic的详情 /export/server/kafka/bin/kafka-topics.sh --describe --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 ● 删除topic /export/server/kafka/bin/kafka-topics.sh --delete --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 ● 发送消息 /export/server/kafka/bin/kafka-console-producer.sh --topic flink-topic \ --broker-list node1.itcast.cn:9092 ● 消费消息 /export/server/kafka/bin/kafka-console-consumer.sh --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 --from-beginning ● 修改分区 /export/server/kafka/bin/kafka-topics.sh --alter --topic flink-topic \ --bootstrap-server node1.itcast.cn:9092 --partitions 4

4.6.4 代码实现

Flink 实时从Kafka消费数据,底层调用Kafka New Consumer API,演示案例代码如下:

4.6.5 Kafka 消费起始位置

kafka可以被看成一个无限的流,里面的流事数据是短暂的存在的,如果不消费,消息就过期滚动没了。涉及这个问题:如果开始消费,就要定一下从什么位置开始。

image-20210206181112039

  • 第一、earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费;

  • 第二、latest:从最末位置开始消费

  • 第三、per-partition assignment:对每个分区都指定一个offset,再从offset位置开始消费。

    image-20210206181410150

默认情况下,从kafka消费数据时,采用的是:latest,最新偏移量开始消费数据。

在Flink Kafka Consumer 库中,允许用户配置从每个分区的哪个位置position开始消费数据,具体说明如下所示:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

image-20210206181702234

演示代码:

 package cn.itcast.flink.source;
 ​
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.kafka.clients.CommonClientConfigs;
 ​
 import java.util.HashMap;
 import java.util.Properties;
 ​
 public class StreamSourceKafkaOffsetDemo {
     public static void main(String[] args) throws Exception {
         //1.执行环境-env
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(3);
         // 2. 数据源-source:从Kafka 消费数据
         // a. Kafka Consumer消费者配置属性设置
         Properties props = new Properties() ;
         props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092");
         props.setProperty("group.id","test-1001");
         //b.创建FlinkKafkaConsumer对象
         FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
                 "flink-topic", // Topic 名称
                 new SimpleStringSchema(), //
                 props //
         ) ;
         // TODO: 1、Flink从topic中最初的数据开始消费
         //kafkaConsumer.setStartFromEarliest() ;
         // TODO: 2、Flink从topic中最新的数据开始消费
         //kafkaConsumer.setStartFromLatest();
         // TODO: 3、Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
         //kafkaConsumer.setStartFromGroupOffsets() ;
         // TODO: 4、Flink从topic中指定的offset开始,这个比较复杂,需要手动指定offset
         HashMap<KafkaTopicPartition, Long> offsets = new HashMap<>();
         offsets.put(new KafkaTopicPartition("flink-topic", 0), 28L);
         offsets.put(new KafkaTopicPartition("flink-topic", 1), 94L);
         offsets.put(new KafkaTopicPartition("flink-topic", 2), 108L);
         //kafkaConsumer.setStartFromSpecificOffsets(offsets);
         // TODO: 5、Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
         kafkaConsumer.setStartFromTimestamp(1603099781484L);
         // c. 添加数据源
         DataStreamSource<String> kafkaDataStream = env.addSource(kafkaConsumer);
         // 3. 数据终端-sink
         kafkaDataStream.printToErr();
         // 4. 触发执行-execute
         env.execute(StreamSourceKafkaOffsetDemo.class.getSimpleName()) ;
     }
 }

image-20210206182910275

注意:开启 checkpoint 时 offset 是 Flink 通过状态 state 管理和恢复的,并不是从 kafka 的 offset 位置恢复。在 checkpoint 机制下,作业从最近一次checkpoint 恢复,本身是会回放部分 历史数据,导致部分数据重复消费,Flink 引擎仅保证计算状态的精准一次,要想做到端到端精准 一次需要依赖一些幂等的存储系统或者事务操作。

4.6.6 Kafka 分区发现

实际的生产部环境中可能有这样一些需求,比如:

  • 场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。

image-20210206183112320

  • 场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 6 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 6 个扩容到 12。该情况下如何在不重启作业情况下动态感知新扩容的 partition?

image-20210206183145710

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

image-20210206183214399

针对上面的两种场景,首先在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,及设置的时间间隔。此时FlinkKafkaConsumer内部会启动一个单独的线程定期去Kafka获取最新的meta信息。

  • 针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。

image-20210206183338254

  • 针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。

5. Transformation 数据转换

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html

image-20210206183433673

5.1 说明

流处理的很多API和批处理类似,也包括一系列的Transformation操作,如map、flatMap、filter、sum、reduce……等等,所以这些类似的就不再一一讲解,主要讲解和批处理不一样的一些操作。

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html

image-20210206183519740

整体来说,流式数据上的操作可以分为四类:

  • 第一类是对于单条记录的操作,比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)

  • 第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理

  • 第三类是对多个流进行操作并转换为单个流。例如,多个流可以通过 Union、Join 或 Connect 等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。

  • 第四类是DataStream支持与合并对称的拆分操作,即把一个流按一定规则拆分为多个流(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

5.2 英文版API解释

DataStream中常用API 函数,相关说明如下所示:

image-20210206183758167

image-20210206183806296

image-20210206183822946

image-20210206183832609

image-20210206183841014

5.3 中文版API解释

image-20210206183855772

image-20210206183905180

image-20210206183915470

image-20210206183924410

5.4 keyBy

按照指定的key来对流中的数据进行分组,前面入门案例中已经演示过

image-20210206183953136

注意:流处理中没有groupBy,而是keyBy

5.5 union和connect

  • API

    union:可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。

image-20210206184224529

connect:提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  • ’connect只能连接两个数据流,union可以连接多个数据流;connect所连接的两个数据流的数据类型可以不一致,union所连

  • connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

image-20210206184319475

  • 需求

    将两个String类型的流进行union

    将一个String类型和一个Long类型的流进行connect

  • 代码实现

 package cn.itcast.flink.transformation;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 /**
 * Flink 流计算中转换函数:合并union和连接connect
 */
 public class StreamUnionConnectDemo {
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(2);
 // 2. 数据源-source
 DataStreamSource<String> dataStream01 = env.fromElements("A", "B", "C", "D");
 DataStreamSource<String> dataStream02 = env.fromElements("aa", "bb", "cc", "dd");
 DataStreamSource<Integer> dataStream03 = env.fromElements(1, 2, 3, 4);
 // 3. 数据转换-transformation
 // TODO: 相同类型DataStream进行union合并操作
 DataStream<String> unionDataStream = dataStream01.union(dataStream02);
 unionDataStream.printToErr();
 // TODO: 将2个DataStream进行连接connect操作
 ConnectedStreams<String, Integer> connectDataStream = dataStream01.connect(dataStream03);
 SingleOutputStreamOperator<String> dataStream = connectDataStream.map(
 new CoMapFunction<String, Integer, String>() {
 @Override
 public String map1(String value) throws Exception {
 return "map1 -> " + value;
 }
 @Override
 public String map2(Integer value) throws Exception {
 return "map2 -> " + value;
 }
 }
 );
 dataStream.print();
 // 5. 应用执行
 env.execute(StreamUnionConnectDemo.class.getSimpleName());
 }
 }

5.6 split和select

image-20210206184546580

  • API

Split就是将一个流分成多个流,Select就是获取分流后对应的数据

image-20210206184626204

  • 需求:

对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

  • 代码实现:

     package cn.itcast.flink.transformation;
     import org.apache.flink.streaming.api.collector.selector.OutputSelector;
     import org.apache.flink.streaming.api.datastream.DataStream;
     import org.apache.flink.streaming.api.datastream.DataStreamSource;
     import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
     import org.apache.flink.streaming.api.datastream.SplitStream;
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     import org.apache.flink.streaming.api.functions.ProcessFunction;
     import org.apache.flink.util.Collector;
     import org.apache.flink.util.OutputTag;
     import java.util.Arrays;
     /**
     * Flink 流计算中转换函数:split流的拆分和select流的选择
     */
     public class StreamSplitSelectDemo {
     public static void main(String[] args) throws Exception {
     // 1. 执行环境-env
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(1);
     // 2. 数据源-source
     DataStreamSource<Long> dataStream = env.generateSequence(1, 20);
     // 3. 数据转换-transformation
     // TODO: 流的拆分,按照奇数和偶数拆分,使用split函数
     SplitStream<Long> splitDataStream = dataStream.split(new OutputSelector<Long>() {
     @Override
     public Iterable<String> select(Long value) {
     String flag = value % 2 == 0 ? "even" : "odd" ;
     return Arrays.asList(flag);
     }
     });
     // 获取拆分后的流
     DataStream<Long> evenDataStream = splitDataStream.select("even");
     //evenDataStream.print();
     DataStream<Long> oddDataStream = splitDataStream.select("odd");
     //oddDataStream.printToErr();
     // TODO: 流的拆分,方式二,调用process函数
     OutputTag<Long> evenTag = new OutputTag<Long>("even"){};
     OutputTag<Long> oddTag = new OutputTag<Long>("odd"){};
     SingleOutputStreamOperator<Long> processDataStream = dataStream.process(
     new ProcessFunction<Long, Long>() {
     @Override
     public void processElement(Long value,
     Context ctx,
     Collector<Long> out) throws Exception {
     if(value % 2 == 0){
     ctx.output(evenTag, value);
     }else{
     ctx.output(oddTag, value);
     }
     }
     }
     );
     // 获取分离的流
     DataStream<Long> evenStream = processDataStream.getSideOutput(evenTag);
     evenStream.print();
     DataStream<Long> oddStream = processDataStream.getSideOutput(oddTag);
     oddStream.printToErr();
     // 5. 应用执行-execute
     env.execute(StreamSplitSelectDemo.class.getSimpleName());
     }
     }

     

     

    5.7 分区

  • API

    image-20210206185042208

recale分区,基于上下游Operator并行度,将记录以循环的方式输出到下游Operator每个实例。

  • 举例

上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度 上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。

image-20210206185132258

若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上 游另两个并行度将记录输出到下游另一个并行度上。

image-20210206185148382

  • 需求

对流中的元素使用各种分区,并输出

  • 代码实现

 package cn.itcast.flink.transformation;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 /**
 * Flink 流计算中转换函数:对流数据进行分区,函数如下:
 * global、broadcast、forward、shuffle、rebalance、rescale、partitionCustom
 */
 public class StreamRepartitionDemo {
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(2);
 // 2. 数据源-source
 DataStreamSource<Tuple2<Integer, String>> dataStream = env.fromElements(
 Tuple2.of(1, "a"), Tuple2.of(2, "b"), Tuple2.of(3, "c"), Tuple2.of(4, "d")
 );
 //dataStream.printToErr();
 // 3. 数据转换-transformation
 // TODO: 1、global函数,将所有数据发往1个分区Partition
 DataStream<Tuple2<Integer, String>> globalDataStream = dataStream.global();
 // globalDataStream.print();
 // TODO: 2、broadcast函数, 广播数据
 DataStream<Tuple2<Integer, String>> broadcastDataStream = dataStream.broadcast();
 //broadcastDataStream.printToErr();
 // TODO: 3、forward函数,上下游并发一样时 一对一发送
 DataStream<Tuple2<Integer, String>> forwardDataStream = dataStream.forward();
 //forwardDataStream.print().setParallelism(1) ;
 // TODO: 4、shuffle函数,随机均匀分配
 DataStream<Tuple2<Integer, String>> shuffleDataStream = dataStream.shuffle();
 //shuffleDataStream.printToErr();
 // TODO: 5、rebalance函数,轮流分配
 DataStream<Tuple2<Integer, String>> rebalanceDataStream = dataStream.rebalance();
 //rebalanceDataStream.print() ;
 // TODO: 6、rescale函数,本地轮流分配
 DataStream<Tuple2<Integer, String>> rescaleDataStream = dataStream.rescale();
 //rescaleDataStream.printToErr();
 // TODO: 7、partitionCustom函数,自定义分区规则
 DataStream<Tuple2<Integer, String>> customDataStream = dataStream.partitionCustom(
 new Partitioner<Integer>() {
 @Override
 public int partition(Integer key, int numPartitions) {
     return key % 2;
 }
 },
 0
 );
 //customDataStream.print();
 // 4. 数据终端-sink
 env.execute(StreamRepartitionDemo.class.getSimpleName());
 }
 }

6. Sink 数据终端

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks

6.1 基于控制台和文件的Sink

直接参考批处理的API即可,学习测试会使用,开发中更多的是数据实时处理统计分析完之后存入MySQL/Kafka/Redis/HBase......

image-20210206185433937

案例演示:将词频统计结果数据存储至文本文件中,代码如下所示:

 package cn.itcast.flink.sink;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
 public class StreamSinkFileDemo {
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env:流计算执行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1) ;
 // 2. 数据源-source:Socket接收数据
 DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
 // 3. 转换处理-transformation:调用DataSet函数,处理数据
 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
 // a. 过滤数据
 .filter(new FilterFunction<String>() {
 @Override
 public boolean filter(String line) throws Exception {
 return null != line && line.trim().length() > 0;
 }
 })
 // b. 分割单词
 .flatMap(new FlatMapFunction<String, String>() {
 @Override
 public void flatMap(String line, Collector<String> out) throws Exception {
 String[] words = line.trim().split("\\W+");
 for (String word : words) {
 out.collect(word);
 }
 }
 })
 // c. 转换二元组,表示每个单词出现一次
 .map(new MapFunction<String, Tuple2<String, Integer>>() {
 @Override
 public Tuple2<String, Integer> map(String word) throws Exception {
 return Tuple2.of(word, 1);
 }
 })
 // d. 按照单词分组及对组内聚合操作
 .keyBy(0).sum(1);
 // d. 数据终端-sink:数据终端-sink:保存至文件
 resultDataStream
 .setParallelism(1)
 .writeAsText("datas/stream-output.txt", FileSystem.WriteMode.OVERWRITE);
 // e. 执行应用-execute
 env.execute(StreamSinkFileDemo.class.getSimpleName()) ;
 }
 }

 

6.2 自定义Sink:MySQL

  • 需求:

    将Flink集合中的数据通过自定义Sink保存到MySQL

  • 代码实现:

 package cn.itcast.flink.sink.mysql;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 /**
 * 案例演示:自定义Sink,将数据保存至MySQL表中,继承RichSinkFunction
 */
 public class StreamSinkMySQLDemo {
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 private static class Student{
 private Integer id ;
 private String name ;
 private Integer age ;
 }
 /**
 * 自定义Sink,将DataStream数据写入到外部存储MySQL数据库表中
 */
 private static class MySQLSink extends RichSinkFunction<Student> {
 private Connection conn = null;
 private PreparedStatement pstmt = null;
 // 计数
 private Integer counter = 0 ;
 @Override
 public void open(Configuration parameters) throws Exception {
 // 1. 加载驱动
 Class.forName("com.mysql.jdbc.Driver");
 // 2. 创建连接
 conn = DriverManager.getConnection(
 "jdbc:mysql://node1.itcast.cn:3306/?useUnicode=true&characterEncoding=utf-8&useSSL=false",
 "root", "123456"
 );
 // 3. 创建PreparedStatement
 pstmt = conn.prepareStatement("insert into db_flink.t_student (id, name, age) values (?, ?, ?)");
 }
     @Override
 public void invoke(Student student, Context context) throws Exception {
 try{
 // 设置参数的值
 pstmt.setInt(1, student.id);
 pstmt.setString(2, student.name);
 pstmt.setInt(3, student.age);
 // 加入批次
 pstmt.addBatch();
 counter ++ ;
 if(counter >= 10){
 pstmt.executeBatch(); // 批量插入
 counter = 0 ;
 }
 }catch (Exception e){
 e.printStackTrace();
 }
 }
 @Override
 public void close() throws Exception {
 try{
 if(counter > 0){
 // 批量插入
 pstmt.executeBatch();
 }
 }catch (Exception e){
 e.printStackTrace();
 }finally {
 if(null != pstmt) pstmt.close();
 if(null != conn) conn.close();
 }
 }
 }
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
 env.setParallelism(1);
 // 2. 数据源-source
 DataStreamSource<Student> inputDataStream = env.fromElements(
 new Student(13, "wangwu", 20),
 new Student(14, "zhaoliu", 19),
 new Student(15, "wangwu", 20),
 new Student(16, "zhaoliu", 19)
 );
 // 3. 数据终端-sink
 inputDataStream.addSink(new MySQLSink());
 // 4. 应用执行-execute
 env.execute(StreamSinkMySQLDemo.class.getSimpleName());
 }
 }

此外,从Flink 1.11开始,提供JDBC Connector,更加方便保存数据至RDBMs表中,演示保存数据MySQL数据库表中,代码如下所示:

 package cn.itcast.flink.sink.mysql;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.JdbcSink;
 import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 /**
 * Flink 流计算,官方自带Connector,将数据保存写入RDBMs数据库表中,比如MySQL表中
 */
 public class StreamSinkJdbcDemo {
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 private static class Student{
 private Integer id ;
 private String name ;
 private Integer age ;
 }
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 // 2. 数据源-source
 DataStreamSource<Student> studentDataStream = env.fromElements(
 new Student(21, "wangwu3", 20),
 new Student(22, "zhaoliu4", 19),
 new Student(23, "wangwu5", 20),
 new Student(24, "zhaoliu6", 19)
 );
 // 3. 数据终端-sink
 studentDataStream.addSink(
 JdbcSink.sink(
 "insert into db_flink.t_student (id, name, age) values (?, ?, ?)", //
 new JdbcStatementBuilder<Student>(){
 @Override
 public void accept(PreparedStatement pstmt,
 Student student) throws SQLException {
 pstmt.setInt(1, student.id);
 pstmt.setString(2, student.name);
 pstmt.setInt(3, student.age);
 }
 },
     new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
 .withDriverName("com.mysql.jdbc.Driver")
 .withUrl("jdbc:mysql://node1.itcast.cn:3306/")
 .withUsername("root")
 .withPassword("123456")
 .build()
 )
 );
 // 4. 触发执行-execute
 env.execute(StreamSinkJdbcDemo.class.getSimpleName());

6.3 Kafka Sink

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-producer

  • 需求

    将Flink集合中的数据通过自定义Sink保存到Kafka

  • 代码实现

 package cn.itcast.flink.sink.kafka;
 import cn.itcast.flink.source.mysql.StreamSourceMySQLDemo;
 import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import javax.annotation.Nullable;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
 /**
 * 案例演示:将数据保存至Kafka Topic中,直接使用官方提供Connector
 * /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic flink-topic
 */
 public class StreamSinkKafkaDemo {
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 private static class Student{
     private Integer id ;
 private String name ;
 private Integer age ;
 }
 /**
 * 自定义KafkaSerializationSchema实现类
 */
 private static class KafkaSchema implements KafkaSerializationSchema<String> {
 private String topic ;
 public KafkaSchema(String topicName){
 this.topic = topicName ;
 }
 @Override
 public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
 ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
 topic, element.getBytes(StandardCharsets.UTF_8)
 );
 return record;
 }
 }
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
 env.setParallelism(1);
 // 2. 数据源-source
 DataStreamSource<StreamSourceMySQLDemo.Student> studentDataStream = env.addSource(
 new StreamSourceMySQLDemo.MySQLSource()
 );
 // 3. 数据转换-transformation
 SingleOutputStreamOperator<String> jsonDataStream = studentDataStream.map(
 new MapFunction<StreamSourceMySQLDemo.Student, String>() {
 @Override
 public String map(StreamSourceMySQLDemo.Student student) throws Exception {
 return JSON.toJSONString(student);
 }
 }
 );
 // 4. 数据终端-sink
 String topic = "flink-topic" ;
 // a. Kafka 生产者配置属性
 Properties props = new Properties();
 props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092");
 // b. Kafka 数据序列化Schema信息x
 KafkaSerializationSchema<String> kafkaSchema = new KafkaSchema(topic);
 // c. 创建FlinkKafkaProducer对象
 FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
 topic, //
 kafkaSchema, //
 props,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE //
 );
 // d. 添加Sink
 jsonDataStream.addSink(kafkaProducer);
 // 5. 应用执行-execute
 env.execute(StreamSinkKafkaDemo.class.getSimpleName());
 }
 }   

6.4 Redis Sink

  • API

通过Flink 操作Redis 其实可以通过传统的Jedis 连接池JedisPool 进行Redis 的相关操作,但 是Flink 提供了专门操作Redis 的RedisSink,使用起来更方便,而且不用考虑性能的问题,接下来将主要介绍RedisSink 如何使用。 https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现 这个接口中的三个方法,如下所示

1.getCommandDescription() :

设置使用的Redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型

2.String getKeyFromData(T data): 设置value 中的键值对key的值 3.String getValueFromData(T data); 设置value 中的键值对value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系

image-20210206185918196

可以连接到不同Redis环境(单机Redis服务、集群Redis服务及Sentinel Redis服务),配置 Config:

image-20210206185933065

  • 需求

将Flink集合中的数据通过自定义Sink保存到Redis

  • 代码实现

 package cn.itcast.flink.sink.redis;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.redis.RedisSink;
 import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 import org.apache.flink.util.Collector;
 /**
 * 案例演示:将数据保存至Redis中,直接使用官方提供Connector
 * https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
 */
 public class StreamSinkRedisDemo {
 /**
 * 自定义RedisMapper,实现其中三个方法,分别为命令、key和Value
 */
 private static class StreamRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
 @Override
 public RedisCommandDescription getCommandDescription() {
 return new RedisCommandDescription(RedisCommand.HSET, "wordcount");
 }
   @Override
 public String getKeyFromData(Tuple2<String, Integer> data) {
 return data.f0;
 }
 @Override
 public String getValueFromData(Tuple2<String, Integer> data) {
 return Integer.toString(data.f1);
 }
 }
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
 env.setParallelism(1);
 // 2. 数据源-source:Socket接收数据
 DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
 // 3. 转换处理-transformation:调用DataSet函数,处理数据
 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
 // a. 过滤数据
 .filter(new FilterFunction<String>() {
 @Override
 public boolean filter(String line) throws Exception {
 return null != line && line.trim().length() > 0;
 }
 })
 // b. 分割单词
 .flatMap(new FlatMapFunction<String, String>() {
 @Override
 public void flatMap(String line, Collector<String> out) throws Exception {
 String[] words = line.trim().split("\\W+");
 for (String word : words) {
 out.collect(word);
 }
 }
 })
 // c. 转换二元组,表示每个单词出现一次
 .map(new MapFunction<String, Tuple2<String, Integer>>() {
 @Override
 public Tuple2<String, Integer> map(String word) throws Exception {
 return Tuple2.of(word, 1);
 }
 })
 // d. 按照单词分组及对组内聚合操作
 .keyBy(0).sum(1);
 // 4. 数据终端-sink
 // a. Redis 服务配置设置
 FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
 .setHost("node1.itcast.cn")
 .setPort(6379)
 .setDatabase(0)
 .setMinIdle(1) 
     .setMaxIdle(8)
 .setMaxTotal(8)
 .build();
 // b. 创建RedisSink对象
 RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(
 config, new StreamRedisMapper()
 ) ;
 // c. 添加Sink
 resultDataStream.addSink(redisSink);
 // 5. 触发执行
 env.execute(StreamSinkRedisDemo.class.getSimpleName());
 }
 }

 

 

 

附录一、创建Maven模块

1)、Maven 工程结构

1603150617118

Maven Module模块GAV三要素:

 <parent>
         <artifactId>course-flink</artifactId>
         <groupId>cn.itcast.flink</groupId>
         <version>1.0.0</version>
     </parent>
 ​
     <artifactId>flink-day03</artifactId>

 

2)、POM 文件内容

Maven 工程POM文件中内容(依赖包):

     <repositories>
         <repository>
             <id>apache.snapshots</id>
             <name>Apache Development Snapshot Repository</name>
             <url>https://repository.apache.org/content/repositories/snapshots/</url>
             <releases>
                 <enabled>false</enabled>
             </releases>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
         </repository>
     </repositories>
 ​
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <java.version>1.8</java.version>
         <maven.compiler.source>${java.version}</maven.compiler.source>
         <maven.compiler.target>${java.version}</maven.compiler.target>
         <flink.version>1.10.0</flink.version>
         <scala.version>2.11.12</scala.version>
         <scala.binary.version>2.11</scala.binary.version>
         <mysql.version>5.1.48</mysql.version>
     </properties>
 ​
     <dependencies>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>${scala.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-scala_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
 ​
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
 ​
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-kafka_2.11</artifactId>
             <version>${flink.version}</version>
         </dependency>
 ​
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-jdbc_2.11</artifactId>
             <version>1.11.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2-uber</artifactId>
             <version>2.7.5-10.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.bahir</groupId>
             <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
             <version>1.0</version>
         </dependency>
 ​
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
             <version>${mysql.version}</version>
         </dependency>
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.68</version>
         </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
             <version>1.18.12</version>
         </dependency>
 ​
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <version>1.7.7</version>
             <scope>runtime</scope>
         </dependency>
         <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
             <version>1.2.17</version>
             <scope>runtime</scope>
         </dependency>
 ​
     </dependencies>
 ​
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>
         <plugins>
             <!-- 编译插件 -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.5.1</version>
                 <configuration>
                     <source>1.8</source>
                     <target>1.8</target>
                     <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                 </configuration>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>2.18.1</version>
                 <configuration>
                     <useFile>false</useFile>
                     <disableXmlReport>true</disableXmlReport>
                     <includes>
                         <include>**/*Test.*</include>
                         <include>**/*Suite.*</include>
                     </includes>
                 </configuration>
             </plugin>
             <!-- 打jar包插件(会包含所有依赖) -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <version>2.3</version>
                 <executions>
                     <execution>
                         <phase>package</phase>
                         <goals>
                             <goal>shade</goal>
                         </goals>
                         <configuration>
                             <filters>
                                 <filter>
                                     <artifact>*:*</artifact>
                                     <excludes>
                                         <!--
                                         zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                         <exclude>META-INF/*.SF</exclude>
                                         <exclude>META-INF/*.DSA</exclude>
                                         <exclude>META-INF/*.RSA</exclude>
                                     </excludes>
                                 </filter>
                             </filters>
                             <transformers>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                     <!-- 可以设置jar包的入口类(可选) -->
                                     <!--
                                     <mainClass>com.itcast.flink.batch.FlinkBatchWordCount</mainClass>
                                     -->
                                 </transformer>
                             </transformers>
                         </configuration>
                     </execution>
                 </executions>
             </plugin>
         </plugins>
     </build>

 

附录二、Flink on YARN

Flink支持增量迭代,具有对迭代自行优化的功能,因此在on yarn上提交的任务性能略好于 Spark,Flink提供2种方式在YARN上提交任务:启动1个一直运行的 Yarn session(分离模式)和在 Yarn 上运行1个 Flink 任务(客户端模式)。

img

 

  • Session 会话模式:

    • 通过命令yarn-session.sh的启动方式本质上是在yarn集群上启动一个flink集群

    • 由yarn预先给flink集群分配若干个container,在yarn的界面上只能看到一个Flink session with X TaskManagers的任务,并且只有一个Flink界面,可以从Yarn的Application Master链接进入;

  • Job 分离模式:

    • 通过命令bin/flink run -m yarn-cluster启动,每次发布1个任务,本质上给每个Flink任务启动了1个集群,yarn在任务发布时启动JobManager(对应Yarn的AM)和TaskManager;

    • 如果一个任务指定了n个TaksManager(-yn n),则会启动n+1个Container,其中一个是JobManager,发布m个应用,则有m个Flink界面,不同的任务不可能在一个Container(JVM)中,实现了资源隔离。

 

1、Session 会话模式

通过命令yarn-session.sh先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。

img

 

2、Job 分离模式

通过命令bin/flink run -m yarn-cluster提交任务,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行,适合规模大长时间运行的作业;

img

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐