flink提交命令相关解释
一、Flink on Yarn1.1 、per-job模式1.1.1、命令bin/flink run -m yarn-cluster -yn 3 -ys 3 -ynm bjsxt02-c com.test.flink.wc.StreamWordCount ./appjars/test-1.0-SNAPSHOT.jar1.1.2、参数解释-yn,--container <arg> 表示分
·
一、Flink on Yarn
1.1 、per-job模式
1.1.1、命令
bin/flink run -m yarn-cluster -yn 3 -ys 3 -ynm bjsxt02 -c com.test.flink.wc.StreamWordCount ./appjars/test-1.0-SNAPSHOT.jar
1.1.2、参数解释
-yn,--container <arg> 表示分配容器的数量,也就是 TaskManager 的数量。
-d,--detached:设置在后台运行。
-yjm,--jobManagerMemory<arg>:设置 JobManager 的内存,单位是 MB。
-ytm,--taskManagerMemory<arg>:设置每个 TaskManager 的内存,单位是 MB。
-ynm,--name:给当前 Flink application 在 Yarn 上指定名称。
-yq,--query:显示 yarn 中可用的资源(内存、cpu 核数)
-yqu,--queue<arg> :指定 yarn 资源队列
-ys,--slots<arg> :每个 TaskManager 使用的 Slot 数量。
-yz,--zookeeperNamespace<arg>:针对 HA 模式在 Zookeeper 上创建 NameSpace
-yid,--applicationID<yarnAppId> : 指定 Yarn 集群上的任务 ID,附着到一个后台独立运行的 Yarn Session 中。
-s,-fromSavepoint <savepointPath> 要恢复作业的保存点的路径 的保存点的路径(例如hdfs:///flink/savepoint-1537)。)
1 # 参数必选 :
-n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量)
2 # 参数可选 :
-D <arg> 动态属性
-d,--detached 独立运行
-jm,--jobManagerMemory <arg> JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列.
-s,--slots <arg> 每个TaskManager使用的slots数量
-tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
-id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
3 # run [OPTIONS] <jar-file> <arguments>
run操作参数:
-c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager
-p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。
4 # 启动一个新的yarn-session,它们都有一个y或者yarn的前缀
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
连接指定host和port的jobmanager:
./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
启动一个新的yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
5 # 注意:命令行的选项也可以使用./bin/flink 工具获得。
6 # 动作 "run "编译并运行一个程序。
语法:run [OPTIONS] <jar-file> <arguments
"运行 "动作选项。
-c,-class <classname> 带有程序入口点的类
("main "方法或 "getPlan() "方法。
只有在JAR文件没有
在其清单中指定类的情况下才需要。
-C,-classpath <url> 在所有节点上的每个用户代码中添加一个URL
簇中所有节点上的类加载器添加一个URL。
集群中的每个用户代码类加载器添加一个URL。路径必须指定一个
协议(如file://),并且在所有节点上都能访问(如
所有节点上都可以访问(例如,通过
的NFS共享)。) 你可以多次使用这个
选项,可以多次指定
一个以上的URL。该协议必须
是由{@link
java.net.URLClassLoader}.
-d,-detached 如果存在,以分离的方式运行作业
模式运行作业
-n,-allowNonRestoredState 允许跳过不能恢复的保存点状态。
允许跳过不能被恢复的保存点状态。你需要允许
如果你从你的程序中删除了一个操作者
你的程序中删除了一个操作符,而该操作符在保存点出现时是该程序的一部分。
你需要允许这样做,如果你从你的程序中删除了一个操作者,而这个操作者在保存点被触发时是
触发了。
-p,-parallelism <parallelism> 用来运行程序的并行性。程序。可选的标志,用于覆盖配置中指定的默认值。
-q,-sysoutLogging 如果存在,抑制日志输出到标准输出。
-s,-fromSavepoint <savepointPath> 要恢复作业的保存点的路径 的保存点的路径(例如hdfs:///flink/savepoint-1537)。)
7 #Yarn-cluster 模式的选项。
-d,-detached 如果存在,在分离模式下运行作业
模式运行作业
-m,-jobmanager <arg> 要连接的JobManager(主站)的地址。
的地址。使用这个标志可以
连接到一个不同的JobManager,而不是
中指定的那个不同的JobManager。
配置中指定的JobManager。
-yD <property=value> 使用给定属性的值
-yd,-yarndetached 如果存在,在分离模式下运行作业
模式运行作业(已废弃;使用非YARN
特定的选项代替)
-yh,--yarnhelp Yarn会话CLI的帮助。
-yid,-yarnapplicationId <arg> 附加到正在运行的YARN会话上
-yj,
1.2 、yarn-session 模式
1.2.1、命令行
bin/yarn-session.sh -n 3 -s 3 -nm bjsxt1
1.2.2、参数解释
-n,--container <arg> 表示分配容器的数量(也就是 TaskManager 的数量)。
-D <arg> 动态属性。
-d,--detached 在后台独立运行。
-jm,--jobManagerMemory <arg>:设置 JobManager 的内存,单位是 MB。
-nm,--name:在 YARN 上为一个自定义的应用设置一个名字。
-q,--query:显示 YARN 中可用的资源(内存、cpu 核数)。
-qu,--queue <arg>:指定 YARN 队列。
-s,--slots <arg>:每个 TaskManager 使用的 Slot 数量。
-tm,--taskManagerMemory <arg>:每个 TaskManager 的内存,单位是 MB。
-z,--zookeeperNamespace <arg>:针对 HA 模式在 ZooKeeper 上创建 NameSpace。
-id,--applicationId <yarnAppId>:指定 YARN 集群上的任务 ID,附着到一个后台独立运行的 yarn session 中
二、并行度
2.1并行度(Parallelism)
与Spark类似的,一个Flink Job在生成执行计划时也划分成多个Task。Task可以是Source、Sink、算子或算子链(算子链有点意思,之后会另写文章详细说的)。Task可以由多线程并发执行,每个线程处理Task输入数据的一个子集。而并发的数量就称为Parallelism,即并行度。
Flink程序中设定并行度有4种级别,从低到高分别为:算子级别、执行环境(ExecutionEnvironment)级别、客户端(命令行)级别、配置文件(flink-conf.yaml)级别。实际执行时,优先级则是反过来的,算子级别最高。简单示例如下。
- 算子级别
dataStream.flatMap(new SomeFlatMapFunction()).setParallelism(4);
- 执行环境级别
streamExecutionEnvironment.setParallelism(4);
- 命令行级别
bin/flink -run --parallelism 4 example-0.1.jar
- flink-conf.yaml级别
parallelism.default: 4
更多推荐
已为社区贡献1条内容
所有评论(0)