独立( Standalone )模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他
第三方资源框架的耦合性,独立性非常强。但我们知道, Flink 是大数据计算框架,不是资源
调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架
集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所
以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。
        整体来说, YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn ResourceManager,
Yarn ResourceManager 会向 Yarn NodeManager 申请容器。在这些容器上, Flink 会部署
JobManager TaskManager 的实例,从而启动集群。 Flink 会根据运行在 JobManger 上的作业
所需要的 Slot 数量动态分配 TaskManager 资源。

1、相关环境配置

         在 Flink1.8.0 之前的版本,想要以 YARN 模式部署 Flink 任务时,需要 Flink 是有 Hadoop
支持的。从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要 Hadoop 的环境
支持,需要自行在官网下载 Hadoop 相关版本的组件 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
并将该组件上传至 Flink lib 目录下。在 Flink 1.11.0 版本之后,增加了很多重要新特性,其
中就包括增加了对 Hadoop3.0.0 以及更高版本 Hadoop 的支持,不再提供“ flink-shaded-hadoop-*
jar 包,而是通过配置环境变量完成与 YARN 集群的对接。
         在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop ,保证 Hadoop
版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。
具体配置步骤如下:
1 )按照 3.1 节所述,下载并解压安装包,并将解压后的安装包重命名为 flink-1.13.0-yarn
本节的相关操作都将默认在此安装路径下执行。
(2)配置环境变量,增加环境变量配置如下:
$ sudo vim /etc/profile.d/my_env.sh
HADOOP_HOME=/opt/module/hadoop-2.7.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
这里必须保证设置了环境变量 HADOOP_CLASSPATH
(3)启动 Hadoop 集群,包括 HDFS YARN
[atguigu@hadoop102 ~]$ start-dfs.sh
[atguigu@hadoop103 ~]$ start-yarn.sh

分别在 3 台节点服务器查看进程启动情况。

[atguigu@hadoop102 ~]$ jps
5190 Jps
5062 NodeManager
4408 NameNode
4589 DataNode
[atguigu@hadoop103 ~]$ jps
5425 Jps
4680 ResourceManager
5241 NodeManager
4447 DataNode
[atguigu@hadoop104 ~]$ jps
4731 NodeManager
4333 DataNode
4861 Jps
4478 SecondaryNameNode

(4)进入 conf 目录,修改 flink-conf.yaml 文件,修改以下配置,这些配置项的含义在进
Standalone 模式配置的时候进行过讲解,若在提交命令中不特定指明,这些配置将作为默认
配置。
$ cd /opt/module/flink-1.13.0-yarn/conf/
$ vim flink-conf.yaml
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1

2、会话模式部署

YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话( YARN session
来启动 Flink 集群。具体步骤如下:

1. 启动集群

1 )启动 hadoop 集群 (HDFS, YARN)
(2)执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。
$ bin/yarn-session.sh -nm test
可用参数解读:
-d :分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,
43 44
即使关掉当前对话窗口, YARN session 也可以后台运行。
-jm(--jobManagerMemory) :配置 JobManager 所需内存,默认单位 MB
-nm(--name) :配置在 YARN UI 界面上显示的任务名。
-qu(--queue) :指定 YARN 队列名。
-tm(--taskManager) :配置每个 TaskManager 所使用内存。
注意:Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,
YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也
不会把集群资源固定,同样是动态分配的。
YARN Session 启动之后会给出一个 web UI 地址以及一个 YARN application ID ,如下所示,
用户可以通过 web UI 或者命令行两种方式提交作业。

2. 提交作业

1 )通过 Web UI 提交作业
这种方式比较简单,与上文所述 Standalone 部署模式基本相同。
(2)通过命令行提交作业
① 将 Standalone 模式讲解中打包好的任务运行 JAR 包上传至集群
② 执行以下命令将该任务提交到已经开启的 Yarn-Session 中运行。
$ bin/flink run
-c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
客户端可以自行确定 JobManager 的地址,也可以通过-m 或者-jobmanager 参数指定
JobManager 的地址,JobManager 的地址在 YARN Session 的启动页面中可以找到。
③ 任务提交成功后,可在 YARN Web UI 界面查看运行情况。
如图 3-14 所示,从图中可以看到我们创建的 Yarn-Session 实际上是一个 Yarn
Application ,并且有唯一的 Application ID
④也可以通过 Flink Web UI 页面查看提交任务的运行情况,如图 3-15 所示。

3、单作业模式部署

YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一
个单独的作业,从而启动一个 Flink 集群。
1 )执行命令提交作业。
$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT.jar

 早期版本也有另一种写法:

$ bin/flink run
-m yarn-cluster
-c
com.atguigu.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT.jar

注意这里是通过参数 -m yarn-cluster 指定向 YARN 集群提交任务。
(2)在 YARN ResourceManager 界面查看执行情况,如图 3-16 所示

 

点击可以打开 Flink Web UI 页面进行监控,如图 3-17 所示:

(3)可以使用命令行查看或取消作业,命令如下。
$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
<jobId>
这里的 application_XXXX_YY 是当前应用的 ID,<jobId>是作业的 ID。注意如果取消作
业,整个 Flink 集群也会停掉。

4、应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。
1 )执行命令提交作业。
$ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT.jar
(2)在命令行中查看或取消作业。
$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel
-t yarn-application
-Dyarn.application.id=application_XXXX_YY <jobId>
(3)也可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。
$ ./bin/flink run-application
-t yarn-application
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"
hdfs://myhdfs/jars/my-application.jar
这种方式下 jar 可以预先上传到 HDFS ,而不需要单独发送到集群,这就使得作业提交更
加轻量了。

5、高可用

       YARN 模式的高可用和独立模式( Standalone )的高可用原理不一样。 Standalone 模式中 , 同时启动多个 JobManager, 一个为“领导者”( leader ),其他为“后备” standby) , leader 挂了 , 其他的才会有一个成为 leader 。 而 YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后 , YARN 会再次 启动一个, 所以其实是利用的 YARN 的重试次数来实现的高可用。
1 )在 yarn-site.xml 中配置。
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
注意: 配置完不要忘记分发, 和重启 YARN。
(2)在 flink-conf.yaml 中配置。
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha
high-availability.zookeeper.quorum:
hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-yarn
(3)启动 yarn-session
(4)杀死 JobManager, 查看复活情况。
注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该
小于这个值。
Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐