Flink在Yarn模式部署和命令
flink的yarn提交模式
·
独立(
Standalone
)模式由
Flink
自身提供资源,无需其他框架,这种方式降低了和其他
第三方资源框架的耦合性,独立性非常强。但我们知道,
Flink
是大数据计算框架,不是资源
调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架
集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是
YARN
了。所
以接下来我们就将学习,在强大的
YARN
平台上
Flink
是如何集成部署的。
整体来说,
YARN
上部署的过程是:客户端把
Flink
应用提交给
Yarn
的
ResourceManager,
Yarn
的
ResourceManager
会向
Yarn
的
NodeManager
申请容器。在这些容器上,
Flink
会部署
JobManager
和
TaskManager
的实例,从而启动集群。
Flink
会根据运行在
JobManger
上的作业
所需要的
Slot
数量动态分配
TaskManager
资源。
1、相关环境配置
在
Flink1.8.0
之前的版本,想要以
YARN
模式部署
Flink
任务时,需要
Flink
是有
Hadoop
支持的。从
Flink 1.8
版本开始,不再提供基于
Hadoop
编译的安装包,若需要
Hadoop
的环境
支持,需要自行在官网下载
Hadoop
相关版本的组件
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
,
并将该组件上传至
Flink
的
lib
目录下。在
Flink 1.11.0
版本之后,增加了很多重要新特性,其
中就包括增加了对
Hadoop3.0.0
以及更高版本
Hadoop
的支持,不再提供“
flink-shaded-hadoop-*
”
jar
包,而是通过配置环境变量完成与
YARN
集群的对接。
在将
Flink
任务部署至
YARN
集群之前,需要确认集群是否安装有
Hadoop
,保证
Hadoop
版本至少在
2.2
以上,并且集群中安装有
HDFS
服务。
具体配置步骤如下:
(
1
)按照
3.1
节所述,下载并解压安装包,并将解压后的安装包重命名为
flink-1.13.0-yarn
,
本节的相关操作都将默认在此安装路径下执行。
(2)配置环境变量,增加环境变量配置如下:
$ sudo vim /etc/profile.d/my_env.shHADOOP_HOME=/opt/module/hadoop-2.7.5export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbinexport HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoopexport HADOOP_CLASSPATH=`hadoop classpath`
这里必须保证设置了环境变量
HADOOP_CLASSPATH
。
(3)启动
Hadoop
集群,包括
HDFS
和
YARN
。
[atguigu@hadoop102 ~]$ start-dfs.sh[atguigu@hadoop103 ~]$ start-yarn.sh
分别在 3 台节点服务器查看进程启动情况。
[atguigu@hadoop102 ~]$ jps5190 Jps5062 NodeManager4408 NameNode4589 DataNode[atguigu@hadoop103 ~]$ jps5425 Jps4680 ResourceManager5241 NodeManager4447 DataNode[atguigu@hadoop104 ~]$ jps4731 NodeManager4333 DataNode4861 Jps4478 SecondaryNameNode
(4)进入
conf
目录,修改
flink-conf.yaml
文件,修改以下配置,这些配置项的含义在进
行
Standalone
模式配置的时候进行过讲解,若在提交命令中不特定指明,这些配置将作为默认
配置。
$ cd /opt/module/flink-1.13.0-yarn/conf/$ vim flink-conf.yamljobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mtaskmanager.numberOfTaskSlots: 8parallelism.default: 1
2、会话模式部署
YARN
的会话模式与独立集群略有不同,需要首先申请一个
YARN
会话(
YARN session
)
来启动
Flink
集群。具体步骤如下:
1. 启动集群
(
1
)启动
hadoop
集群
(HDFS, YARN)
。
(2)执行脚本命令向 YARN 集群申请资源,开启一个
YARN
会话,启动
Flink
集群。
$ bin/yarn-session.sh -nm test
可用参数解读:
⚫
-d
:分离模式,如果你不想让
Flink YARN
客户端一直前台运行,可以使用这个参数,
43
44
即使关掉当前对话窗口,
YARN session
也可以后台运行。
⚫
-jm(--jobManagerMemory)
:配置
JobManager
所需内存,默认单位
MB
。
⚫
-nm(--name)
:配置在
YARN UI
界面上显示的任务名。
⚫
-qu(--queue)
:指定
YARN
队列名。
⚫
-tm(--taskManager)
:配置每个
TaskManager
所使用内存。
注意:Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,
YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也
不会把集群资源固定,同样是动态分配的。
YARN Session
启动之后会给出一个
web UI
地址以及一个
YARN application ID
,如下所示,
用户可以通过
web UI
或者命令行两种方式提交作业。
2. 提交作业
(
1
)通过
Web UI
提交作业
这种方式比较简单,与上文所述
Standalone
部署模式基本相同。
(2)通过命令行提交作业
① 将
Standalone
模式讲解中打包好的任务运行
JAR
包上传至集群
② 执行以下命令将该任务提交到已经开启的
Yarn-Session
中运行。
$ bin/flink run-c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
客户端可以自行确定 JobManager 的地址,也可以通过-m 或者-jobmanager 参数指定
JobManager 的地址,JobManager 的地址在 YARN Session 的启动页面中可以找到。
③ 任务提交成功后,可在
YARN
的
Web UI
界面查看运行情况。
如图
3-14
所示,从图中可以看到我们创建的
Yarn-Session
实际上是一个
Yarn
的
Application
,并且有唯一的
Application ID
。
④也可以通过
Flink
的
Web UI
页面查看提交任务的运行情况,如图
3-15
所示。
3、单作业模式部署
在
YARN
环境中,由于有了外部平台做资源调度,所以我们也可以直接向
YARN
提交一
个单独的作业,从而启动一个
Flink
集群。
(
1
)执行命令提交作业。
$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCountFlinkTutorial-1.0-SNAPSHOT.jar
早期版本也有另一种写法:
$ bin/flink run-m yarn-cluster-ccom.atguigu.wc.StreamWordCountFlinkTutorial-1.0-SNAPSHOT.jar
注意这里是通过参数
-m yarn-cluster
指定向
YARN
集群提交任务。
(2)在
YARN
的
ResourceManager
界面查看执行情况,如图
3-16
所示
点击可以打开 Flink Web UI 页面进行监控,如图 3-17 所示:
(3)可以使用命令行查看或取消作业,命令如下。
$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY$ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY<jobId>
这里的 application_XXXX_YY 是当前应用的 ID,<jobId>是作业的 ID。注意如果取消作
业,整个 Flink 集群也会停掉。
4、应用模式部署
应用模式同样非常简单,与单作业模式类似,直接执行
flink run-application
命令即可。
(
1
)执行命令提交作业。
$ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCountFlinkTutorial-1.0-SNAPSHOT.jar
(2)在命令行中查看或取消作业。
$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel-t yarn-application-Dyarn.application.id=application_XXXX_YY <jobId>
(3)也可以通过
yarn.provided.lib.dirs
配置选项指定位置,将
jar
上传到远程。
$ ./bin/flink run-application-t yarn-application-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"hdfs://myhdfs/jars/my-application.jar
这种方式下
jar
可以预先上传到
HDFS
,而不需要单独发送到集群,这就使得作业提交更
加轻量了。
5、高可用
YARN
模式的高可用和独立模式(
Standalone
)的高可用原理不一样。 Standalone 模式中
,
同时启动多个
JobManager,
一个为“领导者”(
leader
),其他为“后备” standby)
,
当
leader
挂了
,
其他的才会有一个成为
leader
。 而 YARN
的高可用是只启动一个
Jobmanager,
当这个
Jobmanager
挂了之后
, YARN
会再次 启动一个,
所以其实是利用的
YARN
的重试次数来实现的高可用。
(
1
)在
yarn-site.xml
中配置。
<property><name>yarn.resourcemanager.am.max-attempts</name><value>4</value><description>The maximum number of application master execution attempts.</description></property>
注意: 配置完不要忘记分发, 和重启 YARN。
(2)在
flink-conf.yaml
中配置。
yarn.application-attempts: 3high-availability: zookeeperhigh-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/hahigh-availability.zookeeper.quorum:hadoop102:2181,hadoop103:2181,hadoop104:2181high-availability.zookeeper.path.root: /flink-yarn
(3)启动
yarn-session
。
(4)杀死
JobManager,
查看复活情况。
注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该
小于这个值。
更多推荐
已为社区贡献1条内容
所有评论(0)