Environment requirements

  • CentOS 7.5.1804, JDK 1.8.0_181, ZooKeeper 3.6.2, Hadoop 3.2.2, Flink 1.12.2
  • For Hadoop installation details, see the article "hadoop3.2.2 cluster setup".

Cluster deployment

Download the flink-1.12.2 package from https://archive.apache.org/dist/flink/flink-1.12.2/.
Download the scala-2.12.13 package from https://scala-lang.org/download/2.12.13.html.
After downloading, upload the packages to the virtual machines (distribute them to every node) and unpack them.
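The download, distribute, and unpack steps can be sketched as a small dry-run script. This is only an illustration under a few assumptions: the Scala 2.12 build of Flink is used (the classpath in the logs further down shows flink-dist_2.12-1.12.2.jar), the install directory is /opt/env (also taken from those logs), the node names come from the masters and workers files below, and wget, scp, and ssh are available. The helper names are hypothetical, and the script only prints the commands rather than running them.

```shell
#!/bin/sh
# Dry-run sketch: prints the commands instead of executing them.
FLINK_VERSION=1.12.2
SCALA_BINARY=2.12                  # assumption: Scala 2.12 build of Flink
INSTALL_DIR=/opt/env               # install path seen in this article's logs
NODES="node-1 node-2 node-3 node-4 node-5"

# Build the archive file name for a Flink version and Scala binary version.
flink_archive_name() {
    echo "flink-$1-bin-scala_$2.tgz"
}

# Print the download, copy, and unpack commands for every node.
print_deploy_plan() {
    archive=$(flink_archive_name "$FLINK_VERSION" "$SCALA_BINARY")
    echo "wget https://archive.apache.org/dist/flink/flink-$FLINK_VERSION/$archive"
    for node in $NODES; do
        echo "scp $archive $node:$INSTALL_DIR/"
        echo "ssh $node tar -xzf $INSTALL_DIR/$archive -C $INSTALL_DIR"
    done
}

print_deploy_plan
```

Review the printed commands and adjust the user, paths, and node list before running any of them by hand.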

Modify the Flink configuration files

  • Edit ${FLINK_HOME}/conf/flink-conf.yaml. The changed settings are:
jobmanager.rpc.address: node-1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: /flink-cluster
high-availability.storageDir: hdfs://vmcluster/flink/ha/
high-availability.zookeeper.quorum: node-1:2181,node-2:2181,node-3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs://vmcluster/flink-checkpoints
state.savepoints.dir: hdfs://vmcluster/flink-savepoints
jobmanager.execution.failover-strategy: region
jobmanager.archive.fs.dir: hdfs://vmcluster/completed-jobs/
historyserver.archive.fs.dir: hdfs://vmcluster/completed-jobs/
classloader.check-leaked-classloader: false
  • Edit ${FLINK_HOME}/conf/masters. After the change it reads:
node-1:8081
node-2:8081
  • Edit ${FLINK_HOME}/conf/workers. After the change it reads:
node-3
node-4
node-5

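Note: adjust the hostnames (or IPs), the ports, the Hadoop cluster's nameservice (vmcluster in the settings above), and any other Flink settings to match your own environment.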
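Before starting anything, it can help to confirm that the high-availability keys listed above actually made it into the file on each node. The following is a minimal sketch, assuming the simple one-line "key: value" layout shown above; conf_value and check_ha_conf are hypothetical helper names, not part of Flink.

```shell
#!/bin/sh
# Print the value of one "key: value" entry from a flink-conf.yaml style file.
# $1 = key, $2 = file
conf_value() {
    awk -v key="$1" '
        index($0, key ":") == 1 {
            sub(/^[^:]*:[ \t]*/, "")   # drop the key, the colon, and leading blanks
            print
            exit
        }' "$2"
}

# Report which of the ZooKeeper HA keys used in this article are missing.
# Returns 0 when all are present, 1 otherwise.
check_ha_conf() {
    missing=0
    for key in high-availability high-availability.storageDir high-availability.zookeeper.quorum; do
        if [ -z "$(conf_value "$key" "$1")" ]; then
            echo "missing: $key"
            missing=1
        fi
    done
    return $missing
}

# Usage (not run here): check_ha_conf "${FLINK_HOME}/conf/flink-conf.yaml"
```

The check matches keys only at the start of a line, so commented-out entries are treated as missing.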

Flink deployment modes

Note: make sure the HDFS and YARN clusters are running normally.
Flink supports three deployment modes on YARN:

Application Mode

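Application Mode launches a Flink cluster on YARN, and the main() method of the application jar is executed on the JobManager in YARN. The cluster shuts down as soon as the application finishes. You can also stop the cluster manually with yarn application -kill <ApplicationId> or by cancelling the Flink job. For example: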

[bigdata@node-1 flink-1.12.2]$ flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
2021-06-24 09:42:01,088 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/env/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-06-24 09:42:01,282 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-06-24 09:42:01,384 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-06-24 09:42:01,385 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-06-24 09:42:01,385 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=3}
2021-06-24 09:42:11,385 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1624498399517_0002
2021-06-24 09:42:11,616 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1624498399517_0002
2021-06-24 09:42:11,616 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-06-24 09:42:11,618 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-06-24 09:42:17,703 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-06-24 09:42:17,703 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node-5:33939 of application 'application_1624498399517_0002'.
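The client log above reports that the configured 1600 MB (JobManager) and 1728 MB (TaskManager) are both allocated as 2048 MB containers. The arithmetic can be reproduced with a short sketch. It assumes the simplified rule stated in the log message itself, rounding up to the next multiple of yarn.scheduler.minimum-allocation-mb; real YARN schedulers may apply further rules. The function name is hypothetical.

```shell
#!/bin/sh
# Round a requested container size (MB) up to the next multiple of
# YARN's minimum allocation (MB). $1 = requested, $2 = minimum allocation
yarn_container_mb() {
    requested=$1
    min_alloc=$2
    echo $(( (requested + min_alloc - 1) / min_alloc * min_alloc ))
}

yarn_container_mb 1600 1024   # JobManager size from flink-conf.yaml
yarn_container_mb 1728 1024   # TaskManager size from flink-conf.yaml
```

Both calls print 2048, which leaves the 448 MB and 320 MB of unused headroom that the log mentions.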

The running application can then be seen on the YARN web monitoring page.

Check the server processes:

[bigdata@node-5 ~]$ jps
1281 DataNode
1413 NodeManager
2358 YarnApplicationClusterEntryPoint
2509 YarnTaskExecutorRunner
2589 Jps
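To stop or inspect the application later you need its YARN application ID, which appears in the client log. The sketch below pulls the ID out of log text and prints follow-up commands without running them. The helper name is hypothetical, the sample line is abridged from the log above, and the flink list form with -Dyarn.application.id is how the Flink documentation for YARN describes it as far as I know, so verify it against your version.

```shell
#!/bin/sh
# Extract the first YARN application ID found in the text read from stdin.
extract_app_id() {
    grep -o 'application_[0-9][0-9]*_[0-9][0-9]*' | head -n 1
}

# Sample line, abridged from the client log in this article.
log_line='YarnClientImpl [] - Submitted application application_1624498399517_0002'
app_id=$(printf '%s\n' "$log_line" | extract_app_id)

# Print (not run) follow-up commands for that application.
echo "flink list -t yarn-application -Dyarn.application.id=$app_id"
echo "yarn application -kill $app_id"
```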


Per-Job Cluster Mode

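Per-Job Cluster Mode launches a Flink cluster on YARN, then runs the provided application jar locally, and finally submits the JobGraph to the JobManager on YARN. If you pass the --detached argument, the client stops once the submission has been accepted.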

[bigdata@node-1 flink-1.12.2]$ flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2021-06-24 09:35:05,044 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/env/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-06-24 09:35:05,208 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-06-24 09:35:05,489 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-06-24 09:35:05,489 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-06-24 09:35:05,489 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=3}
2021-06-24 09:35:19,178 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1624498399517_0001
2021-06-24 09:35:19,461 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1624498399517_0001
2021-06-24 09:35:19,461 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-06-24 09:35:19,466 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-06-24 09:35:33,893 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-06-24 09:35:33,894 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1624498399517_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1624498399517_0001
Note that killing Flink might not clean up all job artifacts and temporary files.
2021-06-24 09:35:33,894 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node-4:41384 of application 'application_1624498399517_0001'.
Job has been submitted with JobID 18036bbf79b87724f902d18768e716ef

Check the server processes:

[bigdata@node-4 ~]$ jps
1410 NodeManager
1971 Jps
1278 DataNode
1886 YarnJobClusterEntrypoint

[bigdata@node-5 ~]$ jps
1281 DataNode
1859 YarnTaskExecutorRunner
1413 NodeManager
1947 Jps

Session Mode

Session Mode has two operating modes:

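  1. attached mode (default): the yarn-session.sh client submits the Flink cluster to YARN but keeps running and tracks the state of the cluster. If the cluster fails, the client shows the error. If the client is terminated, it also signals the cluster to shut down.
  2. detached mode (-d or --detached): the yarn-session.sh client submits the Flink cluster to YARN and then returns. Another invocation of the client, or the YARN tools, is needed to stop the Flink cluster.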

For example:

[bigdata@node-1 flink-1.12.2]$ yarn-session.sh -n 1 -jm 1024 -tm 1024
2021-06-24 09:58:10,653 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, node-1
2021-06-24 09:58:10,655 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-06-24 09:58:10,655 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-06-24 09:58:10,655 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-06-24 09:58:10,655 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 3
2021-06-24 09:58:10,655 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability, zookeeper
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.cluster-id, /flink-cluster
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.storageDir, hdfs://vmcluster/flink/ha/
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.zookeeper.quorum, node-1:2181,node-2:2181,node-3:2181
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend, filesystem
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.checkpoints.dir, hdfs://vmcluster/flink-checkpoints
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.savepoints.dir, hdfs://vmcluster/flink-savepoints
2021-06-24 09:58:10,656 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-06-24 09:58:10,657 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.archive.fs.dir, hdfs://vmcluster/completed-jobs/
2021-06-24 09:58:10,657 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: historyserver.archive.fs.dir, hdfs://vmcluster/completed-jobs/
2021-06-24 09:58:10,670 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-bigdata.
2021-06-24 09:58:10,935 WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-06-24 09:58:11,102 INFO  org.apache.flink.runtime.security.modules.HadoopModule       [] - Hadoop user set to bigdata (auth:SIMPLE)
2021-06-24 09:58:11,138 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-3266925321736419552.conf.
2021-06-24 09:58:11,155 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/env/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-06-24 09:58:11,261 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-06-24 09:58:11,268 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-06-24 09:58:11,423 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-06-24 09:58:11,424 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-06-24 09:58:11,424 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=3}
2021-06-24 09:58:21,011 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2021-06-24 09:58:21,023 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1624498399517_0004
2021-06-24 09:58:21,252 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1624498399517_0004
2021-06-24 09:58:21,252 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-06-24 09:58:21,266 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-06-24 09:58:28,131 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-06-24 09:58:28,132 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node-4:40243 of application 'application_1624498399517_0004'.
2021-06-24 09:58:28,172 INFO  org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Running in ZooKeeper 3.4.x compatibility mode
2021-06-24 09:58:28,172 INFO  org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Using emulated InjectSessionExpiration
2021-06-24 09:58:28,190 INFO  org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - Starting
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:host.name=node-1
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.version=1.8.0_181
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.vendor=Oracle Corporation
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.home=/opt/env/jdk1.8.0_181/jre
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.class.path=/opt/env/flink-1.12.2/lib/flink-csv-1.12.2.jar:/opt/env/flink-1.12.2/lib/flink-json-1.12.2.jar:/opt/env/flink-1.12.2/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/env/flink-1.12.2/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/env/flink-1.12.2/lib/flink-table_2.12-1.12.2.jar:/opt/env/flink-1.12.2/lib/flink-table-blink_2.12-1.12.2.jar:/opt/env/flink-1.12.2/lib/log4j-1.2-api-2.12.1.jar:/opt/env/flink-1.12.2/lib/log4j-api-2.12.1.jar:/opt/env/flink-1.12.2/lib/log4j-core-2.12.1.jar:/opt/env/flink-1.12.2/lib/log4j-slf4j-impl-2.12.1.jar:/opt/env/flink-1.12.2/lib/flink-dist_2.12-1.12.2.jar::/opt/env/hadoop-3.2.2/etc/hadoop::/opt/env/hbase-2.4.2/conf
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.io.tmpdir=/tmp
2021-06-24 09:58:28,199 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.compiler=<NA>
2021-06-24 09:58:28,200 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:os.name=Linux
2021-06-24 09:58:28,200 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:os.arch=amd64
2021-06-24 09:58:28,200 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:os.version=3.10.0-862.el7.x86_64
2021-06-24 09:58:28,200 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:user.name=bigdata
2021-06-24 09:58:28,200 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:user.home=/home/bigdata
2021-06-24 09:58:28,200 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:user.dir=/opt/env/flink-1.12.2
2021-06-24 09:58:28,200 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Initiating client connection, connectString=node-1:2181,node-2:2181,node-3:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState@15fa55a6
2021-06-24 09:58:28,243 INFO  org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - Default schema
2021-06-24 09:58:28,268 WARN  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3266925321736419552.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2021-06-24 09:58:28,269 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Opening socket connection to server node-1/192.168.56.129:2181
2021-06-24 09:58:28,272 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Socket connection established to node-1/192.168.56.129:2181, initiating session
2021-06-24 09:58:28,273 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
2021-06-24 09:58:28,285 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server node-1/192.168.56.129:2181, sessionid = 0x10000020e920008, negotiated timeout = 60000
2021-06-24 09:58:28,286 INFO  org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED
2021-06-24 09:58:28,438 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}.
JobManager Web Interface: http://node-4:40243

Check the server processes:

[bigdata@node-4 ~]$ jps
1410 NodeManager
2648 YarnSessionClusterEntrypoint
1278 DataNode
2799 Jps
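The session log above twice notes that the JVM overhead "derived from fraction" (160 MB and 172.8 MB) is below the 192 MB minimum, so the minimum is used. A small sketch reproduces those numbers. The fraction 0.1 is inferred from 160/1600 and 172.8/1728, the 192 MB minimum is stated in the log, and the 1024 MB maximum is an assumption based on Flink's documented default; the function name is hypothetical.

```shell
#!/bin/sh
# Derive the JVM overhead (MB) for a given total process size (MB):
# fraction of the process size, clamped to [min, max].
jvm_overhead_mb() {
    awk -v process_mb="$1" 'BEGIN {
        fraction = 0.1    # inferred from the log: 160 / 1600
        min_mb   = 192    # minimum stated in the log
        max_mb   = 1024   # assumption: Flink default maximum of 1 GB
        derived = process_mb * fraction
        if (derived < min_mb) derived = min_mb
        if (derived > max_mb) derived = max_mb
        print derived
    }'
}

jvm_overhead_mb 1600   # jobmanager.memory.process.size
jvm_overhead_mb 1728   # taskmanager.memory.process.size
```

Both calls print 192, matching the "min value will be used instead" messages.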

Run the following example:

[bigdata@node-2 flink-1.12.2]$ flink run ./examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 80fa8da520b0ff558257713a1d4b1ed3
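For comparison, the three submission commands used in this article differ only in a few flags. The sketch below collects them in one hypothetical helper that prints the command for a given mode; it does not run anything, and the mode names passed to it are labels chosen for this example rather than Flink options.

```shell
#!/bin/sh
# Print the submission command used in this article for a deployment mode.
# $1 = application | per-job | session, $2 = path to the job jar
submit_cmd() {
    case "$1" in
        application) echo "flink run-application -t yarn-application $2" ;;
        per-job)     echo "flink run -t yarn-per-job --detached $2" ;;
        session)     echo "flink run $2" ;;   # targets the already running session
        *)           echo "unknown mode: $1" >&2; return 1 ;;
    esac
}

for mode in application per-job session; do
    submit_cmd "$mode" ./examples/streaming/TopSpeedWindowing.jar
done
```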

Click Cancel Job in the Flink web UI to stop the application.
Stop the session:

[bigdata@node-4 ~]$ yarn application -kill application_1624498399517_0004
Killing application application_1624498399517_0004
2021-06-24 10:08:41,817 INFO impl.YarnClientImpl: Killed application application_1624498399517_0004
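Killing the application through YARN works, but the per-job client log earlier in this article also suggests a graceful alternative: piping "stop" into yarn-session.sh with the application ID, and it warns that killing might not clean up all job artifacts. A minimal dry-run sketch of that order of preference follows; the function name is hypothetical and the commands are only printed.

```shell
#!/bin/sh
# Print the commands for stopping a YARN session, graceful option first.
# $1 = YARN application ID
stop_session_plan() {
    app_id=$1
    echo "echo \"stop\" | ./bin/yarn-session.sh -id $app_id"
    echo "yarn application -kill $app_id"
}

stop_session_plan application_1624498399517_0004
```

Run the first printed command from the Flink home directory, and fall back to the second only if the graceful stop is not possible.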
