flink-1.10 native-k8s (Beta)
注意:flink-1.10 版本的 native-k8s 还是实验版本,相关配置或客户端可能在未来变更。并且当前版本仅支持 session 模式1.环境需求Kubernetes 版本 >= 1.9用户需要有在 k8s 集群上的相关权限(list/create/delete pods/delete services),需要准备好 KubeConfig 文件,默认会使用( ~/.kube...
注意
:flink-1.10 版本的 native-k8s 还是实验版本,相关配置或客户端可能在未来变更。并且当前版本仅支持 session 模式
1.环境需求
- Kubernetes 版本 >= 1.9
- 用户需要有在 k8s 集群上的相关权限(list/create/delete pods/delete services),需要准备好 KubeConfig 文件,默认会使用( ~/.kube/config,也可以在运行时通过
kubernetes.config.file
指定). 可以运行kubectl auth can-i <list|create|edit|delete> pods
来测试当前用户是否有相关权限。 - 配置好 Kubernetes DNS 服务 (最好测试一下域名解析能否成功)
- 准备好 k8s 用户账号,并配置好 RBAC 权限(create, delete pods)
- 准备 flink 集群的 image 镜像:可以使用官方镜像,也可以自己打镜像
2.Flink Kubernetes Session
对 k8s 部署模式来说,官方目前只提供了 session 运行模式。 Job Cluster 模式正在开发中。
2.1 启动 Flink Session
注意
:要确保 flink-conf.yaml
中的 env.java.home
配置被注释,否则无法启动成功!
# env.java.home: /usr/local/java
先在 k8s 上启动 Flink Session
# -Dkubernetes.container.image=localhost:5000/flink-1.10.0.1 指定镜像,如果不指定该配置,会使用默认配置的官方镜像(flink:latest)
# -Dkubernetes.jobmanager.service-account=flink 指定有 k8s 权限的账号,jobmanager 用该账号为 taskmanager 申请 k8s 的 pod
./bin/kubernetes-session.sh -Dkubernetes.container.image=localhost:5000/flink:1.10.0.1 -Dkubernetes.jobmanager.service-account=flink
可以根据需要添加配置,k8s session 的所有相关配置在这里.
例如:启动一个有 2 个 cpu 核心,4GB 内存的 TaskManager
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=flink:1.10.0.1 \
-Dkubernetes.service.exposed.type=ClusterIP
其余配置则使用 conf/flink-conf.yaml 中的配置,所有配置在这里
如果没有指定 kubernetes.cluster-id
, flink 客户端会随机生成一个 UUID。
2.2 向 Flink Session 提交作业
通过 -e kubernetes-session
指定部署到之前启动的 k8s 集群,并且可以通过 -D 指定参数
./bin/flink run -d -e kubernetes-session -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
2.3 访问 Job Manager UI 界面
这里涉及 k8s 相关概念,由于 k8s 有自己的 ‘防火墙’,用户需要定义组件的端口暴露规则,将端口映射到宿主机,k8s 目前提供了 4 种端口映射方式:
- ClusterIP:组件 ip 和端口只能在 k8s 集群内访问。如果你想访问 jobmanager ui 或者向该 session 集群提交任务,需要启动一个本地代理:
kubectl port-forward service/<ServiceName> 8081
- NodePort:静态端口,通过宿主机 NodeIP:NodePort 就可以直接访问 jobmanager UI
- LoadBalancer:由宿主机上的 load balancer 服务动态生成可用端口,jobmanager web interface 的 NodePort 会在提交 job 时客户端的日志中找到
- ExternalName:映射服务到一个 DNS 域名,当前 flink 版本还不支持
2.4 连接 k8s session
可以使用以下命令连接启动的 k8s session
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
2.5 停止 flink k8s Session
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
2.6 资源清理
依赖 k8s 机制进行清理,删除 k8s 服务即可回收所有占用的 k8s 资源(当然服务也停了)
kubectl delete service/<ClusterID>
3.Log Files
默认情况下 jobmanager 和 taskmanager 只将日志保存到 pod 的 /opt/flink/log 目录下。如果要使用 kubectl logs <PodName>
查看日志,需要进行以下配置:
- 在 Flink 客户端的 log4j.properties 文件中添加一个新的 appender :console
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
- 更新 rootLogger :log4j.rootLogger=INFO, file, console.
- 更改配置,移除 jobmanager/taskmanager 启动时的重定向: -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%".
- 重启 k8s-session,使用
kubectl logs <PodName>
命令查看日志
如果 pod 已经启动了,也可以使用kubectl exec -it <PodName> bash
进入 pod 查看日志或者进行 debug。
4.k8s 相关概念
Namespaces
k8s 的命令空间,类似于 yarn 中 queue 的概念,可以定义集群资源,给不同用户使用。详情可以查看k8s官网。
可以通过 -Dkubernetes.namespace=default
指定 namespace
RBAC
- 给 default namespace 的 default 账号赋权
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
- 自定义运行账号并赋权
比如 flink 账号
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
然后可以使用 -Dkubernetes.jobmanager.service-account=flink
指定 jobmanager 使用 flink 账号来创建或删除 Taskmanager pods
可以查看官方 RBAC 文档了解详情。
5.实现机理
简单介绍一下 Flink 和 k8s 的交互原理。
当创建 Flink k8s session 集群时,flink 客户端首先连接 Kubernetes ApiServer
,提交要启动的集群描述(cluster description),包括 ConfigMap 、Job Manager Service、Job Manager Deployment 和 Owner(1). Kubernetes 将创建 Flink master deployment,Kubelet 拉取 flink image,准备并挂载卷(mount the volume),然后执行 start 命令。master pod 启动后,Dispatcher 和KubernetesResourceManager 就可用了,这时就可以向该集群提交作业了(2).
当用户使用 Flink 客户端提交作业,客户端会生成 job graph,然后将 job graph 连同用户 jar 包上传到 Dispatcher(3)。Dispatcher 会生成和该作业对应的 JobMaster(4).
JobMaster 向 KubernetesResourceManager(K8sRM)申请运行资源(slots)(5)。如果当前没有空闲 slots,K8sRM 会向 k8s Master 申请并注册 TaskManager pods(6 7 8 9 10).
更多推荐
所有评论(0)