一、环境准备

k8s平台:kubesphere

image-20220804180605541

k8s中每个命名空间都有一个默认服务帐户。但是,default 服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新 default 服务账号的权限或指定另一个绑定了正确角色的服务账号。

kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

如果不想使用 default 服务帐户,可以使用以下命令创建新的 flink-service-account 服务帐户并设置角色绑定。然后使用 config 选项 -Dkubernetes.service-account=flink-service-account 使 JobManager pod 使用 flink-service-account 服务帐户来创建/删除 TaskManager pod。

kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account

若权限不足 flink 在任务启动时将无法创建 akka

其他环境:

java(1.8)、docker(20.10.17)、flink(1.13.6)、mysql(8.0.23)

二、开始部署

2.1 编写 flink 任务

这里创建一个简单的 flink 任务,每秒生成一个随机数写入 mysql 中

package test;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author wjun
 * @date 2022/8/4 15:09
 * @email wjunjobs@outlook.com
 * @describe
 */
public class K8sDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new SourceFunction<String>() {
      private volatile boolean isRunning = true;

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
          ctx.collect(UUID.randomUUID().toString());
          TimeUnit.SECONDS.sleep(1);
        }
      }

      @Override
      public void cancel() {
        isRunning = false;
      }
    }).addSink(JdbcSink.sink(
      "insert into dev.k8s values(?)",
      (ps, t) -> {
        ps.setString(1, t);
      },
      JdbcExecutionOptions.builder().withBatchSize(1).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:mysql://***")
      .withUsername("***")
      .withPassword("***")
      .build()
    ));


    env.execute();
  }
}

最终将任务打成 jar 包

2.2 构建镜像

这里使用 Application Mode 模式在生产环境可以为应用提供更好的隔离。on k8s 要求代码与 flink 镜像绑定在一起,Application Mode 确保在应用程序终止后正确清理所有 Flink 组件。

使用 flink 社区提供的基础 docker 镜像

FROM flink:1.13.6
RUN mkdir -p $FLINK_HOME/jobs
COPY flink-on-k8s.jar $FLINK_HOME/jobs/flink-on-k8s.jar

最终 dockerfile 的工作空间如下:

image-20220804174533420

构建 docker 镜像

docker build -t super/flink-on-k8s-demo .

image-20220804174634513

2.3 提交任务

使用下面命令提交任务

flink run-application \
--class test.K8sDemo \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=super/flink-on-k8s-demo \
local:///opt/flink/jobs/flink-on-k8s.jar
  • –class: 指定任务的主类名
  • – target: 指定任务运行模式为 native k8s application
  • -Dkubernetes.cluster-id: 指定集群名称并且必须是唯一的,若不指定 flink 将随机生成
  • -Dkubernetes.container.image: 用于启动 pod 的镜像
  • local: 指定镜像的任务 jar

image-20220804175056425

kubeshpere 平台中查看任务情况

image-20220804175158040

mysql 中观察数据是否写入

image-20220804175252557

2.4 任务取消

kubesphere 平台中点击应用负载-服务,根据提交任务时候指定的 cluster-id 找到对应的 rest 服务

image-20220804175447672

找到 NodePort 端口

image-20220804175625323

使用 节点ip:NodePort 即可进入熟悉的 flink web ui 点击 cancel 即可,同时 kubeshpere 会自动删除与之相关的组件。

这样 flink on k8s 初步的任务提交、运行、取消就搞定啦

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐