Flink作业提交

官网Cli提交参考
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html#restore-a-savepoint

Flink作为一个分布式的计算引擎,类似于Spark,Flink有多重部署模式:standalonek8syarnmesos

Standalone模式

安装步骤:

1.下载安装包,并分发配置好的

2.配置flink-conf.yml & slave & master

3.启动Flink集群

4.确认Flink启动情况:http://localhost:8081/

进程1 -StandaloneSessionClusterEntrypoint

进程2 -TaskManagerRunner

提交Flink-Job

Flink程序的maven打包插件,亲测有效,以下是官网插件依赖地址:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/dependencies.html

<build>
    <plugins>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>

        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <!-- Scala Compiler -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>

</build>

1.通过Flink的监控页面进行提交

2.通过shell命令行形式进行提交

# a>> standalone提交
bin/flink run  #执行命令
-c com.shufang.flink.examples.FlinkWordCount  #主类
[-p 2 ] #设置并行度
xxxxxxxxxxxxx.jar  #jar包的路径
--port 9999
--host localhost
---------------------------------
bin/flink list [--all] #列出正在运行的作业
bin/flink cancel [jobId] #取消作业
---------------------------------

Flink-Yarn模式

1-介绍

flink的yarn模式细分成2种方式:

方式1:
	在yarn上启动长久运行的(long running)的flink集群(yarn session)
方式2:
	在yarn上运行单个flink job的方式

共性:
	CliFrontend是所有job的入口类,通过解析jar包、mainClass、params等,读取flink的环境,配置信息等
	封装成PackageProgram,最终通过ClusterClient的方式提交给Flink集群

2-分别演示

2.1 session方式

这种方式适合调试,一把用于数据量较小耗时较少的作业

-在yarn中初始化一个flink的集群,开辟指定的资源,以后提交的任务都向这里提交,这个flink集群会常驻在yarn集群中,除非手动停止

--启动之后会启动2个进程
#1. YarnSessionClusterEntryPoint = ApplicationMaster + JobManager
#2. YarnJobClusterEntryPoint = ApplicationMaster进程的启动类
2.1.1 实操
2.1.1.1 启动flink常驻集群
bin/yarn-session.sh -h #可以通过该命令查看yarn-session的命令配置参数

bin/yarn-session.sh 
-n(--container) 2  #taskmanager的数量
-s (--slots) 2  #taskmanager的slot数量
-jm 1024 #jobmanager的内存
-tm 1024 #taskmanager的内存
-nm test #出现在yarn界面上的名字-AppName
-d #后台运行
#开启完之后就可以提交作业了

手动kill yarn session常驻进程
sbin/yarn application -kill application_id
2.2.2 在yarn中部署flink job运行
bin/flink run 
-c com.shufang.flink.examples.FlinkWordCount 
-d xxxxxxxx.jar
--input ///x.txt
--output x.csv

启动之后,flink job共享yarn session开辟的空间,运行在flink分布式集群中

#3. YarnTaskExecutorRunner = TaskManager进程,执行完毕之后会结束进程

2.2 per-job方式

这种是实际生产中所用比较多的方式,适合于数据量较大,耗时较长的作业

bin/flink run 
-m yarn-cluster  #指定master资源管理器,只有在per-job的时候需要指定
-yn #指定TaskManager(container)的数量
-ys #指定TaskManager的Slot的数量
-yqu #指定yarn队列,FIFO、CAP、FAIR
-c com.shufang.flink.examples.FlinkWordCount 
-d xxxxxxxx.jar
--input ///x.txt
--output x.csv

job执行完毕后

YarnJobClusterEntryPoint这个进程也会自动关闭
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐