Flink的作业提交
Flink作业提交Flink作为一个分布式的计算引擎,类似于Spark,Flink有多重部署模式:standalone、k8s、yarn、mesosStandalone模式安装步骤:1.下载安装包,并分发配置好的2.配置flink-conf.yml & slave3.启动Flink集群4.确认Flink启动情况:http://localhost:8081/进程1 -Stan...
Flink作业提交
官网Cli提交参考
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html#restore-a-savepoint
Flink作为一个分布式的计算引擎,类似于Spark,Flink有多重部署模式:standalone、k8s、yarn、mesos
Standalone模式
安装步骤:
1.下载安装包,并分发配置好的
2.配置flink-conf.yml & slave & master
3.启动Flink集群
4.确认Flink启动情况:http://localhost:8081/
进程1 -StandaloneSessionClusterEntrypoint
进程2 -TaskManagerRunner
提交Flink-Job
Flink程序的maven打包插件,亲测有效,以下是官网插件依赖地址:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/dependencies.html
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1.通过Flink的监控页面进行提交
2.通过shell命令行形式进行提交
# a>> standalone提交
bin/flink run #执行命令
-c com.shufang.flink.examples.FlinkWordCount #主类
[-p 2 ] #设置并行度
xxxxxxxxxxxxx.jar #jar包的路径
--port 9999
--host localhost
---------------------------------
bin/flink list [--all] #列出正在运行的作业
bin/flink cancel [jobId] #取消作业
---------------------------------
Flink-Yarn模式
1-介绍
flink的yarn模式细分成2种方式:
方式1:
在yarn上启动长久运行的(long running)的flink集群(yarn session)
方式2:
在yarn上运行单个flink job的方式
共性:
CliFrontend是所有job的入口类,通过解析jar包、mainClass、params等,读取flink的环境,配置信息等
封装成PackageProgram,最终通过ClusterClient的方式提交给Flink集群
2-分别演示
2.1 session方式
这种方式适合调试,一把用于数据量较小耗时较少的作业
-在yarn中初始化一个flink的集群,开辟指定的资源,以后提交的任务都向这里提交,这个flink集群会常驻在yarn集群中,除非手动停止
--启动之后会启动2个进程
#1. YarnSessionClusterEntryPoint = ApplicationMaster + JobManager
#2. YarnJobClusterEntryPoint = ApplicationMaster进程的启动类
2.1.1 实操
2.1.1.1 启动flink常驻集群
bin/yarn-session.sh -h #可以通过该命令查看yarn-session的命令配置参数
bin/yarn-session.sh
-n(--container) 2 #taskmanager的数量
-s (--slots) 2 #taskmanager的slot数量
-jm 1024 #jobmanager的内存
-tm 1024 #taskmanager的内存
-nm test #出现在yarn界面上的名字-AppName
-d #后台运行
#开启完之后就可以提交作业了
手动kill yarn session常驻进程
sbin/yarn application -kill application_id
2.2.2 在yarn中部署flink job运行
bin/flink run
-c com.shufang.flink.examples.FlinkWordCount
-d xxxxxxxx.jar
--input ///x.txt
--output x.csv
启动之后,flink job共享yarn session开辟的空间,运行在flink分布式集群中
#3. YarnTaskExecutorRunner = TaskManager进程,执行完毕之后会结束进程
2.2 per-job方式
这种是实际生产中所用比较多的方式,适合于数据量较大,耗时较长的作业
bin/flink run
-m yarn-cluster #指定master资源管理器,只有在per-job的时候需要指定
-yn #指定TaskManager(container)的数量
-ys #指定TaskManager的Slot的数量
-yqu #指定yarn队列,FIFO、CAP、FAIR
-c com.shufang.flink.examples.FlinkWordCount
-d xxxxxxxx.jar
--input ///x.txt
--output x.csv
job执行完毕后
YarnJobClusterEntryPoint这个进程也会自动关闭
更多推荐
所有评论(0)