Flink on K8s
flink-1.15.1 on K8s
Reference article:
https://www.cnblogs.com/liugp/p/16755095.html
If your job needs HDFS (typically as the checkpoint storage directory), first see the previous article:
《Hadoop、Hive On k8s》
Introduction
There are two ways to run Flink on K8s:
- session: start a JobManager first, then submit jobs, which bring up TaskManagers
- application: the jar is submitted and run directly, and each jar gets its own isolated environment (this article focuses on this mode)
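For contrast, a session cluster is started once with the kubernetes-session.sh script bundled in the Flink distribution, and jobs are then submitted against its cluster-id. A minimal sketch; the cluster-id and namespace below are placeholders, not from this article's deployment:

```shell
# Placeholder identifiers, not from this article's deployment.
CLUSTER_ID="flink-session"
NAMESPACE="flink"

# Start the session cluster (JobManager only; TaskManagers are created per job):
# ./bin/kubernetes-session.sh \
#   -Dkubernetes.cluster-id=${CLUSTER_ID} \
#   -Dkubernetes.namespace=${NAMESPACE}

# Submit a job to the running session cluster:
# ./bin/flink run --target kubernetes-session \
#   -Dkubernetes.cluster-id=${CLUSTER_ID} \
#   -Dkubernetes.namespace=${NAMESPACE} \
#   ./examples/streaming/TopSpeedWindowing.jar
```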
Component versions:
- hadoop: 2.7.2
- flink: 1.15.1
I. Building the Dockerfile
vim dockerfile-flink-1.15.1
# As of 2023-03-01, the newest Flink image available on Docker Hub was flink:1.14.2-scala_2.12.
# That Docker Hub image was mirrored to a private Aliyun registry.
# Since the job here was developed against flink-1.15.1, the Flink inside the image is
# upgraded manually, and some extra environment is added into the image.
FROM registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.14.2-scala_2.12
USER root

ENV FLINK_VERSION=1.15.1
ENV SCALA_VERSION=2.12
ENV HADOOP_VERSION=2.7.2
ENV HADOOP_HOME=/opt/hadoop
ENV HADOOP_CONFIG_DIR=/etc/hadoop/conf/
ENV HADOOP_COMMON_HOME=${HADOOP_HOME} \
    HADOOP_HDFS_HOME=${HADOOP_HOME} \
    HADOOP_MAPRED_HOME=${HADOOP_HOME} \
    HADOOP_YARN_HOME=${HADOOP_HOME} \
    HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop \
    PATH=${PATH}:${HADOOP_HOME}/bin

# Set the container timezone and locale (ENV persists into the image; RUN export would not)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
ENV LANG=zh_CN.UTF-8

# Remove the bundled flink-1.14.2 and replace it with flink-1.15.1
RUN rm -rf $FLINK_HOME/*
COPY flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz /tmp/flink.tgz
RUN tar --directory /opt -xf /tmp/flink.tgz && rm /tmp/flink.tgz
RUN mkdir -p /opt/flink && mv /opt/flink-${FLINK_VERSION}/* /opt/flink/ && rm -rf /opt/flink-${FLINK_VERSION}

# Flink checkpoints go to HDFS, so bring in a Hadoop environment
COPY flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar /opt/flink/lib/
COPY hadoop-${HADOOP_VERSION}.tar.gz /tmp/hadoop.tgz
RUN tar --directory /opt -xzf /tmp/hadoop.tgz && rm /tmp/hadoop.tgz
RUN ln -s /opt/hadoop-${HADOOP_VERSION} ${HADOOP_HOME}

# Copy in the HDFS configuration files
COPY hadoop-env.sh ${HADOOP_CONFIG_DIR}/
COPY core-site.xml ${HADOOP_CONFIG_DIR}/
COPY hdfs-site.xml ${HADOOP_CONFIG_DIR}/

ENV FLINK_HOME=/opt/flink
ENV PATH=${PATH}:${FLINK_HOME}/bin
RUN mkdir -p $FLINK_HOME/usrlib
WORKDIR $FLINK_HOME
RUN chown -R flink:flink /opt/flink && chmod -R 666 /opt/flink/conf/*

# Copy in the job jar and its configuration files
COPY flink-realtime-1.0-SNAPSHOT.jar $FLINK_HOME/lib/
COPY flink-realtime-hdfs.properties $FLINK_HOME/usrlib/
COPY handle-table.txt $FLINK_HOME/usrlib/

# Entrypoint script; not executed at build time, only when a container starts
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["help"]
Note: the Flink and Hadoop tarballs referenced by the Dockerfile must be downloaded into the build context beforehand (this also makes builds faster).
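For example, the matching release tarballs can be fetched from the Apache archive (the URLs follow the archive's standard layout):

```shell
# Tarball URLs on archive.apache.org for the versions used in this article.
FLINK_VERSION="1.15.1"
HADOOP_VERSION="2.7.2"
FLINK_URL="https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.12.tgz"
HADOOP_URL="https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz"
# Download both into the Docker build context:
# wget "$FLINK_URL" "$HADOOP_URL"
# The flink-shaded-hadoop uber jar referenced by the Dockerfile must be obtained separately.
```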
docker build -f dockerfile-flink-1.15.1 -t flink:1.15.1 . --no-cache
Afterwards, push the image to your own registry with docker tag + docker push.
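A sketch of the retag-and-push step; the registry path below is a placeholder for your own repository:

```shell
# Retag the locally built image and push it to a private registry.
# REGISTRY is a placeholder; substitute your own repository address.
REGISTRY="registry.example.com/myrepo"
LOCAL_IMAGE="flink:1.15.1"
REMOTE_IMAGE="${REGISTRY}/${LOCAL_IMAGE}"
# docker tag "$LOCAL_IMAGE" "$REMOTE_IMAGE"
# docker push "$REMOTE_IMAGE"
```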
II. Deploying the Flink environment
1. Install the Flink client on the host
wget https://archive.apache.org/dist/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
mv flink-1.15.1-bin-scala_2.12.tgz /opt && cd /opt && tar -xf flink-1.15.1-bin-scala_2.12.tgz
ll /opt/flink-1.15.1
2. Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
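The binding can be sanity-checked with kubectl auth can-i, which can impersonate the serviceaccount. A hypothetical verification step, not part of the original setup:

```shell
# Build the serviceaccount subject string used for impersonation checks.
NAMESPACE="flink"
SA="flink-service-account"
SUBJECT="system:serviceaccount:${NAMESPACE}:${SA}"
# With a live cluster, both of these should answer "yes" for the edit role:
# kubectl auth can-i create pods -n "$NAMESPACE" --as="$SUBJECT"
# kubectl auth can-i delete deployments -n "$NAMESPACE" --as="$SUBJECT"
```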
3. Submitting a Flink job
Submit one of Flink's bundled example programs:
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05 \
-Dkubernetes.jobmanager.replicas=1 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=1000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
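Flink exposes the JobManager REST endpoint through a `<cluster-id>-rest` service; with NodePort exposure, the Web UI port can be read from `kubectl get svc`. A small sketch that parses assumed sample output rather than a live cluster:

```shell
# Extract the NodePort from a "kubectl get svc" line such as:
#   flink-cluster-rest   NodePort   10.96.12.34   <none>   8081:30081/TCP   5m
# Against a real cluster: kubectl get svc flink-cluster-rest -n flink
sample_line='flink-cluster-rest   NodePort   10.96.12.34   <none>   8081:30081/TCP   5m'
# Field 5 is "port:nodePort/protocol"; split on ":" and "/" and take the nodePort.
node_port=$(echo "$sample_line" | awk '{split($5, p, "[:/]"); print p[2]}')
echo "Web UI reachable on node port: $node_port"
```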
Submit a user-developed program. Note that `#` comments cannot be interleaved inside a backslash-continued command, so the parameter notes are listed first:
- kubernetes.container.image: the image the containers start from (same as the one pushed earlier)
- kubernetes.namespace: the namespace the containers run in
- kubernetes.pod-template-file: a yaml template, used here to solve the hosts mapping problem; by templating this yaml you can later swap the job jar and configuration files dynamically
- -c: the main class
- the final two arguments are the job jar and its configuration file, as absolute paths inside the container (not on the host)
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05 \
-Dkubernetes.jobmanager.replicas=1 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=1000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dclassloader.resolve-order=parent-first \
-Dkubernetes.pod-template-file=/opt/flink-1.15.1/flink-template.yaml \
-c com.clb.hadoop.hub.flink.realtime.launch.FlinkConsumeKafkaToHdfs \
local:///opt/flink/lib/flink-realtime-1.0-SNAPSHOT.jar /opt/flink/usrlib/flink-realtime-hdfs.properties
4. Inspecting the job
kubectl get -n flink pod,svc,deployment,configmap,pv
- pod/flink-cluster-79ff8b6dd-nn6sk: the JobManager
- pod/flink-cluster-taskmanager-1-1: the TaskManager
- pod/hadoop-dn-node1-0: the datanode of the Hadoop deployment
- pod/hadoop-nn-0: the namenode of the Hadoop deployment
Pod status and logs can be checked with the commands below:
# JobManager logs
kubectl logs flink-cluster-79ff8b6dd-nn6sk -n flink
# TaskManager logs
kubectl logs flink-cluster-taskmanager-1-1 -n flink
# JobManager pod details
kubectl describe pod flink-cluster-79ff8b6dd-nn6sk -n flink
# Stop the job submitted above
kubectl delete deployment flink-cluster -n flink
III. Notes
1. The yaml pod template
Why is a yaml template needed?
The job here uses Kafka. Suppose the current Kafka brokers are node2:9092, node3:9092, node4:9094. When a Kafka client connects to a broker, it fetches the connection info of every broker in the cluster. Without the IP mappings in place, connections from inside the container to hostnames like node2, node3, node4 fail, because /etc/hosts has no entries for them.
apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
  namespace: flink
spec:
  initContainers:
    - name: artifacts-fetcher
      image: registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05
      command: []   # fetch commands go here; see the wget-based init container later in this article
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
        - mountPath: /opt/flink/usrlib
          name: flink-usr-extr-lib
  # Map the hostnames inside the container ahead of time
  hostAliases:
    - ip: 192.168.0.111
      hostnames:
        - "master"
    - ip: 192.168.0.113
      hostnames:
        - "node2"
    - ip: 192.168.0.114
      hostnames:
        - "node3"
    - ip: 192.168.0.115
      hostnames:
        - "node4"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      image: registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
        - mountPath: /opt/flink/lib/extr-lib
          name: flink-usr-extr-lib
  volumes:
    - name: flink-usr-home
      emptyDir: {}
    - name: flink-usr-extr-lib
      emptyDir: {}
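The hostAliases stanza above can also be generated from a hosts-style list, which is handy when the broker set changes. A small pure-shell sketch, using the addresses from the template above:

```shell
# Turn "ip hostname" pairs into a Kubernetes hostAliases YAML fragment.
hosts='192.168.0.111 master
192.168.0.113 node2
192.168.0.114 node3
192.168.0.115 node4'

# Emit one "- ip / hostnames" stanza per input line.
aliases=$(echo "$hosts" | awk '{
    printf "  - ip: %s\n    hostnames:\n      - \"%s\"\n", $1, $2
}')
echo "hostAliases:"
echo "$aliases"
```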
2. The hadoop-config-volume problem
Problem:
If the job uses a Hadoop environment, the taskmanager may hang at startup.
Find the cause with:
kubectl describe pod flink-cluster-taskmanager-1-1 -n flink
MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap "hadoop-config-flink-cluster" not found
Fix:
Create a configmap containing the core-site.xml and hdfs-site.xml from your HDFS environment.
Note:
- namespace: must match the namespace created earlier
- app: must match the kubernetes.cluster-id used when submitting the job
vim hadoop-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    app: flink-cluster
    type: flink-native-kubernetes
  name: hadoop-config-flink-cluster
  namespace: flink
data:
  core-site.xml: |
    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop-nn-0.hadoop-nn-service:8020</value>
      </property>
      <property>
        <name>hadoop.tmp.dir</name>
        <value>/var/hadoop</value>
      </property>
    </configuration>
  hdfs-site.xml: |
    <configuration>
      <property>
        <name>dfs.name.dir</name>
        <value>/dfs/nn</value>
      </property>
      <property>
        <name>dfs.data.dir</name>
        <value>/dfs/dn/data/</value>
      </property>
      <property>
        <name>dfs.replication</name>
        <value>3</value>
      </property>
      <property>
        <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
        <value>false</value>
      </property>
      <property>
        <name>dfs.datanode.use.datanode.hostname</name>
        <value>true</value>
      </property>
    </configuration>
Apply the configmap:
kubectl apply -f hadoop-config.yaml
Then stop the job (kubectl delete deployment flink-cluster -n flink) and resubmit it.
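Alternatively, if core-site.xml and hdfs-site.xml already exist on disk, the configmap can be generated from them instead of hand-writing the yaml. A sketch; the file paths are assumptions:

```shell
# Derive the configmap name Flink looks for: hadoop-config-<cluster-id>.
cluster_id="flink-cluster"
cm_name="hadoop-config-${cluster_id}"
echo "$cm_name"
# kubectl create configmap "$cm_name" \
#   --from-file=core-site.xml --from-file=hdfs-site.xml \
#   -n flink
# Optionally label it to match Flink's own convention:
# kubectl label configmap "$cm_name" app="$cluster_id" type=flink-native-kubernetes -n flink
```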
3. Startup parameters
For reference, a fuller set of submission parameters covering HA, checkpointing, savepoints, and restart strategy (the hostnames, image, and versions below come from a different cluster):
-Dresourcemanager.taskmanager-timeout=345600 \
-Dkubernetes.namespace=flink-session-cluster-test-1213 \
-Dkubernetes.service-account=flink \
-Dkubernetes.cluster-id=flink-stream-reatime-dw11 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \
-Dkubernetes.container.image=flink:1.13.2-scala_2.11-java8 \
-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \
-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \
-Dexecution.checkpointing.interval=2s \
-Dexecution.checkpointing.mode=EXACTLY_ONCE \
-Dstate.backend=filesystem \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Drestart-strategy=failure-rate \
-Drestart-strategy.failure-rate.delay=1s \
-Drestart-strategy.failure-rate.failure-rate-interval=5s \
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \
-Dtaskmanager.memory.process.size=1096m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=1 \
IV. Problems and solutions
1. How to handle the hosts problem for Flink jobs?
Use the yaml template Flink provides: put the hosts configuration in the yaml, then point to it on the command line with -Dkubernetes.pod-template-file.
2. In application mode, how to avoid baking custom jars into the image?
Fetch them with wget from an initContainers entry in the yaml template:
vi flink-template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: native_realtime:1.0.3
      # Fetch the custom job jar and any configuration files
      command: ["/bin/sh","-c"]
      args: ["wget http://xxxxxx:8082/flinkhistory/1.13.2/tt.sql -O /opt/flink/usrHome/taa.sql ; wget http://xxxx:8082/flinkhistory/1.13.2/realtime-dw-service-1.0.1-SNAPSHOT.jar -O /opt/flink/usrHome/realtime-dw-service-1.0.1.jar"]
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  hostAliases:
    - ip: 10.1.1.103
      hostnames:
        - "cdh103"
    - ip: 10.1.1.104
      hostnames:
        - "cdh104"
    - ip: 10.1.1.105
      hostnames:
        - "cdh105"
    - ip: 10.1.1.106
      hostnames:
        - "cdh106"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  volumes:
    - name: flink-usr-home
      hostPath:
        path: /tmp
        type: Directory