Flink kubernetes operator方式

flink kubernetes operator官网地址

需要kubernetes version>=1.6,flink version>=1.13

本次部署方案采用flink on k8s,部署方式是flink-kubernetes-operator,部署flink-kubernetes-operator需要helm。

  1. 首先通过helm安装flink-kubernetes-operator,非首次部署可以忽略该步骤
#创建一个专门用于flink服务的namespace

kubectl create namespace flink-clusters


kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml


helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/


helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -n flink-clusters

#创建serviceaccount
kubectl create serviceaccount flink -n bi
  1. 获取flink服务的yaml文件-my-first-job-on-k8s.yaml
apiVersion: flink.apache.org/v1beta1

kind: FlinkDeployment

metadata:

  namespace: test

  name: my-first-job-on-k8s

spec:

  image: first-flink-k8s:1.13.1.1

  flinkVersion: v1_13

  flinkConfiguration:

    taskmanager.numberOfTaskSlots: "1"

    classloader.resolve-order: parent-first

    jobmanager.memory.process.size: "2048m"

    taskmanager.memory.process.size: "2048m"

    $internal.application.main: "com.sword.FirstFlinkJob"

  serviceAccount: flink

  jobManager:

    resource:

      memory: "2048m"

      cpu: 1

  taskManager:

    resource:

      memory: "2048m"

      cpu: 1

  job:

    jarURI: local:///opt/flink/usrlib/my-first-job.jar

    parallelism: 2

    upgradeMode: stateless

    args: ["--offset","1662912000000","--startOffset","earliest","--flag","test","--servers","kafka-svc.infra:9092","--jobName","my-first-job-on-k8s","--enable_checkpoint","false"]

  ingress:

    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"

    className: "nginx"

    annotations:

      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  1. 根据yaml文件启动服务
kubectl create -f ./my-first-job-on-k8s.yaml

#后续更新服务

kubectl apply -f ./my-first-job-on-k8s.yaml

登陆数据库检查产出情况

所需注意问题
  1. 部署前一定要注意k8s版本和flink版本,版本不满足要求是无法部署的;
  2. 任务所需要的参数,需要在yaml文件中配置,是字符串数组的形式;
  3. ingress是为了可以使用flink UI,如果不需要的话可以不配置,或者使用官网的方式kubectl port-forward svc/basic-example-rest 8081。该方式为临时方案,该命令推出后就无法再查看UI;

flink命令方式

该方式如果在服务器上执行,需要配置java和flink环境;我这里使用的是jdk1.8和flink1.13,或者用docker镜像执行flink命令

flink run-application \
    --target kubernetes-application \
    -c com.sword.FirstFlinkJob \
    -Dkubernetes.cluster-id=my-first-job-on-k8s \
    -Dkubernetes.container.image=first-flink-k8s:1.13.1.1 \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    -Dkubernetes.namespace=bi \
    -Dclassloader.resolve-order=parent-first \
    -Dparallelism=2 \
    local:///opt/flink/usrlib/my-first-job.jar \
    --offset 1662912000000 --startOffset earliest --flag test --servers kafka-svc.infra:9092 --jobName my-first-job-on-k8s --enable_checkpoint false

构建镜像Dockerfile

我使用的是flink1.13,其他版本类似,在docker hub中查询自己所使用版本的基础镜像即可

FROM flink:1.13.6-scala_2.12-java8


ENV TIME_ZONE Asia/Shanghai
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone

RUN mkdir -p $FLINK_HOME/usrlib
COPY lib/*.jar $FLINK_HOME/lib
COPY my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
COPY flink-conf.yaml $FLINK_HOME/conf

CMD ["/bin/bash"]

文件结构如图
在这里插入图片描述

注意事项
  1. 如果需要使用kafka connector或者cdc的话,需要将对应的依赖jar包导入到flink_home下面的lib文件夹,例如如图片中lib下面的所有jar;
  2. 镜像中默认是utc时区,这会导致flink中的一些时间函数和watermark错误,需要在Dockerfile中更改环境变量,设置时区为东八区;
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐