Flink on K8s: implementing HA

一、Preparation
  • Upload flink-1.11.1-bin-scala_2.11.tgz and extract it to /opt; this installation is used to submit jobs

    • Create the directories that will be mounted, under the NFS root directory
    mkdir -p /jtpf_test/opt/flink
    
    • Upload flink1.tar.gz and extract it to /jtpf_test/opt/flink


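The extraction itself is just the usual tar commands. A minimal sketch, assuming the two archives have already been uploaded and that flink1.tar.gz contains the jobmanager/ and taskmanager/ directory layout referenced by the PersistentVolumes below:

# Flink client used on this host to submit jobs
tar -zxf flink-1.11.1-bin-scala_2.11.tgz -C /opt

# conf/lib/data layout served to the cluster over NFS
mkdir -p /jtpf_test/opt/flink
tar -zxf flink1.tar.gz -C /jtpf_test/opt/flink
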
二、Deploy the Flink cluster
  • Edit the flink-conf.yaml file under /opt/flink/jobmanager/conf
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181 
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first
  • Edit the flink-conf.yaml file under /opt/flink/taskmanager/conf
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 2048m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
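
Both flink-conf.yaml files point HA at a ZooKeeper quorum addressed as zookeeper:2181; this article assumes such a Service already exists in the kafka namespace. A quick, hedged way to confirm the name resolves from inside that namespace:

# Throwaway busybox pod that checks DNS resolution of the zookeeper Service
kubectl run -n kafka -it --rm dns-test --image=busybox --restart=Never -- nslookup zookeeper
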
  • Create flink-jobmanager.yml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-jobmanager-conf-pv
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  capacity:
    storage: 1Gi
  persistentVolumeReclaimPolicy: Retain
  storageClassName: nas
  nfs:
    path: /opt/flink/jobmanager/conf
    server: 172.24.128.28 

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: flink-jobmanager-conf-pvc
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nas
  resources:
    requests:
      storage: 1Gi

---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-jobmanager-lib-pv
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  capacity:
    storage: 1Gi
  persistentVolumeReclaimPolicy: Retain
  storageClassName: nas
  nfs:
    path: /opt/flink/jobmanager/lib
    server: 172.24.128.28

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: flink-jobmanager-lib-pvc
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nas
  resources:
    requests:
      storage: 1Gi

---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-jobmanager-data-pv
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  capacity:
    storage: 1Gi
  persistentVolumeReclaimPolicy: Retain
  storageClassName: nas
  nfs:
    path: /opt/flink/jobmanager/data
    server: 172.24.128.28

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: flink-jobmanager-data-pvc
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nas
  resources:
    requests:
      storage: 1Gi

---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-svc
  namespace: kafka
spec:
  ports:
  - name: flink-jobmanager-web
    nodePort: 30044
    port: 8081
    targetPort: 8081
  - name: flink-jobmanager-rpc
    nodePort: 32000
    port: 6123
    targetPort: 6123
  - name: blob
    nodePort: 32001
    targetPort: 6124
    port: 6124
  - name: query
    nodePort: 32002
    targetPort: 6125
    port: 6125
  type: NodePort
  selector:
    app: flink-jobmanager

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: flink-jobmanager
  template:
    metadata:
      labels:
        app: flink-jobmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-jobmanager
        workingDir: /opt/flink
        args:
        - jobmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        ports:
        - containerPort: 8081
          name: flink-job 
        volumeMounts:
        - name: flink-jobmanager-conf-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
        - name: flink-jobmanager-data-pv
          mountPath: /data
      volumes:
      - name: flink-jobmanager-conf-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-conf-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      - name: flink-jobmanager-data-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-data-pvc
      imagePullSecrets:
       - name: registrysecret
  • Create the flink-jobmanager resources
kubectl apply -f flink-jobmanager.yml
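
Once applied, a quick check that the JobManager pod and Service are up (names and labels as defined in the manifest above; <node-ip> is a placeholder for any node address in your cluster):

kubectl get pods -n kafka -l app=flink-jobmanager
kubectl get svc -n kafka flink-jobmanager-svc
# The web UI is exposed on NodePort 30044; /overview is the Flink REST cluster summary
curl http://<node-ip>:30044/overview
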
  • Create flink-taskmanager.yml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-taskmanager-pv
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  capacity:
    storage: 2Gi
  persistentVolumeReclaimPolicy: Retain
  storageClassName: nas
  nfs:
    path: /opt/flink/taskmanager/conf
    server: 172.24.128.28
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: flink-taskmanager-pvc
  namespace: kafka
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nas
  resources:
    requests:
      storage: 2Gi

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
        - name: registrysecret   

  • Create the flink-taskmanager resources
kubectl apply -f flink-taskmanager.yml
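
With both Deployments running, a hedged way to confirm that the TaskManagers registered and that HA recovery actually works (labels and NodePort taken from the manifests above; <node-ip> is a placeholder):

kubectl get pods -n kafka -l app=flink-taskmanager
# The REST API lists the registered TaskManagers
curl http://<node-ip>:30044/taskmanagers
# Rough HA smoke test: delete the JobManager pod and watch the Deployment recreate it;
# running jobs should be recovered from the ZooKeeper / NFS HA state
kubectl delete pod -n kafka -l app=flink-jobmanager
kubectl get pods -n kafka -w
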
三、Submitting jobs through Jenkins integration
  • Package the code from Git and place the jar under /opt/flink-1.11.1/jar/

  • Parameters to define for the Jenkins release job:

    mainclass : the main class of the program
    jobname: the job name
    parallelism : the job parallelism
    
  • Look up the running job in the Flink cluster by jobname, then cancel it by its job ID

bash ./bin/flink list -m localhost:30044 -a | grep StatisticApiCount
bash ./bin/flink cancel -m localhost:30044 2f8251ef38a6a0d82534335bb9ec5168


  • StatisticApiCount is the jobname and needs to be passed in as a parameter at release time; 2f8251ef38a6a0d82534335bb9ec5168 is the ID of the job currently running in the Flink cluster
  • Submit the job
bash ./bin/flink run -d -m localhost:38081 -c statistic.gatewaylog.test_group.StatisticApiCount  -p 3 ./jar/flink-svc-1.1.0.jar 


  • statistic.gatewaylog.test_group.StatisticApiCount is the mainclass (the main class executed by the program) and needs to be passed in as a parameter at release time; 3 is the parallelism, which also needs to be passed in as a parameter
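
Putting the pieces together, the Jenkins build step can be a single shell script. This is only a sketch: it assumes the mainclass, jobname and parallelism parameters defined above, reuses NodePort 30044 for all three commands (the run example above uses localhost:38081; adjust to your environment), and the awk-based job-ID extraction is illustrative and may need tuning to the exact flink list output:

cd /opt/flink-1.11.1

# Find the ID of the currently running job with this name, if any
job_id=$(bash ./bin/flink list -m localhost:30044 -a | grep "$jobname" | awk '{print $4}')

# Cancel the old job before submitting the new version
if [ -n "$job_id" ]; then
  bash ./bin/flink cancel -m localhost:30044 "$job_id"
fi

# Submit the new job in detached mode with the requested parallelism
bash ./bin/flink run -d -m localhost:30044 -c "$mainclass" -p "$parallelism" ./jar/flink-svc-1.1.0.jar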