1.在K8s上使用Flink

部署特定集群,部署好jobmanager和taskmanager,需要提交任务时直接提交即可运行。

首先部署flink-configuration-configmap.yaml

内容如下,主要配置了flink-conf.yaml和log4j日志输出

apiVersion: v1

kind: ConfigMap

metadata:

  name: flink-config

  labels:

    app: flink

data:

  flink-conf.yaml: |+

    jobmanager.rpc.address: flink-jobmanager

    taskmanager.numberOfTaskSlots: 1

    blob.server.port: 6124

    jobmanager.rpc.port: 6123

    taskmanager.rpc.port: 6122

    jobmanager.heap.size: 1024m

    taskmanager.memory.process.size: 1024m

  log4j.properties: |+

    log4j.rootLogger=INFO, file

    log4j.logger.akka=INFO

    log4j.logger.org.apache.kafka=INFO

    log4j.logger.org.apache.hadoop=INFO

    log4j.logger.org.apache.zookeeper=INFO

    log4j.appender.file=org.apache.log4j.FileAppender

    log4j.appender.file.file=${log.file}

    log4j.appender.file.layout=org.apache.log4j.PatternLayout

    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

 log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

 

1.1部署Jobmanager

apiVersion: apps/v1

kind: Deployment

metadata:

  name: flink-jobmanager

spec:

  replicas: 1

  selector:

    matchLabels:

      app: flink

      component: jobmanager

  template:

    metadata:

      labels:

        app: flink

        component: jobmanager

    spec:

      containers:

      - name: jobmanager

        image: 10.180.210.196/flink/flink:latest

        workingDir: /opt/flink

        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\

          while :;

          do

            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];

              then tail -f -n +1 log/*jobmanager*.log;

            fi;

          done"]

        ports:

        - containerPort: 6123

          name: rpc

        - containerPort: 6124

          name: blob

        - containerPort: 8081

          name: ui

        livenessProbe:

          tcpSocket:

            port: 6123

          initialDelaySeconds: 30

          periodSeconds: 60

        volumeMounts:

        - name: flink-config-volume

          mountPath: /opt/flink/conf

        securityContext:

          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary

      volumes:

      - name: flink-config-volume

        configMap:

          name: flink-config

          items:

          - key: flink-conf.yaml

            path: flink-conf.yaml

          - key: log4j.properties

            path: log4j.properties

 

1.2部署Taskmanager

apiVersion: apps/v1

kind: Deployment

metadata:

  name: flink-taskmanager

spec:

  replicas: 2

  selector:

    matchLabels:

      app: flink

      component: taskmanager

  template:

    metadata:

      labels:

        app: flink

        component: taskmanager

    spec:

      containers:

      - name: taskmanager

        image: 10.180.210.196/flink/flink:latest

        workingDir: /opt/flink

        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \

          while :;

          do

            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];

              then tail -f -n +1 log/*taskmanager*.log;

            fi;

          done"]

        ports:

        - containerPort: 6122

          name: rpc

        livenessProbe:

          tcpSocket:

            port: 6122

          initialDelaySeconds: 30

          periodSeconds: 60

        volumeMounts:

        - name: flink-config-volume

          mountPath: /opt/flink/conf/

        securityContext:

          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary

      volumes:

      - name: flink-config-volume

        configMap:

          name: flink-config

          items:

          - key: flink-conf.yaml

            path: flink-conf.yaml

          - key: log4j.properties

            path: log4j.properties

 

1.3部署service服务

部署service使taskmanager能够访问jobmanager

apiVersion: v1

kind: Service

metadata:

  name: flink-jobmanager

spec:

  type: ClusterIP

  ports:

  - name: rpc

    port: 6123

  - name: blob

    port: 6124

  - name: ui

    port: 8081

  selector:

    app: flink

    component: jobmanager

部署webui服务

apiVersion: v1

kind: Service

metadata:

  name: flink-jobmanager-rest

spec:

  type: NodePort

  ports:

  - name: rest

    port: 8081

    targetPort: 8081

  selector:

    app: flink

    component: jobmanager

1.4访问webUI提交程序

通过以下命令获取Jobmanager所在节点

kubectl get pod -o wide

kubectl get svc

 

2.部署原生Flink on K8s

下载flink,解压

2.1配置RBAC和ServiceAccont权限

apiVersion: v1

kind: ServiceAccount

metadata:

  name: flink

  namespace: default

---

apiVersion: rbac.authorization.k8s.io/v1beta1

kind: Role

metadata:

  namespace: default

  name: flink-role

rules:

- apiGroups:

  - "" # "" indicates the core API group

  resources:

  - "pods"

  verbs:

  - "*"

- apiGroups:

  - "" # "" indicates the core API group

  resources:

  - "services"

  verbs:

  - "*"

---

apiVersion: rbac.authorization.k8s.io/v1beta1

kind: RoleBinding

metadata:

  name: flink-role-binding

  namespace: default

subjects:

- kind: ServiceAccount

  name: flink

  namespace: default

roleRef:

  kind: Role

  name: flink-role

  apiGroup: rbac.authorization.k8s.io

2.2启动k8s-session

./bin/kubernetes-session.sh \

  -Dtaskmanager.memory.process.size=10240m \

  -Dkubernetes.taskmanager.cpu=2 \

  -Dtaskmanager.numberOfTaskSlots=4 \

  -Dresourcemanager.taskmanager-timeout=360000 \

  -Dkubernetes.service.create-timeout=100min \

  -Dkubernetes.config.file=/root/.kube/config \

  -Dkubernetes.jobmanager.service-account=flink

 

kubernetes-session.sh参数列表

参数

默认值

类型

参数说明

kubernetes.cluster-id

(none)

String

集群id,如果不输入可以自动生成uuid

kubernetes.config.file

(none)

String

K8s默认配置文件,不输入会使用默认路径~/.kube/config

kubernetes.container-start-command-template

"%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"

String

模板启动Jobmanager和Taskmanager启动命令

kubernetes.container.image

"flink:latest"

String

默认启动镜像

kubernetes.container.image.pull-policy

"IfNotPresent"

String

K8s镜像拉取策略

kubernetes.entry.path

"/opt/flink/bin/kubernetes-entry.sh"

String

K8s entrypoint脚本路径

kubernetes.flink.conf.dir

"/opt/flink/conf"

String

挂载flink配置文件的路径,路径包含flink-conf.yaml和log4j.propertis

kubernetes.flink.log.dir

"/opt/flink/log"

String

日志文件存储路径

kubernetes.jobmanager.cpu

1.0

Double

Jobmanager CPU核心数

kubernetes.jobmanager.service-account

"default"

String

Jobmanager的ServiceAccont,用于Jobmanager在集群中创建Taskmanager

kubernetes.namespace

"default"

String

运行所在的命名空间

kubernetes.rest-service.exposed.type

"LoadBalancer"

String

负载均衡器, ClusterIP/NodePort/LoadBalancer(default). 即web页面访问的负载均衡

kubernetes.service.create-timeout

"1 min"

String

服务创建超时时长

kubernetes.taskmanager.cpu

-1.0

Double

TaskManager CPU核心数

2.3提交方式

(1)脚本提交

 

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar

<ClusterId>即已经创建的ClusterId,上图红框所示即ClusterId

 

(2)webUI提交

登陆日志打印的webUI链接或者查询所在节点和映射端口访问。

2.4关闭k8s session

./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true

已知问题,关闭脚本不起作用时,可以手动删除deployment和service

2.5工作流程

3.Flink operator使用

3.1构建镜像

源码下载

git clone https://github.com/GoogleCloudPlatform/flink-on-k8s-operator.git

源码编译

修改Makefile中的build方法,用于设置go mod代理

CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on GOPROXY=https://goproxy.cn go build -a -o bin/flink-operator main.go

 

修改Makefile 中的generate方法配置代理

GO111MODULE=on GOPROXY=https://goproxy.cn go get sigs.k8s.io/controller-tools/cmd/controller-gen@v0.3.0 ;\

 

修改Makefile中operator-image方法  删除执行的

test-in-docker方法

 

修改Dockerfile.builder删除以下代码

RUN go mod download

RUN make build

原因:在物理机上编译源码后,无需在镜像中再次编译,镜像中编译速度慢且代理容易中断。

 

下载依赖镜像

以下两个镜像在国内无法直接下载,可通过 "不能说" 下载后拷贝至本地,也可以通过阿里云代理下载后重新tag.

Dockerfile中的gcr.io/distroless/static:latest

config/default/manager_auth_proxy_patch.yaml中的gcr.io/kubebuilder/kube-rbac-proxy:v0.4.0

将gcr.io/kubebuilder/kube-rbac-proxy:v0.4.0中的gcr.io替换为registry.cn-hangzhou.aliyuncs.com,使用docker pull拉取镜像并重新tag

docker pull registry.cn-hangzhou.aliyuncs.com/kubebuilder/kube-rbac-proxy:v0.4.0

docker tag registry.cn-hangzhou.aliyuncs.com/kubebuilder/kube-rbac-proxy:v0.4.0 gcr.io/kubebuilder/kube-rbac-proxy:v0.4.0

 

以上操作完成后开始进行编译

 

make build

 

构建镜像

 

make operator-image

 

 

重新tag镜像并推送至远端仓库

docker tag gcr.io/flink-operator/flink-operator:latest 10.180.210.196/bddata/flink-operator-google:0.0.3

docker push 10.180.210.196/bddata/flink-operator-google:0.0.3

 

 

3.2运行flink-operator服务

安装kustomize

访问https://api.github.com/repos/kubernetes-sigs/kustomize/releases/latest

查找browser_download_url中的linux_amd64.tar.gz版本

执行wget 操作

wget https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize/v3.8.1/kustomize_v3.8.1_linux_amd64.tar.gz

解压安装

tar –zxvf kustomize_v3.8.1_linux_amd64.tar.gz

mv  kustomize /usr/local/bin/

修改config/default/manager_auth_proxy_patch.yaml文件中的镜像地址

10.180.210.196/bddata/kube-rbac-proxy:v0.4.0

执行以下命令启动flink-operator镜像

make deploy IMG=10.180.210.196/bddata/flink-operator-google:0.0.3 FLINK_OPERATOR_NAMESPACE=flink-operator

 

查看节点状态

kubectl get pod  -n flink-operator

3.3提交任务  

修改examples/update/wordcount-1.9.3.yaml文件中的镜像地址

10.180.210.196/bddata/flink:1.9.3

 

启动flink任务

kubectl create -f wordcount-1.9.3.yaml

 

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐