在第一篇文章《Flink On K8s实践1:Flink Kubernetes Operator介绍》中有提到Flink On Kubernetes支持Apllication和Session这两种部署模式,本文继续讲解Apllication部署模式在Kubernetes上的2种Flink作业提交方式,并通过示例进行实践演示。

一、Application模式简介

在Application部署模式下,Kubernetes会为每个提交的Flink作业单独创建一个Flink集群,这个Flink集群由JobManager Pod和TaskManager Pod组成,其中拟启动的TaskManager Pod的数量由Flink作业所需的slot数量和每个TaskManager所能提供的可用slot数量决定,例如一个Flink作业需要10个slot,每个TaskManager提供4个slot,那么Kubernetes就会启动3个TaskManager Pod。在Flink作业运行完成的时候,或者终止Flink作业运行时候,Kubernetes会终止集群并释放全部的Pod。
Application部署模式的主要特点是它在不同Flink作业之间提供了资源隔离和负载平衡保证,使得作业间彼此独立,互不影响,此外,它还可以为不同的作业配置不同的资源,以此实现作业的精细化管理。
Application部署模式在Kubernetes上有2种Flink作业的提交方式,方式一是将Flink作业的Jar包打进Flink镜像里,在Flink作业的FlinkDeployment yaml文件里使用该镜像,当Flink作业运行(或称Flink集群创建)的时候,内部就会包含该作业的Jar包。这种方式的显著特点就是每个Flink作业都要创建自己专属的镜像,如果Flink作业的数量较多,那么也会导致镜像过多,从而占用大量镜像仓库空间。方式二是将作业Jar包放到外部存储上,例如NFS,然后通过Kubernetes的PVC+PV方式挂载到Pod里,好处就是只需维护少数几个Base Flink镜像即可,极大节省镜像仓库空间。

二、示例程序

为了进行接下来的演示,需要开发一个功能为WordCount的Flink程序,该程序从socket读取字符流,然后分词并统计单词出现次数,最后将统计结果打印到控制台。完整的代码工程请到bigdataonk8s获取。
maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bigdataonk8s</groupId>
    <artifactId>flink-on-k8s-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- 引入Flink相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

    </dependencies>

    <!-- 打包配置-->
    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

StreamWordCount.java

package com.bigdataonk8s;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;
import java.util.Arrays;

/**
 * 源码请到 https://bigdataonk8s.com 获取
 */
public class StreamWordCount {
    private static Logger logger = Logger.getLogger(StreamWordCount.class);
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文本流  在k8s01行运行 nc -lk 7777
        DataStreamSource<String> lineDSS = env.socketTextStream("192.168.56.80", 7777);
        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. 打印
        result.print();
        // 7. 执行
        env.execute();
    }
}

Flink程序开发完成后,需要打成Jar包并上传到Kubernetes集群中。

三、运行示例程序

1、方式1 Jar包打进Flink镜像

第一步,需要编写Dockerfile,并构建Flink作业镜像,Dockerfile的内容如下所示。
FROM flink:1.13.6
WORKDIR /opt/flink
COPY flink-on-k8s-demo-1.0-SNAPSHOT.jar /opt/flink/flink-on-k8s-demo-1.0-SNAPSHOT.jar
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["help"]
此处假定Flink程序的Jar包名字为flink-on-k8s-demo-1.0-SNAPSHOT.jar,并且它与Dockerfile在同一个目录下。Dockerfile有两处地方需要特别注意,一是FROM指令引用了1.13.6版本的Flink基础镜像(Base Image),二是COPY指令会把同目录的flink-on-k8s-demo-1.0-SNAPSHOT.jar拷贝进Flink镜像的/opt/flink目录下。
Dockerfile编写好后,可以使用如下命令构建,此命令将会构建出一个名字为flink-wc,版本为1.13.6的新镜像。
docker build -f Dockerfile -t flink-wc:1.13.6 .
flink-wc镜像构建好后,需要分发到各个Kubernetes Node节点,更推荐的做法是把它发布到自有的镜像仓库,以便后续提交Flink作业yaml文件时能下载该镜像。发布到镜像仓库的命令如下。
docker tag image_id registry_address/flink-wc:1.13.6
docker push registry_address/flink-wc:1.13.6
第二步,编写Flink作业的yaml文件,yaml文件的内容如所示。
# Flink Session集群 https://bigdataonk8s.com
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment
spec:
  image: flink-wc:1.13.6  # 使用自行构建的镜像
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent   # 镜像拉取策略,本地没有则从仓库拉取
  ingress:   # ingress配置,用于访问flink web页面
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/flink-on-k8s-demo-1.0-SNAPSHOT.jar
    entryClass: com.bigdataonk8s.StreamWordCount
    args:
    parallelism: 1
    upgradeMode: stateless

在yaml文件里有三处地方需要特别注意,首先是开头部分定义了资源的类型为FlinkDeployment,这是Flink Kubernetes Operator为Flink集群所定义的自定义资源类型(CRD);其次,images引用了我们自行构建的Flink作业镜像;最后,job.jarURI指向了我们Flink作业Jar包在镜像内的路径,job.entryClass则为Flink作业的启动类名称。

第三步,运行Flink作业。由于示例Flink程序需要从nc读取字符流数据,因此在运行Flink作业前,需要先运行nc程序,命令是nc -lk 7777,否则Flink作业会运行不起来。

 使用以下命令提交Flink作业的yaml文件到Kubernetes运行,并查看Pod的创建情况。

kubectl apply -f application-deployment.yaml

kubectl get all -n flink

在nc发送一些测试字符串,使用kubectl logs taskmanager-pod-name -n flink查看Flink作业的输出结果。

2、方式2 Jar包通过PV挂载到镜像

第一步,需要先创建Kubernetes PVC,申请分配PV存储,把Flink作业Jar包上传放置到该PV对应的实际储存的路径下。本文采用动态PVC申请PV的方式,存储设施是NFS,因此需要提前在Kubernetes上安装配置好NFS的组件服务,并创建好StorageClass资源,具体操作可前往bigdataonk8s参考相关教程。
PVC的yaml文件内容如下所示。
#  Flink 作业jar 持久化存储pvc https://bigdataonk8s.com
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-jar-pvc  # jar pvc名称
  namespace: flink
spec:
  storageClassName: nfs-storage   #sc名称
  accessModes:
    - ReadOnlyMany   #采用ReadOnlyMany的访问模式
  resources:
    requests:
      storage: 1Gi    #存储容量,根据实际需要更改
   
在PVC yaml文件里有2处地方需要特别注意,一是由于PVC是Kubernetes命名空间层面的资源,所以namespace需要指定为Flink Kubernetes Operator和Flink作业所在的命名空间,在本文示例中命名空间是flink;二是storageClassName需要指定为Kubernetes上已有的StorageClass名字,可以使用kubectl get sc查看。
使用以下命令创建和查看PVC和PV,并将Flink作业Jar包上传到PV所对应的实际路径下。
kubectl apply -f flink-jar-pvc.yaml
kubectl get pvc -n flink
kubectl get pv -n flin

 

第二步,编写Flink作业的yaml文件(application-deployment-with-pv.yaml),yaml文件的内容如下图所示。

# Flink Application集群 源码请到 https://bigdataonk8s.com 获取
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment
spec:
  image: flink:1.13.6
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent   # 镜像拉取策略,本地没有则从仓库拉取
  ingress:   # ingress配置,用于访问flink web页面
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flink-jar  # 挂载nfs上的jar
              mountPath: /opt/flink/jar
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT.jar
    entryClass: com.yale.StreamWordCount
    args:
    parallelism: 1
    upgradeMode: stateless

 方式二的Flink作业yaml文件与方式一的yaml文件大体类似,但有三处地方需要特别注意,首先是images改为引用Flink的基础镜像,而不是我们自行构建的Flink作业镜像;其次新增了podTemplate的内容,目的是将我们创建的PV挂载到Flink容器里,挂载的路径是/opt/flink/jar;最后,job.jarURI指向了我们Flink作业Jar包的挂载路径,job.entryClass则为Flink作业的启动类名称。

第三步,运行Flink作业。与方式一一样,由于示例Flink程序需要从nc读取字符流数据,因此在运行Flink作业前,需要先运行nc程序,命令是nc -lk 7777,否则Flink作业会运行不起来。
使用以下命令提交Flink作业的yaml文件到Kubernetes运行,并查看Pod的创建情况。
kubectl apply -f application-deployment-with-pv.yaml
kubectl get all -n flink

在nc发送一些测试字符串,使用kubectl logs taskmanager-pod-name -n flink查看Flink作业的输出结果。

 3、两种方式的选择

关于Application部署模式2种提交方式的选择, 大家可以参考如下策略。
Application模式的2种作业提交方式的最大区别在于是否需要将作业Jar包打入Flink镜像。
对于方式一,把Jar包打进镜像,这样会使得每个Flink作业都要打一个镜像,容易导致镜像数量过多,既不便于管理,同时也会占用大量镜像仓库空间,通常1个Flink镜像的大小约为600M,当镜像数量很多的时候,需要占有的空间就很大。所以在实际应用中,不太推荐使用这种方式。
对于方式二,Jar包通过PV挂载,这样的好处就是Flink作业Jar包不用打到镜像,既可以省去镜像构建的工作,同时只需维护少数几个Flink基础镜像,这样可以极大地节省镜像仓库空间。综合考量,在实际应用中,更推荐使用方式二。
在下一篇文章中,会继续讲解和演示Flink Kubernetes Operator Session模式的2种Flink作业提交方式。更多有关Flink Kubernetes Operator的视频和学习资料,可以到bigdataonk8s观看和获取。
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐