相关文档

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/

前置准备

flink的lib目录下放入两个依赖

bcpkix-jdk15on-1.68.jar

bcprov-jdk15on-1.69.jar

创建用户有RABC权限去执行创建pods

官网原文:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#rbac

k8s中每个命名空间都有一个默认服务帐户。但是,default 服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新 default 服务账号的权限或指定另一个绑定了正确角色的服务账号。

kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

如果不想使用 default 服务帐户,可以使用以下命令创建新的 flink-service-account 服务帐户并设置角色绑定。然后使用 config 选项 -Dkubernetes.service-account=flink-service-account 使 JobManager pod 使用 flink-service-account 服务帐户来创建/删除 TaskManager pod。

kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account

构建镜像

这里使用 Application Mode 模式在生产环境可以为应用提供更好的隔离。flink on k8s 要求代码与 flink 镜像绑定在一起,Application Mode 确保在应用程序终止后正确清理所有 Flink 组件。

编写代码

public class K8sDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (isRunning) {
                    ctx.collect(UUID.randomUUID().toString());
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        source.print();

        env.execute();
    }
}

编写dockerfile

使用 flink 社区提供的基础 docker 镜像

FROM flink:1.13.6-scala_2.11-java8
RUN mkdir -p $FLINK_HOME/jobs
COPY k8s-demo-1.0-SNAPSHOT.jar $FLINK_HOME/jobs/flink-on-k8s.jar

image-20230425145303496

构建镜像

docker build -t flink-on-k8s-demo .

image-20230425145211047

提交任务

export一下k8s的KUBECONFIG

export KUBECONFIG=/etc/rancher/k3s/k3s.yaml

提交任务

bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.service-account=flink-service-account \
-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/czs-projects/flink-on-k8s-demo:v1.0 \
local:///opt/flink/jobs/k8s-demo-1.0-SNAPSHOT.jar

其中,根据RABC权限,启动失败要加上指定参数,否则可不加

-Dkubernetes.cluster-id=my-first-flink-cluster 
-Dkubernetes.service-account=flink-service-account 

报错To use support for EC Keys you must explicitly add this dependency to classpath将jar包加入flink client的目录下

image-20230425150044132

之后执行成功日志

image-20230425150506117

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐