Flink on K8s in Practice 3: The Application Deployment Mode
Apache Flink is one of today's mainstream stream-processing engines, widely used in real-time data warehouses, real-time BI, data lakes, recommendation, and risk-control scenarios. Apache Flink supports multiple Resource Providers, meaning it can run on a variety of resource-management platforms. This series uses the popular container platform Kubernetes as Flink's Resource Provider and walks through how to deploy and run Flink on Kubernetes with the Flink Kubernetes Operator.
1. Introduction to Application Mode
In Application mode, each Flink application runs in its own dedicated cluster: the Flink Kubernetes Operator creates a JobManager and TaskManagers just for that job, the application's main() method executes on the JobManager, and all cluster resources are released when the job terminates. This per-job isolation is why Application mode is the deployment mode generally recommended for production.
2. The Example Program
The example is a streaming word count. Create a Maven project with the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bigdataonk8s</groupId>
    <artifactId>flink-on-k8s-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>
    <!-- Packaging configuration -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
StreamWordCount.java
package com.bigdataonk8s;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

import java.util.Arrays;

/**
 * Source code available at https://bigdataonk8s.com
 */
public class StreamWordCount {

    private static final Logger logger = Logger.getLogger(StreamWordCount.class);

    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the text stream; first run `nc -lk 7777` on host k8s01
        DataStreamSource<String> lineDSS = env.socketTextStream("192.168.56.80", 7777);
        // 3. Transform the data: split lines into words, pair each word with a count of 1
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. Print the results
        result.print();
        // 7. Execute the job
        env.execute();
    }
}
Once development is complete, package the Flink program as a JAR and make it available to the Kubernetes cluster.
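With the pom.xml above, a standard Maven build produces the JAR (a minimal sketch, assuming Maven and JDK 8 are installed):
# Build the project; besides the regular JAR, the assembly plugin also writes a
# self-contained flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar to target/
mvn clean package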
3. Running the Example Program
Method 1: Bake the JAR into the Flink image
The first step is to build a job image: write a Dockerfile that copies the job JAR into the official Flink base image.
FROM flink:1.13.6
WORKDIR /opt/flink
COPY flink-on-k8s-demo-1.0-SNAPSHOT.jar /opt/flink/flink-on-k8s-demo-1.0-SNAPSHOT.jar
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["help"]
Build the image, then tag and push it to your image registry (replace image_id and registry_address with your own values):
docker build -f Dockerfile -t flink-wc:1.13.6 .
docker tag image_id registry_address/flink-wc:1.13.6
docker push registry_address/flink-wc:1.13.6
The second step is to write the Flink job YAML (application-deployment.yaml):
# Flink Application cluster  https://bigdataonk8s.com
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment
spec:
  image: flink-wc:1.13.6  # the custom-built job image (use the full registry path if pulling from a registry)
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent  # pull from the registry only if the image is absent locally
  ingress:  # ingress settings for reaching the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/flink-on-k8s-demo-1.0-SNAPSHOT.jar
    entryClass: com.bigdataonk8s.StreamWordCount
    args: []
    parallelism: 1
    upgradeMode: stateless
Three parts of this YAML deserve special attention. First, the resource kind at the top is FlinkDeployment, the custom resource type (CRD) that the Flink Kubernetes Operator defines for Flink clusters. Second, image references the job image we built ourselves. Third, job.jarURI points to the path of the job JAR inside the image, and job.entryClass is the entry class of the Flink job.
The third step is to run the Flink job. Because the example program reads a character stream from nc, you must start nc (nc -lk 7777) before launching the job; otherwise the Flink job will fail to start.
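A minimal sketch of the nc side (192.168.56.80 is the host hard-coded in the example program):
# Start a TCP listener on port 7777; -k keeps nc listening across connections
nc -lk 7777
# Once the job is running, type test lines here, e.g.:
# hello flink hello k8s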
Submit the job YAML to Kubernetes with the following commands and watch the Pods come up.
kubectl apply -f application-deployment.yaml
kubectl get all -n flink
Send some test strings through nc, then inspect the job output with kubectl logs taskmanager-pod-name -n flink.
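For example (the TaskManager Pod name below is illustrative; look up the real one first):
# Find the TaskManager Pod name
kubectl get pods -n flink
# Tail the job output, substituting the Pod name from the previous command
kubectl logs -f application-deployment-taskmanager-1-1 -n flink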
Method 2: Mount the JAR into the container via a PV
The first step is to create a PVC to hold the job JAR (flink-jar-pvc.yaml):
# PVC for persisting the Flink job JAR  https://bigdataonk8s.com
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-jar-pvc  # name of the JAR PVC
  namespace: flink
spec:
  storageClassName: nfs-storage  # StorageClass name
  accessModes:
    - ReadOnlyMany  # use the ReadOnlyMany access mode
  resources:
    requests:
      storage: 1Gi  # capacity; adjust to your actual needs
Apply the PVC and confirm that it is created and bound:
kubectl apply -f flink-jar-pvc.yaml
kubectl get pvc -n flink
kubectl get pv
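How the JAR gets onto the volume depends on your storage backend. With the NFS-backed StorageClass assumed here, one option is to copy the JAR straight into the NFS export behind the PV (the path below is hypothetical; check your provisioner's directory layout):
# Copy the job JAR to the NFS directory backing the PV (hypothetical path;
# verify it with `kubectl get pv` and your NFS server configuration)
cp flink-on-k8s-demo-1.0-SNAPSHOT.jar /nfs/data/flink-flink-jar-pvc-pvc-xxxx/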
The second step is to write the Flink job YAML (application-deployment-with-pv.yaml); its content is as follows.
# Flink Application cluster; source available at https://bigdataonk8s.com
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment
spec:
  image: flink:1.13.6
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent  # pull from the registry only if the image is absent locally
  ingress:  # ingress settings for reaching the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flink-jar  # mount the JAR stored on NFS
              mountPath: /opt/flink/jar
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT.jar
    entryClass: com.bigdataonk8s.StreamWordCount
    args: []
    parallelism: 1
    upgradeMode: stateless
Method 2's YAML is broadly similar to Method 1's, but three differences matter. First, image now references the stock Flink base image rather than our custom job image. Second, a podTemplate section is added to mount the PV we created into the Flink container at /opt/flink/jar. Third, job.jarURI points to the JAR's mount path, and job.entryClass is the entry class of the Flink job.
The third step is to run the Flink job: submit the YAML and check the Pods.
kubectl apply -f application-deployment-with-pv.yaml
kubectl get all -n flink
Send some test strings through nc, then inspect the job output with kubectl logs taskmanager-pod-name -n flink.
Choosing between the two methods
Method 1 bakes the JAR into the image, so every job change requires rebuilding and re-pushing the image, but the resulting image is self-contained and needs no external storage. Method 2 reuses the stock Flink image and only requires replacing the JAR on shared storage, which is lighter for frequent updates but depends on a storage backend such as NFS. Pick whichever matches your release workflow and infrastructure.
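As an illustration of Method 2's lighter update path, a new job version can be rolled out without rebuilding any image (a sketch; the NFS path is the hypothetical one from above):
# Replace the JAR on the shared volume
cp flink-on-k8s-demo-1.0-SNAPSHOT.jar /nfs/data/flink-flink-jar-pvc-pvc-xxxx/
# Recreate the FlinkDeployment so the job restarts with the new JAR
kubectl delete -f application-deployment-with-pv.yaml
kubectl apply -f application-deployment-with-pv.yaml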