flink on k8s部署--kubernetes operator方式&Application Mode方式(附构建flink镜像Dockerfile)
本次部署方案采用flink on k8s,部署方式是flink-kubernetes-operator,部署flink-kubernetes-operator需要helm。我这里使用的是jdk1.8和flink1.13,或者用docker镜像执行flink命令。我使用的是flink1.13,其他版本类似,在docker hub中查询自己所使用版本的基础镜像即可。需要kubernetes versi
·
Flink kubernetes operator方式
需要kubernetes version>=1.6,flink version>=1.13
本次部署方案采用flink on k8s,部署方式是flink-kubernetes-operator,部署flink-kubernetes-operator需要helm。
- 首先通过helm安装flink-kubernetes-operator,非首次部署可以忽略该步骤
#创建一个专门用于flink服务的namespace
kubectl create namespace flink-clusters
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -n flink-clusters
#创建serviceaccount
kubectl create serviceaccount flink -n bi
- 获取flink服务的yaml文件-my-first-job-on-k8s.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: test
name: my-first-job-on-k8s
spec:
image: first-flink-k8s:1.13.1.1
flinkVersion: v1_13
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
classloader.resolve-order: parent-first
jobmanager.memory.process.size: "2048m"
taskmanager.memory.process.size: "2048m"
$internal.application.main: "com.sword.FirstFlinkJob"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
job:
jarURI: local:///opt/flink/usrlib/my-first-job.jar
parallelism: 2
upgradeMode: stateless
args: ["--offset","1662912000000","--startOffset","earliest","--flag","test","--servers","kafka-svc.infra:9092","--jobName","my-first-job-on-k8s","--enable_checkpoint","false"]
ingress:
template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
annotations:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
- 根据yaml文件启动服务
kubectl create -f ./my-first-job-on-k8s.yaml
#后续更新服务
kubectl apply -f ./my-first-job-on-k8s.yaml
登陆数据库检查产出情况
所需注意问题
- 部署前一定要注意k8s版本和flink版本,版本不满足要求是无法部署的;
- 任务所需要的参数,需要在yaml文件中配置,是字符串数组的形式;
- ingress是为了可以使用flink UI,如果不需要的话可以不配置,或者使用官网的方式
kubectl port-forward svc/basic-example-rest 8081
。该方式为临时方案,该命令推出后就无法再查看UI;
flink命令方式
该方式如果在服务器上执行,需要配置java和flink环境;我这里使用的是jdk1.8和flink1.13,或者用docker镜像执行flink命令
flink run-application \
--target kubernetes-application \
-c com.sword.FirstFlinkJob \
-Dkubernetes.cluster-id=my-first-job-on-k8s \
-Dkubernetes.container.image=first-flink-k8s:1.13.1.1 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.namespace=bi \
-Dclassloader.resolve-order=parent-first \
-Dparallelism=2 \
local:///opt/flink/usrlib/my-first-job.jar \
--offset 1662912000000 --startOffset earliest --flag test --servers kafka-svc.infra:9092 --jobName my-first-job-on-k8s --enable_checkpoint false
构建镜像Dockerfile
我使用的是flink1.13,其他版本类似,在docker hub中查询自己所使用版本的基础镜像即可
FROM flink:1.13.6-scala_2.12-java8
ENV TIME_ZONE Asia/Shanghai
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone
RUN mkdir -p $FLINK_HOME/usrlib
COPY lib/*.jar $FLINK_HOME/lib
COPY my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
COPY flink-conf.yaml $FLINK_HOME/conf
CMD ["/bin/bash"]
文件结构如图
注意事项
- 如果需要使用kafka connector或者cdc的话,需要将对应的依赖jar包导入到flink_home下面的lib文件夹,例如如图片中lib下面的所有jar;
- 镜像中默认是utc时区,这会导致flink中的一些时间函数和watermark错误,需要在Dockerfile中更改环境变量,设置时区为东八区;
更多推荐
已为社区贡献2条内容
所有评论(0)