flink on k8s提交任务
flink on k8s提交任务
相关文档
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/
前置准备
flink的lib目录下放入两个依赖
bcpkix-jdk15on-1.68.jar
bcprov-jdk15on-1.69.jar
创建用户有RABC权限去执行创建pods
官网原文:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#rbac
k8s中每个命名空间都有一个默认服务帐户。但是,default 服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新 default 服务账号的权限或指定另一个绑定了正确角色的服务账号。
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
如果不想使用 default 服务帐户,可以使用以下命令创建新的 flink-service-account 服务帐户并设置角色绑定。然后使用 config 选项 -Dkubernetes.service-account=flink-service-account 使 JobManager pod 使用 flink-service-account 服务帐户来创建/删除 TaskManager pod。
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
构建镜像
这里使用 Application Mode 模式在生产环境可以为应用提供更好的隔离。flink on k8s 要求代码与 flink 镜像绑定在一起,Application Mode 确保在应用程序终止后正确清理所有 Flink 组件。
编写代码
public class K8sDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
ctx.collect(UUID.randomUUID().toString());
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
source.print();
env.execute();
}
}
编写dockerfile
使用 flink 社区提供的基础 docker 镜像
FROM flink:1.13.6-scala_2.11-java8
RUN mkdir -p $FLINK_HOME/jobs
COPY k8s-demo-1.0-SNAPSHOT.jar $FLINK_HOME/jobs/flink-on-k8s.jar
构建镜像
docker build -t flink-on-k8s-demo .
提交任务
export一下k8s的KUBECONFIG
export KUBECONFIG=/etc/rancher/k3s/k3s.yaml
提交任务
bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.service-account=flink-service-account \
-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/czs-projects/flink-on-k8s-demo:v1.0 \
local:///opt/flink/jobs/k8s-demo-1.0-SNAPSHOT.jar
其中,根据RABC权限,启动失败要加上指定参数,否则可不加
-Dkubernetes.cluster-id=my-first-flink-cluster
-Dkubernetes.service-account=flink-service-account
报错To use support for EC Keys you must explicitly add this dependency to classpath
将jar包加入flink client的目录下
之后执行成功日志
更多推荐
所有评论(0)