通过docker启动

镜像准备

  1. 下载docker镜像
docker pull flink:1.11
  1. 修改镜像tag 为1.11.2-scala_2.11
docker tag flink:1.11 flink:1.11.2-scala_2.11

Session Cluster启动模式

Flink会话集群可用于运行多个作业。每个flink作业公用JobManager和TaskManger

1. 设置docker容器建的通信连接

docker network create flink-network

2. 启动JobManager

docker run \
    --rm \
    --name=jobmanager \
    --network flink-network \
    -p 8081:8081 \
    --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
    flink:1.11.2-scala_2.11 jobmanager

3. 启动TaskManager

docker run \
    --rm \
    --name=taskmanager \
    --network flink-network \
    --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
    flink:1.11.2-scala_2.11 taskmanager

4.启动任务

本地环境,可通过访问flink管理后台,将作业运行主类,打成jar上传,并执行任务
地址:http://localhost:8081/


Job Cluster启动模式

Flink作业集群是运行单个作业的专用集群。每个filnk作业的JobManager和TaskManger隔离

1. 设置docker容器建的通信连接

docker network create flink-network

2. 启动JobManager

需要将Flink 需要执行任务的jar和所有依赖的lib,挂载到容器的/opt/flink/usrlib,这样在Flink 的JobManager在启动时,会根据指定的–job-classname,到target目录,加载Main类和运行环境。
本例中,将包含任务jar(flink.demo.word.WordCountMain)的目录:/tmp/flink/usrlib/artifacts1,挂载到容器的/opt/flink/usrlib/artifacts1目录

docker run \
      --mount type=bind,src=/tmp/flink/usrlib/artifacts1,target=/opt/flink/usrlib/artifacts1 \
      --mount type=bind,src=/tmp/flink/usrlib/artifacts2,target=/opt/flink/usrlib/artifacts2 \
      --rm \
      --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
      --name=jobmanager \
      --network flink-network \
      flink:1.11.2-scala_2.11 standalone-job \
      --job-classname flink.demo.word.WordCountMain \
      [--job-id <job id>] \
      [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
      [job arguments]

在Mac系统,需要注意,如果抛出如下异常,需要在docker控制台配置docker的文件权限是否包括当前挂载目录

docker: Error response from daemon: invalid mount config for type "bind": bind source path does not exist: xxx

在这里插入图片描述

如果启动时出现如下错误,需要将Flink启动类放在挂载目录中

org.apache.flink.util.FlinkException: Could not find the provided job class (com.job.ClassName) in the user lib directory (/opt/flink/usrlib).
	at org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.getJobClassNameOrScanClassPath(ClassPathPackagedProgramRetriever.java:140) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.getPackagedProgram(ClassPathPackagedProgramRetriever.java:123) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgram(StandaloneApplicationClusterEntryPoint.java:110) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:78) [flink-dist_2.12-1.11.2.jar:1.11.2]

3. 启动TaskManger

docker run \
      --mount type=bind,src=/tmp/flink/usrlib/artifacts1,target=/opt/flink/usrlib/artifacts1 \
      --mount type=bind,src=/tmp/flink/usrlib/artifacts2,target=/opt/flink/usrlib/artifacts2 \
      --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
      --network flink-network \
      flink:1.11.2-scala_2.11 taskmanager

通过k8s启动

1. 准备镜像

1.1. 基于flink:1.11.2-scala_2.11 将Job启动函数以及依赖,生成新的镜像

Dockerfile,Job 启动类(flink.demo.word.WordCountMain)在usrlib/artifacts1目录

FROM flink:1.11.2-scala_2.11
ADD usrlib/artifacts1 /opt/flink/usrlib/artifacts1
ADD usrlib/artifacts2 /opt/flink/usrlib/artifacts2

生成新的镜像

docker build -t flink_with_job_artifacts:1.11.2-scala_2.11 .

试运行启动任务
JobManager

docker run \
      --rm \
      --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
      --name=jobmanager \
      --network flink-network \
      flink_with_job_artifacts:1.11.2-scala_2.11 standalone-job 
      --job-classname flink.demo.word.WordCountMain \
      [--job-id <job id>] \
      [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
      [job arguments]

TaskManager

docker run \
      --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
      --network flink-network \
      flink_with_job_artifacts:1.11.2-scala_2.11 taskmanager

1.2. k8s集群

可通过本地搭建MiniKube,或者使用已有集群
本地可通过docker Desktop直接安装,如果一直处理启动中的状态,可以将docker镜像指定为国内镜像仓库

在这里插入图片描述


2. Per-Job模式启动flink

每提交一个任务,单独启动一个集群运行该任务,运行结束集群被删除,资源也被释放。任务启动较慢,适合于长时间运行的大型任务。需要手动指定TaskManger数量

2.1. 资源描述文件

新建flink-config配置,主要为flink公共配置。每个flink作业不同的配置,可在job配置启动参数单独配置
flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    #jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

将JobManager公开为k8s的服务,以便其他TaskManger可以注册
其中,${JOB}可替换,是区分不同作业的名称

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: ${JOB}-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: ${JOB}-jobmanager

定义JobManager资源配置,其中需要执行镜像文件image,不同的作业通过 ${JOB}来指定。
如果是本地调试,可以指定volumeMounts来加载本地flink作业运行环境,并且在args来指定运行主类

jobmanager-job.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: ${JOB}-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: ${JOB}-jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: flink_with_job_artifacts:1.11.2-scala_2.11
          env:
          args: ["standalone-job", "-Djobmanager.rpc.address=${JOB}-jobmanager","--job-classname", "flink.demo.word.WordCountMain"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
#            - name: job-artifacts-volume
#              mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
#        - name: job-artifacts-volume
#          hostPath:
#            path: /tmp/flink/usrlib/artifacts1

定义TaskManager资源配置
注意同一个任务的${JOB}需要相同,否则TaskManager无法注册成功

taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ${JOB}-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: ${JOB}-taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: ${JOB}-taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink_with_job_artifacts:1.11.2-scala_2.11
        env:
        args: ["taskmanager","-Djobmanager.rpc.address=${JOB}-jobmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
#        - name: job-artifacts-volume
#          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
#      - name: job-artifacts-volume
#        hostPath:
#          path: /tmp/flink/usrlib/artifacts1

2.2. 依次执行以下命令,启动一个flink 作业

执行前需要将${JOB}进行替换,
本地环境:JobManager启动成功后,可通过访问http://localhost:8081,来监控Flink作业状态

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-job.yaml
kubectl create -f taskmanager-job-deployment.yaml

任务退出

如果flink作业是全量任务,执行完后容器会自动停止。但是flink服务和flink TaskManager 需要手动释放资源。后期可通过JobManager任务执行完后的回调事件,通过k8s api释放这些资源。

kubectl delete -f jobmanager-job.yaml
kubectl delete -f taskmanager-job-deployment.yaml
kubectl create -f jobmanager-service.yaml

特点

优点:隔离性比较好,任务之间资源不冲突,一个任务单独使用一个 Flink 集群;相对于 Flink session 集群而且,资源随用随建,任务执行完成后立刻销毁资源,资源利用率会高一些。

缺点:需要提前指定 TaskManager 的数量,如果 TaskManager 指定的少了会导致作业运行失败,指定的多了仍会降低资源利用率;资源是实时创建的,启动长


3. Native Per Job 模式启动 (推荐)

该模式允许用户创建一个单独的映像,其中包含他们的作业和Flink运行环境,它将根据需要自动创建和销毁集群组件。Flink native per-job Flink 1.11 版本中提供。

交互原理

k8s调度流程

特点

native per-job cluster 也是任务提交的时候才创建 Flink 集群,不同的是,无需用户指定 TaskManager 资源的数量,因为同样借助了 Native 的特性,Flink 直接与 Kubernetes 进行通信并按需申请资源。

优点:资源按需申请,适合一次性任务,任务执行后立即释放资源,保证了资源的利用率。

缺点:资源是在任务提交后开始创建,同样意味着对于提交任务后对延时比较敏感的场景,需要一定的权衡;

3.1 启动flink集群

进入flink安装包目录执行,启动成功后,用户可以登录http://localhost:8081来访问flink控制台

./bin/flink run-application -p 8 -t kubernetes-application \
  -Dkubernetes.cluster-id=befb-d76db49flink-demo \
  -Dtaskmanager.memory.process.size=1024m \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dtaskmanager.numberOfTaskSlots=6 \
  -Dkubernetes.container.image=flink_with_job_artifacts:1.11.2-scala_2.11 \
  local:///opt/flink/usrlib/artifacts1/flink-demo-1.0-SNAPSHOT.jar

Note: local参数代表的是flink镜像文件中(flink_with_job_artifacts:1.11.2-scala_2.11),作业执行主类jar在镜像中的路径。打包请看上面的Dockerfile文件描述

3.2 停止Flink集群
./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐