Flink on K8s Native, Revisited
This walkthrough is based on Flink 1.13.2.
It covers the two native Flink-on-K8s deployment modes: session mode and application mode.
Step 1: Prepare the Kubernetes environment
1. Create a namespace
kubectl create namespace flink-session-cluster-test-1213
2. Create a ServiceAccount used to submit Flink jobs
kubectl create serviceaccount flink -n flink-session-cluster-test-1213
3. Bind the ServiceAccount to a role
kubectl create clusterrolebinding flink-role-binding-flink-session-cluster-test-1213_flink \
--clusterrole=edit --serviceaccount=flink-session-cluster-test-1213:flink
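Whether the binding took effect can be checked by asking kubectl if the ServiceAccount may create pods, which the Flink client will need to do:
kubectl auth can-i create pods -n flink-session-cluster-test-1213 \
--as=system:serviceaccount:flink-session-cluster-test-1213:flink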
Step 2: Prepare the image
HDFS serves as Flink's checkpoint storage, so the Hadoop uber jar has to go into Flink's lib directory.
Create a Dockerfile with the following content:
vi Dockerfile
FROM flink:1.13.2-scala_2.11-java8
COPY ./flink-shaded-hadoop-2-uber-2.7.5-10.0.jar $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
Build the image:
docker build -t native_realtime:1.0.3 .
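If the Kubernetes nodes cannot pull locally built images, the image must first be pushed to a registry reachable from the cluster; a sketch with a hypothetical registry address:
docker tag native_realtime:1.0.3 registry.example.com/native_realtime:1.0.3
docker push registry.example.com/native_realtime:1.0.3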
Both the session and application walkthroughs below use this image.
To handle the hosts mapping and user-defined jars, a pod template YAML is needed:
vi flink-template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: native_realtime:1.0.3
      # Fetch the user jar to run plus the various configuration files
      command: ["/bin/sh","-c"]
      args: ["wget http://xxxxxx:8082/flinkhistory/1.13.2/tt.sql -O /opt/flink/usrHome/taa.sql ; wget http://xxxx:8082/flinkhistory/1.13.2/realtime-dw-service-1.0.1-SNAPSHOT.jar -O /opt/flink/usrHome/realtime-dw-service-1.0.1.jar"]
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  hostAliases:
    - ip: 10.1.1.103
      hostnames:
        - "cdh103"
    - ip: 10.1.1.104
      hostnames:
        - "cdh104"
    - ip: 10.1.1.105
      hostnames:
        - "cdh105"
    - ip: 10.1.1.106
      hostnames:
        - "cdh106"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  volumes:
    - name: flink-usr-home
      hostPath:
        path: /tmp
        type: Directory
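Since the template is an ordinary Pod manifest, indentation and schema mistakes can be caught early with a client-side dry run before the Flink CLI consumes it:
kubectl apply --dry-run=client -f flink-template.yaml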
Submitting a job in application mode (run-application)
/data/flink-1.13.0/bin/flink run-application \
--target kubernetes-application \
-Dresourcemanager.taskmanager-timeout=345600 \
-Dkubernetes.namespace=flink-session-cluster-test-1213 \
-Dkubernetes.service-account=flink \
-Dkubernetes.cluster-id=flink-stream-reatime-dw11 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \
-Dkubernetes.container.image=native_realtime:1.0.3 \
-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \
-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \
-Dexecution.checkpointing.interval=2s \
-Dexecution.checkpointing.mode=EXACTLY_ONCE \
-Dstate.backend=filesystem \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Drestart-strategy=failure-rate \
-Drestart-strategy.failure-rate.delay=1s \
-Drestart-strategy.failure-rate.failure-rate-interval=5s \
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \
-Dtaskmanager.memory.process.size=1096m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dkubernetes.pod-template-file=./flink-template.yaml \
-c com.xxx.bigdata.rt.dw.service.runtime.RealtimeWarehouseMain \
local:///opt/flink/usrHome/realtime-dw-service-1.0.1.jar \
-cfc state.checkpoint.interval=60000 -cfp 1 -cfm no -cfn kafka_es -cfs /opt/flink/usrHome/taa.sql
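After submission, the cluster can be verified from kubectl. Flink creates a Deployment and a REST Service named after kubernetes.cluster-id, so assuming the cluster-id and namespace above:
# List the pods of the application cluster
kubectl get pods -n flink-session-cluster-test-1213
# Follow the JobManager log
kubectl logs -f deployment/flink-stream-reatime-dw11 -n flink-session-cluster-test-1213
# Find the NodePort of the REST service to reach the Flink web UI
kubectl get svc flink-stream-reatime-dw11-rest -n flink-session-cluster-test-1213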
Submitting jobs in session mode
-- Create the session
/data/flink-1.13.0/bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \
-Dtaskmanager.memory.process.size=1096m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=native_realtime:1.0.3 \
-Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.service-account=flink \
-Dkubernetes.namespace=flink-session-cluster-test-1213 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \
-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \
-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \
-Dexecution.checkpointing.interval=2s \
-Dexecution.checkpointing.mode=EXACTLY_ONCE \
-Dstate.backend=filesystem \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Drestart-strategy=failure-rate \
-Drestart-strategy.failure-rate.delay=1s \
-Drestart-strategy.failure-rate.failure-rate-interval=5s \
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \
-Dkubernetes.pod-template-file=./flink-template.yaml
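Once the session cluster is up, its web UI is reachable through the NodePort of the <cluster-id>-rest service; for the values used above:
kubectl get svc stream-wordcount-application-cluster-rest -n flink-session-cluster-test-1213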
Submit a job to the session
/data/flink-1.13.0/bin/flink run -d -e kubernetes-session \
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \
-Dkubernetes.namespace=flink-session-cluster-test-1213 \
-Dkubernetes.taskmanager.service-account=flink \
-Dexecution.attached=true \
-c com.xxx.bigdata.rt.dw.service.runtime.RealtimeWarehouseMain \
./realtime-dw-service-1.0.1-SNAPSHOT.jar \
-cfc state.checkpoint.interval=60000 -cfp 1 -cfm no -cfn kafka_es -cfs ./tt.sql
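Jobs in the session can later be listed and cancelled with the regular CLI; a sketch against the session created above, where <jobId> is a placeholder for an ID taken from the list output:
# List running jobs in the session cluster
/data/flink-1.13.0/bin/flink list -e kubernetes-session \
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \
-Dkubernetes.namespace=flink-session-cluster-test-1213
# Cancel one job by its JobID
/data/flink-1.13.0/bin/flink cancel -e kubernetes-session \
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \
-Dkubernetes.namespace=flink-session-cluster-test-1213 <jobId>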
Problems and solutions:
1. How do Flink jobs resolve custom hosts?
Put the hostAliases entries into the pod template YAML that Flink supports, then point to the template with -Dkubernetes.pod-template-file on the command line.
2. In session mode, the TaskManager fails to start because it lacks permission to read the ConfigMap?
Pass -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.service-account=flink -Dkubernetes.taskmanager.service-account=flink so every component runs under the ServiceAccount that has the required permissions.
3. In application mode, how can user jars stay out of the image?
Fetch them with wget in an initContainers entry of the pod template.
Session mode vs. application mode
Session mode starts the JobManager first and launches TaskManagers as jobs are submitted; the logs of multiple jobs end up interleaved, but user jars require no image build, so submission is simpler.
Application mode requires user jars to be baked into the image or pulled in via initContainers in the pod template; in return, each application's logs stay separate.
Both modes support high availability, and in both of them a failed job restarts automatically from the latest checkpoint.
Log collection was not covered in this round.
Update: after a few more days of experimenting, I found a more convenient way to submit application-mode jobs: use the stock Flink image and place the Hadoop-related jars under $FLINK_HOME/lib/extr-lib. Flink's startup scripts scan the lib directory recursively when building the classpath, so jars in that subdirectory are loaded automatically. The template below is the one referenced later as flink-template-udf.yaml:
apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: flink:1.13.2-scala_2.11-java8
      # Use wget or other tools to get user jars from remote storage
      command: ["/bin/sh","-c"]
      # Put the user-defined UDF jar and the Hadoop jar under extr-lib so Flink loads them itself
      args: ["wget http://xxxx:8082/tt1.sql -O /opt/flink/usrHome/taa.sql ; wget http://xxxx:8082/realtime-dw-service-1.0.1.jar -O /opt/flink/usrHome/realtime-dw-service-1.0.1.jar; wget http://xxxx:8082/udf-1.0.1-SNAPSHOT.jar -O /opt/flink/extr-lib/udf-1.0.1-SNAPSHOT.jar; wget http://xxxx:8082/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar -O /opt/flink/extr-lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar"]
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
        - mountPath: /opt/flink/extr-lib
          name: flink-usr-extr-lib
  hostAliases:
    - ip: 10.1.1.103
      hostnames:
        - "cdh103"
    - ip: 10.1.1.104
      hostnames:
        - "cdh104"
    - ip: 10.1.1.105
      hostnames:
        - "cdh105"
    - ip: 10.1.1.106
      hostnames:
        - "cdh106"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
        # With subPath, a single file can be mounted into an existing directory:
        # - mountPath: /opt/flink/lib/udf-1.0.1-SNAPSHOT.jar
        #   name: flink-usr-extr-lib
        #   subPath: udf-1.0.1-SNAPSHOT.jar
        - mountPath: /opt/flink/lib/extr-lib
          name: flink-usr-extr-lib
  volumes:
    - name: flink-usr-home
      emptyDir: {}
    - name: flink-usr-extr-lib
      emptyDir: {}
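A quick sanity check that the init container placed the jars where the main container expects them; the pod name is a placeholder, take it from kubectl get pods:
kubectl exec -it <jobmanager-pod> -n flink-session-cluster-test-1213 -- ls /opt/flink/lib/extr-lib /opt/flink/usrHome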
The submit command is as follows:
/data/flink-1.13.0/bin/flink run-application \
--target kubernetes-application \
-Dresourcemanager.taskmanager-timeout=345600 \
-Dkubernetes.namespace=flink-session-cluster-test-1213 \
-Dkubernetes.service-account=flink \
-Dkubernetes.cluster-id=flink-stream-reatime-dw11 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \
-Dkubernetes.container.image=flink:1.13.2-scala_2.11-java8 \
-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \
-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \
-Dexecution.checkpointing.interval=2s \
-Dexecution.checkpointing.mode=EXACTLY_ONCE \
-Dstate.backend=filesystem \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Drestart-strategy=failure-rate \
-Drestart-strategy.failure-rate.delay=1s \
-Drestart-strategy.failure-rate.failure-rate-interval=5s \
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \
-Dtaskmanager.memory.process.size=1096m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dkubernetes.pod-template-file=./flink-template-udf.yaml \
-c com.xxx.bigdata.rt.dw.service.runtime.RealtimeWarehouseMain \
local:///opt/flink/usrHome/realtime-dw-service-1.0.1.jar \
-cfc state.checkpoint.interval=60000 -cfp 1 -cfm no -cfn kafka_es -cfs /opt/flink/usrHome/taa.sql
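When the job is no longer needed, the whole application cluster can be torn down by deleting its Deployment; Kubernetes then cleans up the remaining resources:
kubectl delete deployment/flink-stream-reatime-dw11 -n flink-session-cluster-test-1213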