Flink On K8s实践4:Session部署模式实践
Apache Flink是当下主流了流式计算引擎,在企业的实时数仓、实时BI、数据湖、智能推荐和风险风控等场景中有广泛的应用。Apache Flink支持多种Resource Providers,也就是可以在多种资源平台上运行,本系列文章以当前热门的容器平台Kubernetes作为Flink的Resource Proivder,全面讲解如何在Kubernetes平台上以Flink Kubernet
一、Session模式简介
二、示例程序
三、运行示例程序
1、方式1 通过Flink Web UI上传Jar包
# Flink Session集群 源码请到 https://bigdataonk8s.com 获取
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: flink
name: session-deployment-only
spec:
image: flink:1.13.6
flinkVersion: v1_13
imagePullPolicy: IfNotPresent # 镜像拉取策略,本地没有则从仓库拉取
ingress: # ingress配置,用于访问flink web页面
template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
annotations:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
replicas: 1
resource:
memory: "1024m"
cpu: 1
taskManager:
replicas: 1
resource:
memory: "1024m"
cpu: 1
kubectl apply -f session-deployment-only.yaml
kubectl get all -n flink
可以看到,Flink集群启动后只运行了JobManager(此处是session-deployment-only-64ccc9566-fhqsl),并没有运行TaskManager。此外,它还创建了两个Service,其中session-deployment-only是用于Flink集群内部进程和JobManager与TaskManager通讯使用的,session-deployment-only-rest是用于访问Flink Web UI和Restful接口使用的。
kubectl get all -n ingress-nginx -owide
上图中有两处地方需要特别注意,一是Ingress Pod运行在k8s05节点上,二是Ingress的服务以NodePort的方式暴露,暴露的端口是30507。
第四步,通过Flink Web UI上传Flink作业Jar并提交作业。由于示例Flink程序需要从nc读取字符流数据,因此在运行Flink作业前,需要先运行nc程序,命令是nc -lk 7777,否则Flink作业会运行不起来。
在nc发送一些测试字符串,使用以下命令查看Flink作业的输出结果。
kubectl logs session-deployment-only-taskmanager-1-1 -n flink
如果要终止Flink作业运行,直接在Flink Web UI进入作业页面,点击“Cancel Job”即可。限于篇幅,Restful接口方式就不作演示。
接着,将Flink作业Jar包上传到Tomcat的webapps目录下,本文将Flink作业Jar包上传到webapps/jars/flink目录下。此处需要注意的是,由于运行Flink作业的时候需要通过HTTP下载Jar包,为此需要确保TaskManager Pod能够访问Tomcat,为了便于测试,建议把Tomcat部署到Kubernetes的Node节点上。
# Flink Session集群 源码请到 https://bigdataonk8s.com 获取
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: flink
name: session-deployment-only
spec:
image: flink:1.13.6
flinkVersion: v1_13
imagePullPolicy: IfNotPresent # 镜像拉取策略,本地没有则从仓库拉取
ingress: # ingress配置,用于访问flink web页面
template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
annotations:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
replicas: 1
resource:
memory: "1024m"
cpu: 1
taskManager:
replicas: 1
resource:
memory: "1024m"
cpu: 1
# Flink作业 源码请到 https://bigdataonk8s.com 获取
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
namespace: flink
name: session-job-only
spec:
deploymentName: session-deployment-only # 需要与创建的集群名称一致
job:
jarURI: http://192.168.56.85:8080/jars/flink/flink-on-k8s-demo-1.0-SNAPSHOT.jar # 使用http方式下载jar包
entryClass: com.yale.StreamWordCount
args:
parallelism: 1 # 并行度
upgradeMode: stateless
kubectl apply -f session-deployment-only.yaml
kubectl apply -f session-job-only.yaml
kubectl get all -n flink -owide
在nc发送一些测试字符串,使用以下命令查看Flink作业的输出结果。
kubectl logs session-deployment-only-taskmanager-1-1 -n flink
kubectl delete -f session-deployment-only.yaml
kubectl delete -f session-job-only.yaml
3、两种方式的选择
四、Application模式和Session模式的选择
五、结语
更多推荐
所有评论(0)