在上一篇文章Flink On K8s实践3:Application部署模式实践》中讲解和演示了Application部署模式在Kubernetes上的2种Flink作业提交方式,本文继续讲解Flink的另外一种部署模式——Session部署模式,它和Application模式一样在Kubernetes上也有2种Flink作业提交方式,接下来通过示例进行实践演示

一、Session模式简介

在Session模式下,需要先在Kubernetes上启动一个Flink集群,这个集群初始情况下只有JobManager(如果Flink集群开启了HA,JobManager会有多个实例),没有TaskManager,当客户端向该集群提交作业时,Kubernetes再为每个作业动态创建TaskManager,TaskManager Pod的数量由Flink作业所需的slot数量和每个TaskManager所能提供的可用slot数量决定。所有作业共享该集群的JobManager,作业终止时,如果TaskManager的slot没有被其他的Flink作业占用,那么该TaskManager的Pod会被释放,但JobManager会继续运行,等待后续Flink作业的提交。
Session部署模式的主要特点是所有Flink作业共享集群的JobManager,因此Flink作业的资源隔离比较差,作业间会存在相互影响,极端情况下,个别异常作业会拖垮整个Flink集群,导致集群上的作业一起失败。
Session部署模式在Kubernetes上也有2种Flink作业的提交方式。方式一是通过Flink Web UI和Restful接口上传Flink作业Jar包方式,在Flink集群启动后,也就是JobManager运行起来后,可以通过人工访问Flink Web UI页面上传Jar包和提交作业,这种方式操作灵活,便于调试,也可以通过编写程序调用Flink Restful接口上传Jar包和提交作业,这样可以使外部系统例如调度系统灵活控制Flink作业的更新和启停。方式二是通过HTTP下载Flink作业Jar包方式,该方式有利于Flink作业Jar包的统一存放管理,例如可以统一将Flink作业Jar包发布到HTTP文件服务器,例如tomcat或nginx,使用这种方式, Fllink作业需要编写对应的FlinkSessionJob yaml文件, 然后通过Kubernetes的kubectl命令提交,当然,也可以通过编写程序调用Kubernetes API来实现FlinkSessionJob的提交、更新和停止。
需要补充说明的是,与FlinkDeployment一样,FlinkSessionJob是Flink Kubernetes Operator为Flink作业创建的K8s自定义资源(CRD),但它只定义了Flink作业相关的信息。在Session部署模式下,FlinkDeployment负责定义Flink集群相关的信息,例如Flink Web UI的Ingress、Flink集群的全局参数配置、JobManager和TaskManager的资源配额等,而FlinkSessionJob则负责定义Flink作业的JarURI、启动类、启动参数等相关的信息。

二、示例程序

本文复用Flink On K8s实践3:Application部署模式实践》的示例程序,该示例程序的功能和代码可以参阅该文章,或者前往bigdataonk8s获取源码。

三、运行示例程序

1、方式1 通过Flink Web UI上传Jar包

第一步,需要先启动Flink集群,也就是先运行JobManager,为此需要编写Flink集群的yaml文件,yaml文件的内容如下所示。
# Flink Session集群 源码请到 https://bigdataonk8s.com 获取
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: session-deployment-only
spec:
  image: flink:1.13.6
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent   # 镜像拉取策略,本地没有则从仓库拉取
  ingress:   # ingress配置,用于访问flink web页面
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
在yaml文件里有三处地方需要特别注意,首先是开头部分定义了资源的类型为FlinkDeployment,这与Application部署模式的资源类型一样;其次,images使用的是Flink的基础镜像,而不是我们在Application部署模式方式一中自行构建的Flink作业镜像;再者,定义了Ingress,以便可以通过网页浏览器或Restful接口访问JobManager,Ingress的模板主要由三部分组成:域名、Flink集群所在的命名空间和Flink集群的名称,因为此处使用了ingress,所以需要提前在Kubernetes上安装配置好Ingress的组件服务,有关Ingress的安装和配置可前往bigdataonk8s参考相关教程。最后,与Application部署模式的yaml相比,Session部署模式的yaml少了job部分的定义。
第二步,启动Flink集群,使用如下命令提交Flink集群yaml文件到Kubernetes运行,并查看Pod的创建情况。
kubectl apply -f session-deployment-only.yaml
kubectl get all -n flink

 可以看到,Flink集群启动后只运行了JobManager(此处是session-deployment-only-64ccc9566-fhqsl),并没有运行TaskManager。此外,它还创建了两个Service,其中session-deployment-only是用于Flink集群内部进程和JobManager与TaskManager通讯使用的,session-deployment-only-rest是用于访问Flink Web UI和Restful接口使用的。

第三步,访问Filnk Web UI。在前面配置了使用Ingress的方式访问Flink Web UI,且Ingress的域名是flink.k8s.io,为此,在Windows的本地测试中,需要先在C:\Windows\System32\drivers\etc\hosts中添加域名和IP的映射关系,此处的IP是Ingress Pod所在的Kubernetes Node节点的IP,对于Ubuntu或Mac,则在/etc/hosts里添加。
使用如下命令查看Ingress的具体详细。
kubectl get all -n ingress-nginx -owide

上图中有两处地方需要特别注意,一是Ingress Pod运行在k8s05节点上,二是Ingress的服务以NodePort的方式暴露,暴露的端口是30507。

通过ping k8s05得到它的IP是192.168.56.84,为此,在hosts文件中添加以下映射。
192.168.56.84 flink.k8s.io
接下来,就可以通过网页浏览器访问Flink Web UI,访问网址是 http://flink.k8s.io:30502/flink/session-deployment-only/#/overview

第四步,通过Flink Web UI上传Flink作业Jar并提交作业。由于示例Flink程序需要从nc读取字符流数据,因此在运行Flink作业前,需要先运行nc程序,命令是nc -lk 7777,否则Flink作业会运行不起来。

 在nc发送一些测试字符串,使用以下命令查看Flink作业的输出结果。 

kubectl logs session-deployment-only-taskmanager-1-1 -n flink

如果要终止Flink作业运行,直接在Flink Web UI进入作业页面,点击“Cancel Job”即可。限于篇幅,Restful接口方式就不作演示。 

第一步,需要先搭建一个HTTP服务器,本文使用Tomcat。可以直接到Tomcat官网下载安装包,也可以使用如下命令获取。
由于Tomcat默认不允许下载文件,为此,Tomcat下载解压后需要修改它的conf/web.xml,将listings改为true,如下图所示。

 接着,将Flink作业Jar包上传到Tomcat的webapps目录下,本文将Flink作业Jar包上传到webapps/jars/flink目录下。此处需要注意的是,由于运行Flink作业的时候需要通过HTTP下载Jar包,为此需要确保TaskManager Pod能够访问Tomcat,为了便于测试,建议把Tomcat部署到Kubernetes的Node节点上。

一切就绪后就可以启动Tomcat,同时使用网页浏览器访问Tomcat,确保Flink作业Jar可以下载。
第二步,编写Flink集群的yaml文件,yaml文件的内容如下所示。 
# Flink Session集群 源码请到 https://bigdataonk8s.com 获取
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: session-deployment-only
spec:
  image: flink:1.13.6
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent   # 镜像拉取策略,本地没有则从仓库拉取
  ingress:   # ingress配置,用于访问flink web页面
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
可以看出,方式二Flink集群的yaml文件与方式一是一样的,此处不再赘述。
第三步,编写Flink作业yaml文件,如下所示。
# Flink作业  源码请到 https://bigdataonk8s.com 获取
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  namespace: flink
  name: session-job-only
spec:
  deploymentName: session-deployment-only  # 需要与创建的集群名称一致
  job:
    jarURI: http://192.168.56.85:8080/jars/flink/flink-on-k8s-demo-1.0-SNAPSHOT.jar # 使用http方式下载jar包
    entryClass: com.yale.StreamWordCount
    args:
    parallelism: 1  # 并行度
    upgradeMode: stateless
在yaml文件里有三处地方需要特别注意,首先是开头部分定义了资源的类型为FlinkSessionJob;其次,spec.deploymentName需要与前面创建的Flink集群名字一致,否则Flink作业(或拟启动的TaskManager)会因为找不到JobManager而无法启动;最后,job.jarURI指向了Flink作业Jar包的下载地址,切记TaskManager Pod与Tomcat的网络一定要联通,job.entryClass则为Flink作业的启动类名称。
第四步,运行Flink集群和Flink作业,使用如下命令提交Flink集群和Flink作业yaml文件到Kubernetes运行,并查看Pod的创建情况。
kubectl apply -f session-deployment-only.yaml 
kubectl apply -f session-job-only.yaml
kubectl get all -n flink -owide

在nc发送一些测试字符串,使用以下命令查看Flink作业的输出结果。

kubectl logs session-deployment-only-taskmanager-1-1 -n flink

如果要终止Flink作业和Flink集群运行,使用如下命令即可。
kubectl delete -f session-deployment-only.yaml 
kubectl delete -f session-job-only.yaml

3、两种方式的选择

Session部署模式的2种作业提交方式的最大区别在于Flink作业Jar包的获取方式不同,它们特点在本文开头部分已作介绍,此处不再赘述,这里想表达的是,Session部署模式的2种作业提交方式优缺点并不明显,各有适用的场景,大家按照自己的实际情况选择即可。

四、Application模式和Session模式的选择

关于Application模式和Session模式的选择,大家可以参考如下策略。
Application模式和Session模式两者最大区别在于集群的生命周期和资源管理隔离程度的不同,因此,对于核心、优先级高和需要高保障的这类作业推荐使用Application模式。而那些对保障性要求相对不高,或者出于运维管理便利的考量,例如需要通过外部系统通过调用Flink Restful接口管理作业的提交和启停,那么可以考虑使用Session模式。如果对于作业的归属模棱两可,那么建议直接使用Application模式,也就是说,把Application模式作为Flink On K8s运行模式的默认选项。

五、结语

通过Flink On K8s实践系列文章,相信大家已经了解了Flink Kubernetes Operator的特点和用途,通过动手实践安装Flink Kuberntes Operator,开发StreamWordCount程序,体验了Application和Session  2种部署模式下共4种作业的提交方式,掌握了Flink作业在Kubernetes上的运行方式,直观地看到JobManager和TaskManager Pod的创建和释放过程。
通过这部分的学习,相信大家已基本理解了Flink On K8s,并掌握了Flink On K8s的基本配置和部署。但这只是刚开始,要将Flink On K8s真正应用到生产环境,还需要掌握其他更多的知识,例如Flink Kubernetes Operator的高可用部署,Flink 作业的高可用运行以及与Hadoop和CDH集成等内容,这些内容在bigdataonk8s里有完整的视频和笔记进行详细讲解说明,欢迎大家前往观看,在此感谢大家的支持。
与Flink On K8s实践系列文章配套的课程视频,可以前往以下视频平台查看:
西瓜视频:Flink On K8s实战

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐