flink ON k8s 实现 ha
flink on k8 实现ha一、前期准备上传flink-1.11.1-bin-scala_2.11.tgz 并解析到/opt 下,用于提交job在nfs根目录下创建相应需要进行挂载的目录mkdir -p /jtpf_test/opt/flink上传flink1.tar.gz 并将解析到/jtpf_test/opt/flink下二、部署flink 集群修改/opt/flink/jobmanage
·
flink on k8 实现ha
一、前期准备
-
上传flink-1.11.1-bin-scala_2.11.tgz 并解析到/opt 下,用于提交job
- 在nfs根目录下创建相应需要进行挂载的目录
mkdir -p /jtpf_test/opt/flink
- 上传flink1.tar.gz 并将解析到/jtpf_test/opt/flink下
二、部署flink 集群
- 修改/opt/flink/jobmanager/conf 下flink-conf.yaml 文件
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first
- 修改/opt/flink/taskmanager/conf 下flink-conf.yaml 文件
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 2048m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
- 创建flink-jobmanager.yml
apiVersion: v1
kind: PersistentVolume
metadata:
name: flink-jobmanager-conf-pv
namespace: kafka
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 1Gi
persistentVolumeReclaimPolicy: Retain
storageClassName: nas
nfs:
path: /opt/flink/jobmanager/conf
server: 172.24.128.28
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: flink-jobmanager-conf-pvc
namespace: kafka
spec:
accessModes:
- ReadWriteMany
storageClassName: nas
resources:
requests:
storage: 1Gi
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: flink-jobmanager-lib-pv
namespace: kafka
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 1Gi
persistentVolumeReclaimPolicy: Retain
storageClassName: nas
nfs:
path: /opt/flink/jobmanager/lib
server: 172.24.128.28
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: flink-jobmanager-lib-pvc
namespace: kafka
spec:
accessModes:
- ReadWriteMany
storageClassName: nas
resources:
requests:
storage: 1Gi
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: flink-jobmanager-data-pv
namespace: kafka
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 1Gi
persistentVolumeReclaimPolicy: Retain
storageClassName: nas
nfs:
path: /opt/flink/jobmanager/data
server: 172.24.128.28
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: flink-jobmanager-data-pvc
namespace: kafka
spec:
accessModes:
- ReadWriteMany
storageClassName: nas
resources:
requests:
storage: 1Gi
---
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-svc
namespace: kafka
spec:
ports:
- name: flink-jobmanager-web
nodePort: 30044
port: 8081
targetPort: 8081
- name: flink-jobmanager-rpc
nodePort: 32000
port: 6123
targetPort: 6123
- name: blob
nodePort: 32001
targetPort: 6124
port: 6124
- name: query
nodePort: 32002
targetPort: 6125
port: 6125
type: NodePort
selector:
app: flink-jobmanager
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: flink-jobmanager
namespace: kafka
spec:
selector:
matchLabels:
app: flink-jobmanager
template:
metadata:
labels:
app: flink-jobmanager
spec:
containers:
- image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
name: flink-jobmanager
workingDir: /opt/flink
args:
- jobmanager
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: flink-jobmanager-svc
ports:
- containerPort: 8081
name: flink-job
volumeMounts:
- name: flink-jobmanager-conf-pv
mountPath: /opt/flink/conf
- name: flink-jobmanager-lib-pv
mountPath: /opt/flink/lib
- name: flink-jobmanager-data-pv
mountPath: /data
volumes:
- name: flink-jobmanager-conf-pv
persistentVolumeClaim:
claimName: flink-jobmanager-conf-pvc
- name: flink-jobmanager-lib-pv
persistentVolumeClaim:
claimName: flink-jobmanager-lib-pvc
- name: flink-jobmanager-data-pv
persistentVolumeClaim:
claimName: flink-jobmanager-data-pvc
imagePullSecrets:
- name: registrysecret
- 创建flink-jobmanager
kubectl apply -f flink-jobmanager.yml
- 创建flink-taskmanager.yml
apiVersion: v1
kind: PersistentVolume
metadata:
name: flink-taskmanager-pv
namespace: kafka
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 2Gi
persistentVolumeReclaimPolicy: Retain
storageClassName: nas
nfs:
path: /opt/flink/taskmanager/conf
server: 172.24.128.28
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: flink-taskmanager-pvc
namespace: kafka
spec:
accessModes:
- ReadWriteMany
storageClassName: nas
resources:
requests:
storage: 2Gi
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: flink-taskmanager
namespace: kafka
spec:
replicas: 3
selector:
matchLabels:
app: flink-taskmanager
template:
metadata:
labels:
app: flink-taskmanager
spec:
containers:
- image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
name: flink-taskmanager
ports:
- containerPort: 8081
name: flink-task
workingDir: /opt/flink
args:
- taskmanager
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: flink-jobmanager-svc
volumeMounts:
- name: flink-taskmanager-pv
mountPath: /opt/flink/conf
- name: flink-jobmanager-lib-pv
mountPath: /opt/flink/lib
volumes:
- name: flink-taskmanager-pv
persistentVolumeClaim:
claimName: flink-taskmanager-pvc
- name: flink-jobmanager-lib-pv
persistentVolumeClaim:
claimName: flink-jobmanager
-lib-pvc
imagePullSecrets:
- name: registrysecret
- 创建flink-taskmanager
apiVersion: v1
kind: PersistentVolume
metadata:
name: flink-taskmanager-pv
namespace: kafka
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 2Gi
persistentVolumeReclaimPolicy: Retain
storageClassName: nas
nfs:
path: /opt/flink/taskmanager/conf
server: 172.24.128.28
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: flink-taskmanager-pvc
namespace: kafka
spec:
accessModes:
- ReadWriteMany
storageClassName: nas
resources:
requests:
storage: 2Gi
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: flink-taskmanager
namespace: kafka
spec:
replicas: 3
selector:
matchLabels:
app: flink-taskmanager
template:
metadata:
labels:
app: flink-taskmanager
spec:
containers:
- image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
name: flink-taskmanager
ports:
- containerPort: 8081
name: flink-task
workingDir: /opt/flink
args:
- taskmanager
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: flink-jobmanager-svc
volumeMounts:
- name: flink-taskmanager-pv
mountPath: /opt/flink/conf
- name: flink-jobmanager-lib-pv
mountPath: /opt/flink/lib
volumes:
- name: flink-taskmanager-pv
persistentVolumeClaim:
claimName: flink-taskmanager-pvc
- name: flink-jobmanager-lib-pv
persistentVolumeClaim:
claimName: flink-jobmanager-lib-pvc
imagePullSecrets:
- name: registrysecret
- 创建flink-taskmanager
kubectl apply -f flink-taskmanager.yml
三、通过jenkins集成sumbit job
-
将git代码打包到/opt/flink-1.11.1/jar/目录下
-
在Jenkins发版时需要创建的参数有:
mainclass : 程序执行时的主类 jobname: job名称 parallelism : 并行度
-
根据jobname查找flink集群中正在运行的job,并根据jobid 停止job
bash ./bin/flink list -m localhost:30044 list -a | grep StatisticApiCount
bash ./bin/flink cancel -m localhost:30044 2f8251ef38a6a0d82534335bb9ec5168
- StatisticApiCount 为jobname,发版时需要弄成参数;2f8251ef38a6a0d82534335bb9ec5168 为flink集群中正在运行的job id
- 提交job
bash ./bin/flink run -d -m localhost:38081 -c statistic.gatewaylog.test_group.StatisticApiCount -p 3 ./jar/flink-svc-1.1.0.jar
- statistic.gatewaylog.test_group.StatisticApiCount 为mainclass,程序执行时的主类,发版时需要弄成参数;3 为并行度,parallelism,发版时需要弄成参数
更多推荐
已为社区贡献1条内容
所有评论(0)