Setting Up a Flink on YARN Cluster
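Environment Requirements
- CentOS 7.5.1804, JDK 1.8.0_181, ZooKeeper 3.6.2, Hadoop 3.2.2, Flink 1.12.2
- For Hadoop installation details, see "Hadoop 3.2.2 Cluster Setup".
Cluster Deployment
Download the flink-1.12.2 installation package. Download URL: https://archive.apache.org/dist/flink/flink-1.12.2/
Download the scala-2.12.13 installation package. Download URL: https://scala-lang.org/download/2.12.13.html
After the downloads finish, upload the packages to the virtual machines (distribute them to every VM) and extract them.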
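The distribute-and-extract step can be scripted. The following is a minimal sketch, not the author's procedure: it assumes passwordless SSH to each node and uses /opt/env as the install directory (the path that appears in the log output later in this post). The function name distribute_and_extract and the example archive name are illustrative.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy an archive to each host and extract it there.
# Assumes passwordless SSH and that the target directory already exists.
distribute_and_extract() {
  local archive="$1" target_dir="$2"
  shift 2
  local name host
  name=$(basename "$archive")
  for host in "$@"; do
    # Copy the archive into the target directory on the remote host.
    scp "$archive" "${host}:${target_dir}/" || return 1
    # Extract it in place on the remote host.
    ssh "$host" "tar -xzf '${target_dir}/${name}' -C '${target_dir}'" || return 1
  done
}

# Example call (archive name is an assumption, adjust to the file you downloaded):
# distribute_and_extract ./flink-1.12.2-bin-scala_2.12.tgz /opt/env node-1 node-2 node-3 node-4 node-5
```

The same function can be reused for the Scala package by passing a different archive path.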
Modifying the Flink Configuration Files
- Edit ${FLINK_HOME}/conf/flink-conf.yaml. The modified settings are as follows:
jobmanager.rpc.address: node-1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: /flink-cluster
high-availability.storageDir: hdfs://vmcluster/flink/ha/
high-availability.zookeeper.quorum: node-1:2181,node-2:2181,node-3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs://vmcluster/flink-checkpoints
state.savepoints.dir: hdfs://vmcluster/flink-savepoints
jobmanager.execution.failover-strategy: region
jobmanager.archive.fs.dir: hdfs://vmcluster/completed-jobs/
historyserver.archive.fs.dir: hdfs://vmcluster/completed-jobs/
classloader.check-leaked-classloader: false
- Edit ${FLINK_HOME}/conf/masters. After modification it looks like this:
node-1:8081
node-2:8081
- Edit ${FLINK_HOME}/conf/workers. After modification it looks like this:
node-3
node-4
node-5
Note: adjust the hostnames (or IPs), ports, the Hadoop cluster's nameservice (vmcluster in this example), and any other Flink settings to match your own environment.
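When the same settings have to be adapted on several nodes, editing flink-conf.yaml by hand gets tedious. Below is a minimal sketch of a helper that sets one "key: value" line; the function name set_flink_conf is hypothetical, and the sketch assumes GNU sed (as shipped with CentOS) and flat, single-line keys like the ones listed above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: set "key: value" in a Flink config file.
# Replaces the line if the key is already present, otherwise appends it.
# Note: dots in the key are treated as regex wildcards, and values
# containing '|' or '&' would need escaping.
set_flink_conf() {
  local file="$1" key="$2" value="$3"
  if grep -q "^${key}:" "$file"; then
    # '|' is the sed delimiter because values such as hdfs:// paths contain '/'.
    sed -i "s|^${key}:.*|${key}: ${value}|" "$file"
  else
    printf '%s: %s\n' "$key" "$value" >> "$file"
  fi
}

# Example calls:
# set_flink_conf "${FLINK_HOME}/conf/flink-conf.yaml" jobmanager.rpc.address node-1
# set_flink_conf "${FLINK_HOME}/conf/flink-conf.yaml" high-availability.storageDir hdfs://vmcluster/flink/ha/
```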
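Flink Deployment Modes
Note: make sure the HDFS and YARN clusters are running normally.
Flink can be deployed on YARN in three modes:
Application Mode
Application Mode launches a Flink cluster on YARN, and the main() method of the application jar is executed on the JobManager in YARN. The cluster shuts down as soon as the application finishes. You can also stop the cluster manually with yarn application -kill <ApplicationId> or by cancelling the Flink job. For example: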
[bigdata@node-1 flink-1.12.2]$ flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
2021-06-24 09:42:01,088 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/env/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-06-24 09:42:01,282 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-06-24 09:42:01,384 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-06-24 09:42:01,385 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-06-24 09:42:01,385 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=3}
2021-06-24 09:42:11,385 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1624498399517_0002
2021-06-24 09:42:11,616 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1624498399517_0002
2021-06-24 09:42:11,616 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2021-06-24 09:42:11,618 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2021-06-24 09:42:17,703 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2021-06-24 09:42:17,703 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node-5:33939 of application 'application_1624498399517_0002'.
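The log above reports that the configured 1600 MB and 1728 MB are both rounded up to 2048 MB. The sketch below only reproduces the arithmetic described in that log message (rounding up to a multiple of yarn.scheduler.minimum-allocation-mb); it is not YARN's scheduler code, and the function name yarn_container_mb is illustrative.

```shell
#!/usr/bin/env bash
# Round a requested container size (MB) up to the next multiple of
# the minimum allocation (MB), as described in the log message.
yarn_container_mb() {
  local requested="$1" min_alloc="$2"
  echo $(( (requested + min_alloc - 1) / min_alloc * min_alloc ))
}

jm=$(yarn_container_mb 1600 1024)
tm=$(yarn_container_mb 1728 1024)
echo "JobManager:  ${jm} MB allocated, $(( jm - 1600 )) MB extra"
echo "TaskManager: ${tm} MB allocated, $(( tm - 1728 )) MB extra"
```

With a 1024 MB minimum, this yields 2048 MB for both processes, with 448 MB and 320 MB left over, matching the values in the log. Choosing process sizes that are already multiples of the minimum allocation avoids the unused remainder.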
Open the YARN web monitoring page to view the application.
Check the processes on the server:
[bigdata@node-5 ~]$ jps
1281 DataNode
1413 NodeManager
2358 YarnApplicationClusterEntryPoint
2509 YarnTaskExecutorRunner
2589 Jps
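The two ways of stopping an Application Mode cluster mentioned earlier can be wrapped as small shell functions. This is a sketch under the assumption that flink and yarn are on the PATH, as in the transcripts in this post; the function names are illustrative, and the application and job IDs must come from your own cluster.

```shell
#!/usr/bin/env bash
# List the jobs running in an Application Mode cluster on YARN.
list_app_jobs() {
  local app_id="$1"
  flink list -t yarn-application -Dyarn.application.id="$app_id"
}

# Cancel one job; an Application Mode cluster shuts down once its application ends.
cancel_app_job() {
  local app_id="$1" job_id="$2"
  flink cancel -t yarn-application -Dyarn.application.id="$app_id" "$job_id"
}

# Alternative: kill the whole YARN application.
kill_app() {
  yarn application -kill "$1"
}

# Example calls (the job ID is a placeholder):
# list_app_jobs application_1624498399517_0002
# cancel_app_job application_1624498399517_0002 <jobId>
```

Cancelling the job lets Flink clean up after itself, so it is usually preferable to killing the YARN application directly.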
Per-Job Cluster Mode
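Per-Job Cluster Mode launches a Flink cluster on YARN, runs the provided application jar locally, and then submits the JobGraph to the JobManager on YARN. If you pass the --detached argument, the client stops once the submission has been accepted.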
[bigdata@node-1 flink-1.12.2]$ flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-06-24 09:35:05,044 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/env/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-06-24 09:35:05,208 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-06-24 09:35:05,489 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-06-24 09:35:05,489 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-06-24 09:35:05,489 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=3}
2021-06-24 09:35:19,178 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1624498399517_0001
2021-06-24 09:35:19,461 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1624498399517_0001
2021-06-24 09:35:19,461 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2021-06-24 09:35:19,466 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2021-06-24 09:35:33,893 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2021-06-24 09:35:33,894 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1624498399517_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1624498399517_0001
Note that killing Flink might not clean up all job artifacts and temporary files.
2021-06-24 09:35:33,894 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node-4:41384 of application 'application_1624498399517_0001'.
Job has been submitted with JobID 18036bbf79b87724f902d18768e716ef
Check the processes on the servers:
[bigdata@node-4 ~]$ jps
1410 NodeManager
1971 Jps
1278 DataNode
1886 YarnJobClusterEntrypoint
[bigdata@node-5 ~]$ jps
1281 DataNode
1859 YarnTaskExecutorRunner
1413 NodeManager
1947 Jps
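Because a detached client exits right after submission, the YARN application ID printed in the log is the handle needed for later management (for example with yarn application -kill). A minimal sketch for pulling that ID out of saved client output follows; the function name extract_app_id and the file name submit.log are assumptions.

```shell
#!/usr/bin/env bash
# Read client output on stdin and print the first YARN application ID found.
extract_app_id() {
  grep -oE 'application_[0-9]+_[0-9]+' | head -n 1
}

# Example usage (submit.log is a hypothetical file holding the client output):
# flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar 2>&1 | tee submit.log
# app_id=$(extract_app_id < submit.log)
# yarn application -kill "$app_id"
```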
Session Mode
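Session mode has two operating modes:
- attached mode (default): the yarn-session.sh client submits the Flink cluster to YARN but keeps running and tracks the cluster's state. If the cluster fails, the client shows the error. If the client is terminated, it also signals the cluster to shut down.
- detached mode (-d or --detached): the yarn-session.sh client submits the Flink cluster to YARN and then returns. Another invocation of the client, or the YARN tools, is needed to stop the Flink cluster.
For example: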
[bigdata@node-1 flink-1.12.2]$ yarn-session.sh -n 1 -jm 1024 -tm 1024
2021-06-24 09:58:10,653 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, node-1
2021-06-24 09:58:10,655 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-06-24 09:58:10,655 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-06-24 09:58:10,655 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-06-24 09:58:10,655 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 3
2021-06-24 09:58:10,655 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability, zookeeper
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.cluster-id, /flink-cluster
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.storageDir, hdfs://vmcluster/flink/ha/
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.zookeeper.quorum, node-1:2181,node-2:2181,node-3:2181
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, filesystem
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, hdfs://vmcluster/flink-checkpoints
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, hdfs://vmcluster/flink-savepoints
2021-06-24 09:58:10,656 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-06-24 09:58:10,657 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.archive.fs.dir, hdfs://vmcluster/completed-jobs/
2021-06-24 09:58:10,657 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: historyserver.archive.fs.dir, hdfs://vmcluster/completed-jobs/
2021-06-24 09:58:10,670 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-bigdata.
2021-06-24 09:58:10,935 WARN org.apache.hadoop.util.NativeCodeLoader [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-06-24 09:58:11,102 INFO org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to bigdata (auth:SIMPLE)
2021-06-24 09:58:11,138 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-3266925321736419552.conf.
2021-06-24 09:58:11,155 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/env/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-06-24 09:58:11,261 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-06-24 09:58:11,268 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-06-24 09:58:11,423 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-06-24 09:58:11,424 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-06-24 09:58:11,424 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=3}
2021-06-24 09:58:21,011 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-06-24 09:58:21,023 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1624498399517_0004
2021-06-24 09:58:21,252 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1624498399517_0004
2021-06-24 09:58:21,252 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2021-06-24 09:58:21,266 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2021-06-24 09:58:28,131 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2021-06-24 09:58:28,132 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node-4:40243 of application 'application_1624498399517_0004'.
2021-06-24 09:58:28,172 INFO org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Running in ZooKeeper 3.4.x compatibility mode
2021-06-24 09:58:28,172 INFO org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Using emulated InjectSessionExpiration
2021-06-24 09:58:28,190 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - Starting
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:host.name=node-1
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.version=1.8.0_181
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.vendor=Oracle Corporation
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.home=/opt/env/jdk1.8.0_181/jre
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.class.path=/opt/env/flink-1.12.2/lib/flink-csv-1.12.2.jar:/opt/env/flink-1.12.2/lib/flink-json-1.12.2.jar:/opt/env/flink-1.12.2/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/env/flink-1.12.2/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/env/flink-1.12.2/lib/flink-table_2.12-1.12.2.jar:/opt/env/flink-1.12.2/lib/flink-table-blink_2.12-1.12.2.jar:/opt/env/flink-1.12.2/lib/log4j-1.2-api-2.12.1.jar:/opt/env/flink-1.12.2/lib/log4j-api-2.12.1.jar:/opt/env/flink-1.12.2/lib/log4j-core-2.12.1.jar:/opt/env/flink-1.12.2/lib/log4j-slf4j-impl-2.12.1.jar:/opt/env/flink-1.12.2/lib/flink-dist_2.12-1.12.2.jar::/opt/env/hadoop-3.2.2/etc/hadoop::/opt/env/hbase-2.4.2/conf
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.io.tmpdir=/tmp
2021-06-24 09:58:28,199 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.compiler=<NA>
2021-06-24 09:58:28,200 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:os.name=Linux
2021-06-24 09:58:28,200 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:os.arch=amd64
2021-06-24 09:58:28,200 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:os.version=3.10.0-862.el7.x86_64
2021-06-24 09:58:28,200 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:user.name=bigdata
2021-06-24 09:58:28,200 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:user.home=/home/bigdata
2021-06-24 09:58:28,200 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:user.dir=/opt/env/flink-1.12.2
2021-06-24 09:58:28,200 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Initiating client connection, connectString=node-1:2181,node-2:2181,node-3:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState@15fa55a6
2021-06-24 09:58:28,243 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - Default schema
2021-06-24 09:58:28,268 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3266925321736419552.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2021-06-24 09:58:28,269 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Opening socket connection to server node-1/192.168.56.129:2181
2021-06-24 09:58:28,272 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Socket connection established to node-1/192.168.56.129:2181, initiating session
2021-06-24 09:58:28,273 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
2021-06-24 09:58:28,285 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server node-1/192.168.56.129:2181, sessionid = 0x10000020e920008, negotiated timeout = 60000
2021-06-24 09:58:28,286 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED
2021-06-24 09:58:28,438 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}.
JobManager Web Interface: http://node-4:40243
[bigdata@node-4 ~]$ jps
1410 NodeManager
2648 YarnSessionClusterEntrypoint
1278 DataNode
2799 Jps
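The session log above contains messages saying that the JVM overhead derived from the fraction (160 MB and 172.8 MB) is below the 192 MB minimum, so the minimum is used. The sketch below reproduces that clamping with Flink's default values (fraction 0.1, minimum 192 MB, maximum 1 GB). It uses integer megabytes for simplicity, and the function name jvm_overhead_mb is illustrative.

```shell
#!/usr/bin/env bash
# Derive the JVM overhead (MB) from the total process size (MB):
# take 10% of the total, then clamp it to the range [192, 1024].
jvm_overhead_mb() {
  local total_mb="$1" min_mb=192 max_mb=1024
  local derived=$(( total_mb / 10 ))   # fraction 0.1, truncated to whole MB
  if [ "$derived" -lt "$min_mb" ]; then derived=$min_mb; fi
  if [ "$derived" -gt "$max_mb" ]; then derived=$max_mb; fi
  echo "$derived"
}

echo "JobManager (1600 MB):  $(jvm_overhead_mb 1600) MB overhead"
echo "TaskManager (1728 MB): $(jvm_overhead_mb 1728) MB overhead"
```

Both process sizes used in this post fall below the point where 10% exceeds 192 MB, which is why the log reports the minimum being applied in each case.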
Run the following example:
[bigdata@node-2 flink-1.12.2]$ flink run ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 80fa8da520b0ff558257713a1d4b1ed3
Click Cancel Job in the Flink web UI to stop the application.
Stop the session process:
[bigdata@node-4 ~]$ yarn application -kill application_1624498399517_0004
Killing application application_1624498399517_0004
2021-06-24 10:08:41,817 INFO impl.YarnClientImpl: Killed application application_1624498399517_0004
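A job can also be submitted to a specific session by naming its YARN application ID explicitly, which is useful when more than one session is running. The following is a sketch, assuming flink and yarn are on the PATH as in the transcripts above; the function names are illustrative.

```shell
#!/usr/bin/env bash
# Submit a jar to an existing YARN session identified by its application ID.
submit_to_session() {
  local app_id="$1" jar="$2"
  flink run -t yarn-session -Dyarn.application.id="$app_id" "$jar"
}

# Stop the session by killing its YARN application.
stop_session() {
  yarn application -kill "$1"
}

# Example calls:
# submit_to_session application_1624498399517_0004 ./examples/streaming/TopSpeedWindowing.jar
# stop_session application_1624498399517_0004
#
# The per-job log earlier in this post also prints a graceful alternative:
# echo "stop" | ./bin/yarn-session.sh -id application_1624498399517_0004
```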