flink on k8s 目前有两种模式:

Kubernetes:用户通过定义 flink 的 k8s 资源描述文件,由 kubectl 命令启动,最终以 standalone session cluster 或 standalone job cluster 模式运行。其中 standalone session cluster 运行多个作业;standalone job cluster 只运行一个作业。
Native Kubernetes:用户通过 flink run 命令直接提交作业,由 flink 客户端(内置 k8s client)将集群资源描述信息(包括ConfigMap(例如 HADOOP_CONF) 描述等)提交到 k8s ApiServer,从而启动 flink 集群并运行 flink 作业。具体原理查看第 6 节。
flink-1.11 新增了 k8s job cluster 和 native k8s Application 模式。本文只介绍 native k8s session 和 native k8s application 模式。

1.环境需求
Kubernetes 版本 >= 1.9
用户需要有在 k8s 集群上的相关权限(list/create/delete pods/delete services),需要准备好 KubeConfig 文件,默认会使用( ~/.kube/config,也可以在运行时通过 kubernetes.config.file 指定). 可以运行 kubectl auth can-i <list|create|edit|delete> pods 来测试当前用户是否有相关权限。
配置好 Kubernetes DNS 服务 (最好测试一下域名解析能否成功)
准备好 k8s 用户账号,并配置好 RBAC 权限(create, delete pods). flink 镜像默认是以 flink 账号运行 flink 程序的
准备 flink 集群的 image 镜像:可以使用官方镜像,也可以自己打镜像
flink-1.11 客户端
hadoop conf 文件(可选, 访问 HDFS 文件系统时需要)
2.Flink Kubernetes Session
2.1 启动 Flink Session
注意:要确保 flink-conf.yaml 中的 env.java.home 配置被注释,否则无法启动成功!

# env.java.home: /usr/local/java
1
先在 k8s 上启动 Flink Session

# -Dkubernetes.container.image=localhost:5000/flink:1.11.1-scala_2.11 指定镜像,如果不指定该配置,会使用默认配置的官方镜像(flink:latest) 
# -Dkubernetes.jobmanager.service-account=flink 指定有 k8s 权限的账号,jobmanager 用该账号为 taskmanager 申请 k8s 的 pod
./bin/kubernetes-session.sh -Dkubernetes.container.image=localhost:5000/flink:1.11.1-scala_2.11 -Dkubernetes.jobmanager.service-account=flink
1
2
3
可以根据需要添加配置,flink on k8s 的所有相关配置在这里.
例如:启动一个有 2 个 cpu 核心,4GB 内存的 TaskManager

./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=<ClusterId> \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dkubernetes.container.image=localhost:5000/flink:1.11.1-scala_2.11 \
  -Dkubernetes.service.exposed.type=NodePort
1
2
3
4
5
6
7
其余配置则使用 conf/flink-conf.yaml 中的配置,所有配置在这里
如果没有指定 kubernetes.cluster-id, flink 客户端会随机生成一个 UUID。

2.2 向 Flink Session 提交作业
通过 -e kubernetes-session 指定部署到之前启动的 k8s 集群,并且可以通过 -D 指定参数

./bin/flink run -d -e kubernetes-session -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
1
2.3 访问 Job Manager UI 界面
这里涉及 k8s 相关概念,由于 k8s 有自己的 ‘防火墙’,用户需要定义组件的端口暴露规则,将端口映射到宿主机,k8s 目前提供了 4 种端口映射方式:

ClusterIP:组件 ip 和端口只能在 k8s 集群内访问。如果你想访问 jobmanager ui 或者向该 session 集群提交任务,需要启动一个本地代理:
kubectl port-forward service/<ServiceName> 8081
1
NodePort:静态端口,通过宿主机 <NodeIP>:<NodePort> 就可以直接访问 jobmanager UI。NodeIP 可以用 Kubernetes ApiServer 替换。作业提交成功后,<NodeIP>:<NodePort>可以在提交 job 的客户端日志中找到。
LoadBalancer:使用云提供商的负载均衡器在外部暴露服务。 由于云提供商和 Kubernetes 需要一些时间来准备负载均衡器,因为你可能在客户端日志中获得一个 NodePort 的 JobManager Web 界面。 你可以使用 kubectl get services/<ClusterId> 获取 EXTERNAL-IP, 然后手动构建负载均衡器 JobManager Web 界面 http://:8081
ExternalName:映射服务到一个 DNS 域名,当前 flink 版本还不支持
通常在自建 k8s 集群情况下,我们直接使用 NodePort 模式就可以了。

2.4 连接 k8s session
可以使用以下命令连接启动的 k8s session

./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
1
2.5 停止 flink k8s Session
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
1
2.6 资源清理
依赖 k8s 机制进行清理,删除 k8s 服务即可回收所有占用的 k8s 资源(当然服务也停了)

kubectl delete service/<ClusterID>
1
3.Flink k8s Application
注意:要确保 flink-conf.yaml 中的 env.java.home 配置被注释,否则无法启动成功!

# env.java.home: /usr/bin/hadoop/software/java
1
3.1 运行 flink 自带示例程序
# 需要访问 hdfs 文件系统时需要配置 HADOOP_CONF_DIR
export HADOOP_CONF_DIR=/path/to/hadoop/etc/hadoop
# 每个作业一个全局唯一的 job_name
kubernetes_cluster_id={your-flink-job_name} && \
./bin/flink run-application -p 2 -t kubernetes-application \
  -Dkubernetes.cluster-id=${kubernetes_cluster_id} \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dkubernetes.container.image=localhost:5000/flink:1.11.1-scala_2.11 \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
  -Dhigh-availability=zookeeper \
  -Dhigh-availability.cluster-id=${kubernetes_cluster_id} \
  -Dhigh-availability.storageDir=hdfs:///path/to/flink/metadata/${kubernetes_cluster_id} \
  -Dstate.checkpoints.dir=hdfs:///path/to/flink/checkpoints/${kubernetes_cluster_id} \
  -Dstate.savepoints.dir=hdfs:///path/to/flink/savepoints/${kubernetes_cluster_id} \
  -Dexecution.checkpointing.interval=5000ms \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=2 \
  local:///opt/flink/examples/streaming/WindowJoin.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
注意:

在 k8s Application 模式下,指定运行 jar 包时,只能使用 “local” schema。此处的 local 指的是 kubernetes.container.image 指定的 docker 镜像,不是 flink 客户端。所以只能运行镜像中的 jar 包。那如何运行用户自定义的 jar 包呢?请看 3.2 节。
启动时会默认读取flink 客户端 conf 下的配置文件(flink-conf.yaml log4j logback …),上传到 ConfigMap(名字为 flink-config-{kubernetes.cluster-id}),覆盖 image 中的默认配置
如果设置了 HADOOP_CONF_DIR,也会将 HADOOP_CONF_DIR 下的文件上传到 ConfigMap(名字为 hadoop-config-{kubernetes.cluster-id}),如果程序中使用了 Hadoop 文件系统(例如配置了 checkpoint/savepoint 地址为 hdfs),会读取该 ConfigMap 中的配置
kubernetes.cluster-id:k8s cluster-id ,不应超过 45 个字符。如果没有指定 kubernetes.cluster-id, flink 客户端会随机生成一个 UUID。建议用户手动设置该值,并且保证每个作业的 cluster-id 都不一样。手动设置该值时应该满足 k8s 的命名规范(正则表达式 [a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*),简单归纳为:必须为小写字母或数字;必须包含 ‘-’ 或 ‘.’;必须以小写字母(或数字)开始和结尾。例如 “example.com”, “flink-windowjoin-example”
kubernetes.container.image:如果没有配置镜像,flink 会根据当前客户端的版本确定镜像。建议指定镜像,例如 flink:1.11.1-scala_2.11,官方提供的镜像在这里,当然也可以使用自己打的镜像。
kubernetes.jobmanager.service-account: 指定有 k8s 权限的账号,jobmanager 用该账号为 taskmanager 申请 k8s 的 pod,
high-availability:官方 flink-1.11 on native k8s 目前只支持 ZK+HDFS/S3/GFS/OSS 方式开启 HA,目前社区在考虑使用 native k8s api 替代 ZK 实现 HAFLINK-12884 FLINK-17598,并且以 mounting volumes 支持挂载共享磁盘的方式也已经有人实现FLINK-15649,如果不改动 1.11 代码,最好使用 ZK + HDFS 或者 S3 来支持 HA. 在 flink-1.11 application 模式下,如果开启 HA 模式(high-availability=zookeeper),job_id 始终为 ‘00000000000000000000000000000000’。这样可能会在提交多个作业时使用相同的 metadata/checkpoint/savepoint 目录。所以在开启 HA 模式时,我们需要手动设置 metadata/checkpoint/savepoint 目录,确保这些目录只被一个作业使用。可以查看上述示例中的几个相关配置:high-availability.cluster-id/high-availability.storageDir/state.checkpoints.dir/state.savepoints.dir
kubernetes.rest-service.exposed.type:这里指定的是 NodePort,详情查看 2.3 节
kubernetes.jobmanager.service-account:申请 k8s 资源需要的账号,这里是 flink。详情查看 5.2 节
kubernetes.container-start-command-template:为了 debug 时能在 k8s console 端查看日志,需要手动配置该项。建议正式运行时删除该配置,使用默认配置。详情查看第 4 节。
其他配置请查看官方配置文档

如果想要提交作业时都重新从 registry 下载镜像,可以指定 -Dkubernetes.container.image.pull-policy=Always

3.2 运行用户自定义程序
如果想要运行用户自定义的 jar 包,需要构建自己的镜像,镜像中包含用户自己的 jar 包和 flink 基础镜像。
例如我们想要运行 my-flink-job.jar 中的作业,可以进行以下步骤:

创建 Dockerfile
# 基础镜像是 flink image
FROM localhost:5000/flink:1.11.1-scala_2.11
# 在镜像中创建用户 jar 包目录。FLINK_HOME 在原 flink 镜像中已经配置好了(/opt/flink),这里直接用
RUN mkdir -p $FLINK_HOME/usrlib
# 拷贝 my-flink-job.jar 到上面的镜像目录
COPY /path/to/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
1
2
3
4
5
6
创建镜像
# 进入 Dockerfile 目录
cd /path/to/Dockerfile
docker build -t localhost:5000/my-flink-job:1.0 .
1
2
3
上传镜像到私服
docker push localhost:5000/my-flink-job:1.0
1
运行自定义程序
和之前提交 flink 自带示例不同的就是,指定用户自己的 jar 包(local:///opt/flink/usrlib/my-flink-job.jar);指定自己的镜像(localhost:5000/my-flink-job:1.0);以及指定自己需要的运行资源。
# 需要访问 hdfs 文件系统时需要配置 HADOOP_CONF_DIR
export HADOOP_CONF_DIR=/path/to/hadoop/etc/hadoop
# 每个作业一个全局唯一的 job_name
kubernetes_cluster_id={your-flink-job_name} && \
./bin/flink run-application -p 2 -t kubernetes-application \
  -Dkubernetes.cluster-id=${kubernetes_cluster_id} \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dkubernetes.container.image=localhost:5000/my-flink-job:1.0 \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
  -Dhigh-availability=zookeeper \
  -Dhigh-availability.cluster-id=${kubernetes_cluster_id} \
  -Dhigh-availability.storageDir=hdfs:///path/to/flink/metadata/${kubernetes_cluster_id} \
  -Dstate.checkpoints.dir=hdfs:///path/to/flink/checkpoints/${kubernetes_cluster_id} \
  -Dstate.savepoints.dir=hdfs:///path/to/flink/savepoints/${kubernetes_cluster_id} \
  -Dexecution.checkpointing.interval=5000ms \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=2 \
  local:///opt/flink/usrlib/my-flink-job.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
终于可以愉快的提交作业了。

3.3 停止 flink Application 程序
停止 flink Application 程序时,相关的 k8s 资源会自动释放
建议使用 bin/flink stop 命令停止 streaming 作业(同时会触发 savepoint)

./bin/flink stop -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
1
3.4 关于 flinkonk8s HA
官方 flink-1.11 on native k8s 目前只支持 ZK+HDFS/S3/GFS/OSS 方式开启 HA,目前社区在考虑使用 native k8s api 替代 ZK 实现 HAFLINK-12884 FLINK-17598;并且以 mounting volumes 支持挂载共享磁盘的方式也已经有人实现FLINK-15649,不过目前还没有合并到主分支;如果不改动 1.11 代码,最好使用 ZK + HDFS 或者 S3 来支持 HA. 在 flink-1.11 application 模式下,如果开启 HA 模式(high-availability=zookeeper),job_id 始终为 ‘00000000000000000000000000000000’。这样可能会在提交多个作业时使用相同的 metadata/checkpoint/savepoint 目录。所以在开启 HA 模式时,我们需要手动设置 metadata/checkpoint/savepoint 目录,确保这些目录只被一个作业使用。

如果使用自建的 k8s 集群,目前个人来看最合适的 flinkonk8s HA 方案是 ZK+S3/ZK+HDFS. 上文一直使用的 ZK+HDFS 举例。这里给出 ZK+S3 的配置方法和注意事项。

1.首先在 flink-conf.yaml 中配置(或者在命令行中通过 -D 指定)以下参数:

#==============================================================================
# S3 file system
# you should copy flink-s3-fs-hadoop to plugins:
# mkdir ./plugins/s3-fs-hadoop
# cp ./opt/flink-s3-fs-hadoop-1.11*.jar ./plugins/s3-fs-hadoop/
#==============================================================================
# Configure Access Credentials: s3 access_key
s3.access-key: ASDFQWE23*****
# Configure Access Credentials: s3 secret_key
s3.secret-key: dHx9op********************
# your-endpoint-hostname, for example http://my.host.name:9080/
s3.endpoint: http://my.host.name:9080/
# Some S3 compliant object stores might not have virtual host style addressing enabled by default. In such cases, you will have to provide the property to enable path style access
s3.path.style.access: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
s3.access-key: s3 access_key
s3.secret-key: s3 secret_key
s3.endpoint: 自建 s3 集群 host_base
s3.path.style.access:对于没有启用 virtual host style addressing 的 S3 集群,需要开启 path style access
2.在 S3 中创建 bucket,比如我们创建好了 /flink bucket
3.我们仿照 3.2 节的需求,将使用的文件系统从 HDFS 修改为 S3
创建 Dockerfile,和 3.2 不同的是,这一步多了拷贝 flink-s3-fs-hadoop 的过程

# 基础镜像是 flink image
FROM localhost:5000/flink:1.11.1-scala_2.11
# 需要将 flink-s3-fs-hadoop 放到 plugins/s3-fs-hadoop 下
RUN mkdir $FLINK_HOME/plugins/s3-fs-hadoop && \
    cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.11*.jar $FLINK_HOME/plugins/s3-fs-hadoop/
COPY /path/to/my-flink-job.jar $FLINK_HOME/usrlib/WindowJoin.jar
1
2
3
4
5
6
和之前一样创建镜像

# 进入 Dockerfile 目录
cd /path/to/Dockerfile
docker build -t localhost:5000/my-flink-job:1.0 .
1
2
3
和之前一样上传镜像到私服

docker push localhost:5000/my-flink-job:1.0
1
运行 my-flink-job
将涉及到的文件操作配置都修改成 s3://,并使用我们上面创建的 bucket
一般有以下几项:
high-availability.storageDir / state.checkpoints.dir / state.savepoints.dir

# 每个作业一个全局唯一的 job_name
kubernetes_cluster_id={your-flink-job_name} && \
./bin/flink run-application -p 2 -t kubernetes-application \
  -Dkubernetes.cluster-id=${kubernetes_cluster_id} \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dkubernetes.container.image=localhost:5000/my-flink-job:1.0 \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
  -Dhigh-availability=zookeeper \
  -Dhigh-availability.cluster-id=${kubernetes_cluster_id} \
  -Dhigh-availability.storageDir=s3://flink/metadata/${kubernetes_cluster_id} \
  -Dstate.checkpoints.dir=s3://flink/checkpoints/${kubernetes_cluster_id} \
  -Dstate.savepoints.dir=s3://flink/savepoints/${kubernetes_cluster_id} \
  -Dexecution.checkpointing.interval=5000ms \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=2 \
  local:///opt/flink/usrlib/my-flink-job.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
这样就能使用 S3 作为 flink 的文件系统了,可以用于快照存储和用户程序的文件读写。

flink 官方关于使用 S3 作为文件系统的文档在这里.

4.Log Files
默认情况下 jobmanager 和 taskmanager 只将日志保存到 pod 的 /opt/flink/log 目录下。如果要使用 kubectl logs <PodName>查看日志,需要进行以下配置:

在 Flink 客户端的 log4j.properties 文件中添加一个新的 appender :console
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
1
2
3
4
5
添加 rootLogger :rootLogger.appenderRef.console.ref = ConsoleAppender
更改配置,移除 jobmanager/taskmanager 启动时的重定向:修改 kubernetes.container-start-command-template 参数如下:
 -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
1
重启 k8s-session,使用 kubectl logs <PodName> 命令查看日志
如果 pod 已经启动了,也可以使用 kubectl exec -it <PodName> bash 进入 pod 查看日志或者进行 debug。

5.k8s 相关概念
5.1 Namespaces
k8s 的命令空间,类似于 yarn 中 queue 的概念,可以定义集群资源,给不同用户使用。详情可以查看k8s官网。
可以通过 -Dkubernetes.namespace=default 指定 namespace

5.2 RBAC
给 default namespace 的 default 账号赋权
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
1
自定义运行账号并赋权
比如 flink 账号

kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
1
2
然后可以使用 -Dkubernetes.jobmanager.service-account=flink 指定 jobmanager 使用 flink 账号来创建或删除 Taskmanager pods

可以查看官方 RBAC 文档了解详情。

6.实现机理
简单介绍一下 Flink 和 k8s 的交互原理。

当创建 Flink k8s session 集群时,flink 客户端首先连接 Kubernetes ApiServer,提交要启动的集群描述(cluster description),包括 ConfigMap 、Job Manager Service、Job Manager Deployment 和 Owner(1). Kubernetes 将创建 Flink master deployment,Kubelet 拉取 flink image,准备并挂载卷(mount the volume),然后执行 start 命令。master pod 启动后,Dispatcher 和KubernetesResourceManager 就可用了,这时就可以向该集群提交作业了(2).

当用户使用 Flink 客户端提交作业,客户端会生成 job graph,然后将 job graph 连同用户 jar 包上传到 Dispatcher(3)。Dispatcher 会生成和该作业对应的 JobMaster(4).

JobMaster 向 KubernetesResourceManager(K8sRM)申请运行资源(slots)(5)。如果当前没有空闲 slots,K8sRM 会向 k8s Master 申请并注册 TaskManager pods(6 7 8 9 10).

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐