flink on k8s
基于 kubeshpere 的 flink on k8s 实践
一、环境准备
k8s平台:kubesphere
k8s中每个命名空间都有一个默认服务帐户。但是,default 服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新 default 服务账号的权限或指定另一个绑定了正确角色的服务账号。
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
如果不想使用 default 服务帐户,可以使用以下命令创建新的 flink-service-account 服务帐户并设置角色绑定。然后使用 config 选项 -Dkubernetes.service-account=flink-service-account 使 JobManager pod 使用 flink-service-account 服务帐户来创建/删除 TaskManager pod。
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
若权限不足 flink 在任务启动时将无法创建 akka
其他环境:
java(1.8)、docker(20.10.17)、flink(1.13.6)、mysql(8.0.23)
二、开始部署
2.1 编写 flink 任务
这里创建一个简单的 flink 任务,每秒生成一个随机数写入 mysql 中
package test;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author wjun
* @date 2022/8/4 15:09
* @email wjunjobs@outlook.com
* @describe
*/
public class K8sDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<String>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
ctx.collect(UUID.randomUUID().toString());
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isRunning = false;
}
}).addSink(JdbcSink.sink(
"insert into dev.k8s values(?)",
(ps, t) -> {
ps.setString(1, t);
},
JdbcExecutionOptions.builder().withBatchSize(1).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://***")
.withUsername("***")
.withPassword("***")
.build()
));
env.execute();
}
}
最终将任务打成 jar 包
2.2 构建镜像
这里使用 Application Mode 模式在生产环境可以为应用提供更好的隔离。on k8s 要求代码与 flink 镜像绑定在一起,Application Mode 确保在应用程序终止后正确清理所有 Flink 组件。
使用 flink 社区提供的基础 docker 镜像
FROM flink:1.13.6
RUN mkdir -p $FLINK_HOME/jobs
COPY flink-on-k8s.jar $FLINK_HOME/jobs/flink-on-k8s.jar
最终 dockerfile 的工作空间如下:
构建 docker 镜像
docker build -t super/flink-on-k8s-demo .
2.3 提交任务
使用下面命令提交任务
flink run-application \
--class test.K8sDemo \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=super/flink-on-k8s-demo \
local:///opt/flink/jobs/flink-on-k8s.jar
- –class: 指定任务的主类名
- – target: 指定任务运行模式为 native k8s application
- -Dkubernetes.cluster-id: 指定集群名称并且必须是唯一的,若不指定 flink 将随机生成
- -Dkubernetes.container.image: 用于启动 pod 的镜像
- local: 指定镜像的任务 jar
kubeshpere 平台中查看任务情况
mysql 中观察数据是否写入
2.4 任务取消
kubesphere 平台中点击应用负载-服务,根据提交任务时候指定的 cluster-id 找到对应的 rest 服务
找到 NodePort 端口
使用 节点ip:NodePort 即可进入熟悉的 flink web ui 点击 cancel 即可,同时 kubeshpere 会自动删除与之相关的组件。
这样 flink on k8s 初步的任务提交、运行、取消就搞定啦
更多推荐
所有评论(0)