Flink K8s Application任务的使用
这里我们可以指定一定的运行参数,相关的参数设定方案请参考官方文档https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/config/#kubernetes。PodTemplate主要是通过指定pod的启动样例,在podtemplate中可以指定域名、挂载路径、配置文件、初始化容器等信息,如下给
Flink K8s Application任务的使用
Flink K8s Application任务的使用
构键k8s集群
- 在这里,我们需要搭建一个K8S环境用于提供flink任务的运行时环境。推荐使用kubespray的方式创建k8s集群.
- 需要注意的是,我们需要在相应用户的目录下提供一个kubeconfig文件,如下图所示,通过该文件,StreamPark才能顺利地调用K8S客户端提交任务,该config的内容为与K8S的ApiServer进行连接时需要使用的信息。
提供flink运行任务的环境
-
将kubeconfig提供出来,供flink客户端调用
-
在这里主要提供一个供flink使用命名空间、和sa
# 创建namespace kubectl create ns flink-dev # 创建serviceaccount kubectl create serviceaccount flink-service-account -n flink-dev # 用户授权 kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-dev:flink-service-account
下载flink客户端
flink客户端是控制flink的核心,需要下载并部署
wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
tar -xf flink-1.14.3-bin-scala_2.12.tgz
任务编程
任务jar生成过程
在这里,主要提供一个flink任务案例供flink k8s application进行调用
-
开发java代码,供使用,本示例项目较为简单,仅为将数据输出至mysql中,调用mysql-connector进行实现
package cn.ctyun.demo; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class SinkToMySQL { public static void main(String[] args) throws Exception { // 从启动参数中获取连接信息 ParameterTool parameterTool = ParameterTool.fromArgs(new String[]{"url", "passwd", "user"}); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Alice", "./prod?id=200", 3500L), new Event("Bob", "./prod?id=2", 2500L), new Event("Alice", "./prod?id=300", 3600L), new Event("Bob", "./home", 3000L), new Event("Bob", "./prod?id=1", 2300L), new Event("Bob", "./prod?id=3", 3300L)); stream.addSink( JdbcSink.sink( "INSERT INTO clicks (user, url) VALUES (?, ?)", (statement, r) -> { statement.setString(1, r.user); statement.setString(2, r.url); }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(parameterTool.get("url")) .withDriverName("com.mysql.jdbc.Driver") .withUsername(parameterTool.get("user")) .withPassword(parameterTool.get("passwd")) .build() ) ); env.execute(); } }
-
项目打包
防止一些依赖缺失,这里使用fatjar的方式进行打包,maven相关的设置如下所示:
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>
之后通过命令
mvn package
进行打包 -
制作镜像,在这里通过官方文档进行
使用docker进行镜像生成,使用命令为docker build -t ****/flink-demo-jar-job:1.0-SNAPSHOT .
FROM apache/flink:1.14.3-scala_2.12 RUN mkdir -p $FLINK_HOME/usrlib COPY lib $FLINK_HOME/lib/ COPY flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flink-demo-jar-job-1.0-SNAPSHOT-jar-with-dependencies.jar
-
推送镜像
docker push ****/flink-demo-jar-job:1.0-SNAPSHOT
k8s Application运行
Application模式架构
在k8s application模式下,用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。
启动命令
这里我们可以指定一定的运行参数,相关的参数设定方案请参考官方文档https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/config/#kubernetes
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
# 指定容器启动的镜像(与之前提交的保持一致)
-Dkubernetes.container.image=****/flink-demo-jar-job:1.0-SNAPSHOT \
-Dkubernetes.jobmanager.replicas=1 \
# 指定容器运行的命名空间
-Dkubernetes.namespace=flink-dev \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.memory.process.size=4096mb \
-Dkubernetes.jobmanager.cpu=1 \
-Djobmanager.memory.process.size=4096mb \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dclassloader.resolve-order=parent-first \
# yaml 模板,为解决hosts映射,后续可以通过编排此yaml文件,实现动态替换启动jar包、配置文件和持久化一些数据
# -Dkubernetes.pod-template-file=/opt/flink-1.14.2/flink-templeta.yaml \
# Main方法
-c cn.ctyun.demo.SinkToMySQL \
# 启动Jar包和启动配置文件的绝对路径(容器内部,不是宿主机)
local:///usr/local/flink/lib/flink-realtime-1.0-SNAPSHOT.jar \
# 如下将提供mysql的连接信息,通过参数的方式传递给jar包
--passwd ****** \
--user ******\
--url ******
PodTemplate
PodTemplate主要是通过指定pod的启动样例,在podtemplate中可以指定域名、挂载路径、配置文件、初始化容器等信息,如下给出一个提供一个将保存点持久化的的podtemplate。
apiVersion: v1
kind: Pod
spec:
containers:
# Do not change the main container name
- name: flink-main-container
volumeMounts:
- mountPath: /opt/flink/Checkpoint
name: Checkpoint
- mountPath: /opt/flink/Savepoint
name: Savepoint
volumes:
- name: Checkpoint
persistentVolumeClaim:
claimName: flink-checkpoint-pvc
- name: Savepoint
persistentVolumeClaim:
claimName: flink-savepoint-pvc
可知,通过如上的配置文件,启动taskmanager后将同时启用挂载支持,
更多推荐
所有评论(0)