Flink 原生支持 Kubernetes #

本页面介绍了如何在Kubernetes 上本地部署 Flink 。

开始 #

入门部分将指导您在 Kubernetes 上设置功能齐全的 Flink 集群。

介绍 #

Kubernetes 是一种流行的容器编排系统,用于自动化计算机应用程序的部署、扩展和管理。Flink 的原生 Kubernetes 集成允许您直接在正在运行的 Kubernetes 集群上部署 Flink。此外,Flink 能够根据所需资源动态分配和取消分配 TaskManager,因为它可以直接与 Kubernetes 对话。

准备 #

入门部分假定运行Kubernetes集群满足以下要求:

  • Kubernetes >= 1.9。
  • KubeConfig,它有权列出、创建、删除 Pod 和服务,可通过~/.kube/config. 您可以通过运行来验证权限kubectl auth can-i <list|create|edit|delete> pods
  • 启用 Kubernetes DNS。
  • default具有RBAC权限的服务帐户可以创建、删除 pod。

如果您在设置 Kubernetes 集群时遇到问题,请查看如何设置 Kubernetes 集群

在 Kubernetes 上启动 Flink 会话 #

一旦您的 Kubernetes 集群运行并kubectl配置为指向它,您就可以通过以下方式在会话模式下启动 Flink 集群

# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

# (2) Submit example job
$ ./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    ./examples/streaming/TopSpeedWindowing.jar

# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster

使用Minikube 时,需要调用minikube tunnel才能在 Minikube 上暴露 Flink 的 LoadBalancer 服务

恭喜!您已经通过在 Kubernetes 上部署 Flink 成功运行了 Flink 应用程序。

回到顶部

部署模式 #

对于生产使用,我们建议在应用模式下部署 Flink 应用程序,因为这些模式为应用程序提供了更好的隔离。

申请模式 #

所述应用程序模式需要用户代码是捆绑在一起的弗林克图像一起,因为它运行用户代码的main()群集上的方法。Application Mode 确保在应用程序终止后所有 Flink 组件都被正确清理。

Flink 社区提供了一个基础 Docker 镜像,可用于捆绑用户代码:

FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar

在 下创建并发布 Docker 镜像后custom-image-name,您可以使用以下命令启动一个应用程序集群:

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=custom-image-name \
    local:///opt/flink/usrlib/my-flink-job.jar

注意 local是应用模式中唯一支持的方案。

kubernetes.cluster-id选项指定集群名称并且必须是唯一的。如果不指定这个选项,那么 Flink 将生成一个随机名称。

kubernetes.container.image选项指定用于启动 pod 的图像。

部署应用程序集群后,您可以与它进行交互:

# List running job on the cluster
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>

您可以conf/flink-conf.yaml通过将键值对传递-Dkey=valuebin/flink.

Per-Job 集群模式 #

Kubernetes 上的 Flink 不支持 Per-Job 集群模式。

会话模式 #

您已经在本页顶部的入门指南中看到了会话集群的部署。

会话模式可以在两种模式下执行:

  • 分离模式(默认):kubernetes-session.sh在 Kubernetes 上部署 Flink 集群,然后终止。
  • 附加模式-Dexecution.attached=true):kubernetes-session.sh保持活动状态并允许输入命令来控制正在运行的 Flink 集群。例如,stop停止正在运行的 Session 集群。键入help以列出所有支持的命令。

my-first-flink-cluster使用集群 ID 重新连接到正在运行的会话集群,请使用以下命令:

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dexecution.attached=true

您可以conf/flink-conf.yaml通过将键值对传递-Dkey=valuebin/kubernetes-session.sh.

停止正在运行的会话集群 #

为了使用集群 id 停止正在运行的会话集群,my-first-flink-cluster您可以删除 Flink 部署或使用:

$ echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dexecution.attached=true

回到顶部

Kubernetes 上的 Flink 参考 #

在 Kubernetes 上配置 Flink #

Kubernetes 特定的配置选项列在配置页面上

Flink 使用Fabric8 Kubernetes 客户端与 Kubernetes APIServer 通信来创建/删除 Kubernetes 资源(例如 Deployment、Pod、ConfigMap、Service 等),以及观察 Pods 和 ConfigMaps。除了上述 Flink 配置选项,Fabric8 Kubernetes 客户端的一些专家选项可以通过系统属性或环境变量进行配置。

例如,用户可以使用以下 Flink 配置选项来设置并发最大请求数,这允许在使用Kubernetes HA 服务时在会话集群中运行更多作业。请注意,每个 Flink 作业都会消耗3并发请求。

containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
env.java.opts.jobmanager: "-Dkubernetes.max.concurrent.requests=200"

访问 Flink 的 Web UI #

Flink 的 Web UI 和 REST 端点可以通过kubernetes.rest-service.exposed.type配置选项以多种方式公开。

  • ClusterIP:在集群内部 IP 上公开服务。该服务只能在集群内访问。如果要访问 JobManager UI 或将作业提交到现有会话,则需要启动本地代理。然后,您可以使用localhost:8081将 Flink 作业提交到会话或查看仪表板。
$ kubectl port-forward service/<ServiceName> 8081
  • NodePort:在静态端口 (the NodePort)处公开每个节点 IP 上的服务。 <NodeIP>:<NodePort>可用于联系 JobManager 服务。 NodeIP也可以替换为 Kubernetes ApiServer 地址。您可以在 kube 配置文件中找到它的地址。
  • LoadBalancer:使用云提供商的负载均衡器在外部公开服务。由于云提供商和 Kubernetes 需要一些时间来准备负载均衡器,您可能会NodePort在客户端日志中获得一个JobManager Web 界面。您可以使用kubectl get services/<cluster-id>-rest获取 EXTERNAL-IP 并手动构建负载均衡器 JobManager Web 界面http://<EXTERNAL-IP>:8081

有关在 Kubernetes 中发布服务的更多信息,请参阅官方文档。

根据您的环境,启动具有LoadBalancerREST 服务公开类型的 Flink 集群可能会使集群可公开访问(通常具有执行任意代码的能力)。

记录 #

Kubernetes 集成向 Pod公开conf/log4j-console.propertiesconf/logback-console.xml作为 ConfigMap。对这些文件的更改将对新启动的集群可见。

访问日志 #

默认情况下,JobManager 和 TaskManager 会同时将日志输出到控制台和/opt/flink/log每个 Pod 中。该STDOUTSTDERR输出只会被重定向到控制台。您可以通过以下方式访问它们

$ kubectl logs <pod-name>

如果 pod 正在运行,您还可以使用kubectl exec -it <pod-name> bash隧道进入并查看日志或调试进程。

访问 TaskManager 的日志 #

Flink 会自动取消分配空闲的 TaskManager,以免浪费资源。这种行为会使访问相应 Pod 的日志变得更加困难。您可以通过配置resourcemanager.taskmanager-timeout来增加空闲 TaskManager 释放之前的时间,以便您有更多时间检查日志文件。

动态更改日志级别 #

如果您已将记录器配置为自动检测配置更改,那么您可以通过更改相应的 ConfigMap(假设集群 ID 为my-first-flink-cluster)来动态调整日志级别:

$ kubectl edit cm flink-config-my-first-flink-cluster

使用插件 #

为了使用插件,你必须将它们复制到 Flink JobManager/TaskManager pod 中的正确位置。您可以使用内置插件而无需安装卷或构建自定义 Docker 映像。例如,使用以下命令为您的 Flink 会话集群启用 S3 插件。

$ ./bin/kubernetes-session.sh
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.2.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.13.2.jar

自定义 Docker 镜像 #

如果要使用自定义 Docker 映像,则可以通过配置选项指定它kubernetes.container.image。Flink 社区提供了丰富的Flink Docker 镜像,这是一个很好的起点。查看如何自定义 Flink 的 Docker 镜像,了解如何启用插件、添加依赖项和其他选项。

使用秘密 #

Kubernetes Secrets是一个包含少量敏感数据(例如密码、令牌或密钥)的对象。否则,此类信息可能会放在 pod 规范或图像中。Flink on Kubernetes 可以通过两种方式使用 Secrets:

  • 使用 Secrets 作为 pod 中的文件;
  • 使用 Secrets 作为环境变量;
使用机密作为来自 Pod 的文件 #

以下命令将在启动的 pod 中mysecret的路径下挂载机密/path/to/secret

$ ./bin/kubernetes-session.sh -Dkubernetes.secrets=mysecret:/path/to/secret

秘密的用户名和密码,mysecret然后可以存储在文件中找到/path/to/secret/username/path/to/secret/password。有关更多详细信息,请参阅官方 Kubernetes 文档

使用 Secrets 作为环境变量 #

以下命令将mysecret在启动的 pod中将机密公开为环境变量:

$ ./bin/kubernetes-session.sh -Dkubernetes.env.secretKeyRef=\
    env:SECRET_USERNAME,secret:mysecret,key:username;\
    env:SECRET_PASSWORD,secret:mysecret,key:password

env 变量SECRET_USERNAME包含用户名, env 变量SECRET_PASSWORD包含 secret 的密码mysecret。有关更多详细信息,请参阅官方 Kubernetes 文档

Kubernetes 上的高可用性 #

对于 Kubernetes 上的高可用性,您可以使用现有的高可用性服务

手动资源清理 #

Flink 使用Kubernetes OwnerReference来清理所有集群组件。所有弗林克创建的资源,其中包括ConfigMapService,和Pod,有OwnerReference设定为deployment/<cluster-id>。当部署被删除时,所有相关资源将被自动删除。

$ kubectl delete deployment/<cluster-id>

支持的 Kubernetes 版本 #

目前>= 1.9支持所有 Kubernetes 版本。

命名空间 #

Kubernetes 中的命名空间通过资源配额在多个用户之间划分集群资源。Kubernetes 上的 Flink 可以使用命名空间来启动 Flink 集群。命名空间可以通过kubernetes.namespace进行配置。

RBAC #

基于角色的访问控制 ( RBAC ) 是一种基于企业内单个用户的角色来调节对计算网络资源的访问的方法。用户可以配置 JobManager 使用的 RBAC 角色和服务帐户,以访问 Kubernetes 集群内的 Kubernetes API 服务器。

每个命名空间都有一个默认的服务帐户。但是,default服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新default服务帐户的权限或指定另一个具有正确角色绑定的服务帐户。

$ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

如果您不想使用default服务帐号,请使用以下命令创建新的flink-service-account服务帐号并设置角色绑定。然后使用 config 选项-Dkubernetes.service-account=flink-service-account让 JobManager pod 使用flink-service-account服务帐户来创建/删除 TaskManager pods 和领导者 ConfigMaps。此外,这将允许 TaskManager 观察领导者 ConfigMaps 以检索 JobManager 和 ResourceManager 的地址。

$ kubectl create serviceaccount flink-service-account
$ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account

有关RBAC 授权的更多信息,请参阅 Kubernetes 官方文档。

Pod 模板 #

Flink 允许用户通过模板文件定义 JobManager 和 TaskManager pod。这允许直接支持 Flink Kubernetes 配置选项不支持的高级功能。使用kubernetes.pod-template-file 指定包含荚定义的本地文件。它将用于初始化 JobManager 和 TaskManager。应使用 name 定义主容器flink-main-container。有关更多信息,请参阅pod 模板示例

Flink 覆盖的字段 #

pod 模板的某些字段会被 Flink 覆盖。解析有效字段值的机制可以分为以下几类:

  • **Flink 定义:**用户不能配置。

  • **用户定义:**用户可以自由指定该值。Flink 框架不会设置任何附加值,有效值来自配置选项和模板。

    优先顺序:首先是一个显式的配置选项值,然后是 pod 模板中的值,最后是一个配置选项的默认值(如果没有指定)。

  • 与 Flink 合并: Flink 会将设置的值与用户定义的值合并(参见“由用户定义”的优先顺序)。在同名字段的情况下,Flink 值优先。

有关将被覆盖的 pod 字段的完整列表,请参阅下表。pod 模板中定义的所有未在表中列出的字段都不会受到影响。

Pod 元数据

钥匙类别相关配置选项描述
姓名由 Flink 定义JobManager pod 名称将被kubernetes.cluster-id定义的部署覆盖。TaskManager pod 名称将被<clusterID>-<attempt>-<index>Flink ResourceManager 生成的模式覆盖。
命名空间由用户定义kubernetes.namespaceJobManager 部署和 TaskManager Pod 都将在用户指定的命名空间中创建。
所有者参考由 Flink 定义JobManager 和 TaskManager pod 的所有者引用将始终设置为 JobManager 部署。请使用kubernetes.jobmanager.owner.reference控制何时删除部署。
注释由用户定义kubernetes.jobmanager.annotations kubernetes.taskmanager.annotationsFlink 将添加由 Flink 配置选项指定的附加注释。
标签与 Flink 合并kubernetes.jobmanager.labels kubernetes.taskmanager.labelsFlink 会为用户定义的值添加一些内部标签。

Pod 规格

钥匙类别相关配置选项描述
imagePullSecrets由用户定义kubernetes.container.image.pull-secretsFlink 将添加由 Flink 配置选项指定的额外 pull secret。
节点选择器由用户定义kubernetes.jobmanager.node-selector kubernetes.taskmanager.node-selectorFlink 将添加由 Flink 配置选项指定的额外节点选择器。
容忍由用户定义kubernetes.jobmanager.tolerations kubernetes.taskmanager.tolerationsFlink 将添加由 Flink 配置选项指定的额外容忍度。
重启策略由 Flink 定义JobManager pod 为“always”,TaskManager pod 为“never”。 JobManager pod 将始终通过部署重新启动。并且不应重新启动 TaskManager pod。
服务帐号由用户定义kubernetes.service-accountJobManager 和 TaskManager Pod 将使用用户定义的服务帐户创建。
与 Flink 合并Flink 会添加一些内部 ConfigMap 卷(例如 flink-config-volume、hadoop-config-volume),这些是传送 Flink 配置和 hadoop 配置所必需的。

主要容器规格

钥匙类别相关配置选项描述
环境与 Flink 合并containerized.master.env.{ENV_NAME} containerized.taskmanager.env.{ENV_NAME}Flink 会在用户定义的值中添加一些内部环境变量。
图片由用户定义kubernetes.container.image容器映像将根据用户定义值的定义优先顺序进行解析。
图像拉取策略由用户定义kubernetes.container.image.pull-policy容器映像拉取策略将根据用户定义值的定义优先顺序进行解析。
姓名由 Flink 定义Flink 将使用“flink-main-container”覆盖容器名称。
资源由用户定义内存: jobmanager.memory.process.size taskmanager.memory.process.size CPU: kubernetes.jobmanager.cpu kubernetes.taskmanager.cpu内存和 CPU 资源(包括请求和限制)将被 Flink 配置选项覆盖。所有其他资源(例如临时存储)将被保留。
容器端口与 Flink 合并Flink 会添加一些内部容器端口(例如 rest、jobmanager-rpc、blob、taskmanager-rpc)。
卷挂载与 Flink 合并Flink 会添加一些内部卷挂载(例如 flink-config-volume、hadoop-config-volume),这是传送 Flink 配置和 hadoop 配置所必需的。
Pod 模板示例 #
pod-template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: artifacts-fetcher:latest
      # Use wget or other tools to get user jars from remote storage
      command: [ 'wget', 'https://path/of/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
      volumeMounts:
        - mountPath: /flink-artifact
          name: flink-artifact
  containers:
    # Do not change the main container name
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/volumes/hostpath
          name: flink-volume-hostpath
        - mountPath: /opt/flink/artifacts
          name: flink-artifact
        - mountPath: /opt/flink/log
          name: flink-logs
      # Use sidecar container to push logs to remote storage or do some other debugging things
    - name: sidecar-log-collector
      image: sidecar-log-collector:latest
      command: [ 'command-to-upload', '/remote/path/of/flink-logs/' ]
      volumeMounts:
        - mountPath: /flink-logs
          name: flink-logs
  volumes:
    - name: flink-volume-hostpath
      hostPath:
        path: /tmp
        type: Directory
    - name: flink-artifact
      emptyDir: { }
    - name: flink-logs
      emptyDir: { }
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐