Flink deployment notes:

1. Flink ConfigMap

  • Edit flink-configuration-configmap.yaml:
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 1
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        jobmanager.memory.heap.size: 1024m
        taskmanager.memory.process.size: 1024m
      log4j.properties: |+
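        # Log4j 2 properties syntax (Flink 1.11 and later no longer read Log4j 1.x files)
        rootLogger.level = INFO
        rootLogger.appenderRef.file.ref = FileAppender
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name = org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO
        appender.file.name = FileAppender
        appender.file.type = File
        appender.file.fileName = ${sys:log.file}
        appender.file.layout.type = PatternLayout
        appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = ERROR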
    
  • Create the Flink configuration ConfigMap:
    kubectl create -f flink-configuration-configmap.yaml 
    
  • List the ConfigMaps to confirm it was created:
    kubectl get configmap
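  • To check what was actually stored, a single key can be read back with jsonpath. The helper below is only a sketch: show_configmap_key is a hypothetical name (not a kubectl command), and it assumes the ConfigMap lives in the current namespace.

```shell
# Print one file stored in a ConfigMap.
# $1 = ConfigMap name, $2 = key (file name) under .data
show_configmap_key() {
  # Dots inside the key must be escaped for kubectl's jsonpath parser.
  escaped_key=$(printf '%s' "$2" | sed 's/\./\\./g')
  kubectl get configmap "$1" -o "jsonpath={.data.${escaped_key}}"
}

# Example: show_configmap_key flink-config flink-conf.yaml
```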
    

2. JobManager Service

  • Edit jobmanager-service.yaml:
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124
      - name: ui
        port: 8081
      selector:
        app: flink
        component: jobmanager
    
  • Create the JobManager Service:
    kubectl create -f jobmanager-service.yaml
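  • The Service name has to match jobmanager.rpc.address in flink-conf.yaml, since TaskManagers reach the JobManager through the Service's DNS name. A small sketch of how that name expands, assuming the default namespace and the usual cluster domain cluster.local (service_fqdn is a hypothetical helper, not something Flink or kubectl provides):

```shell
# Build the in-cluster DNS name of a Service.
# $1 = Service name
# $2 = namespace (assumed "default" when omitted)
# $3 = cluster domain (assumed "cluster.local" when omitted)
service_fqdn() {
  printf '%s.%s.svc.%s\n' "$1" "${2:-default}" "${3:-cluster.local}"
}

# Example: service_fqdn flink-jobmanager
```

Pods in the same namespace can use the short name flink-jobmanager directly, which is what the ConfigMap above does.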
    

3. JobManager Deployment

  • Edit jobmanager-deployment.yaml:
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:1.17.1-scala_2.12-java8
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
              while :;
              do
                if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*jobmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
    
  • Create the JobManager Deployment:
    kubectl create -f jobmanager-deployment.yaml
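  • Before moving on, it may help to wait until the Deployment has rolled out and glance at the log. Because the container command tails the JobManager log file, kubectl logs should show it. A sketch, with wait_and_show_logs as a hypothetical helper and 120s as an arbitrary default timeout:

```shell
# Block until a Deployment has finished rolling out, then print recent log lines.
# $1 = Deployment name, $2 = rollout timeout (assumed 120s when omitted)
wait_and_show_logs() {
  kubectl rollout status "deployment/$1" --timeout="${2:-120s}" || return 1
  kubectl logs "deployment/$1" --tail=20
}

# Example: wait_and_show_logs flink-jobmanager
```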
    

4. TaskManager Deployment

  • Edit taskmanager-deployment.yaml:
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: flink:1.17.1-scala_2.12-java8
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
              while :;
              do
                if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*taskmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6122
              name: rpc
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
    
  • Create the TaskManager Deployment:
    kubectl create -f taskmanager-deployment.yaml
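  • With taskmanager.numberOfTaskSlots: 1 and replicas: 2, the cluster offers 2 task slots in total. If a job needs more parallelism, the Deployment can be scaled. The sketch below computes the replica count as a ceiling division; both function names are hypothetical.

```shell
# Number of TaskManager replicas needed to provide a given number of slots.
# $1 = slots required, $2 = taskmanager.numberOfTaskSlots
replicas_for_slots() {
  echo $(( ($1 + $2 - 1) / $2 ))   # integer ceiling of $1 / $2
}

# Scale the TaskManager Deployment to provide at least $1 slots.
# $2 = slots per TaskManager (assumed 1, as in the ConfigMap, when omitted)
scale_taskmanagers() {
  kubectl scale deployment/flink-taskmanager \
    --replicas="$(replicas_for_slots "$1" "${2:-1}")"
}

# Example: scale_taskmanagers 4
```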
    

5. Accessing the Flink UI

5.1 Method 1

  • Run kubectl proxy on the command line.
  • Then open http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy in a browser.
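  • The same proxy path also exposes Flink's REST API, so the cluster can be checked from a terminal. A sketch that assembles the URL from its parts; proxy_url is a hypothetical helper, and 8001 is kubectl proxy's default port:

```shell
# Build the kubectl-proxy URL for a named Service port.
# $1 = Service name, $2 = port name,
# $3 = namespace (assumed "default"), $4 = local proxy port (assumed 8001)
proxy_url() {
  printf 'http://localhost:%s/api/v1/namespaces/%s/services/%s:%s/proxy\n' \
    "${4:-8001}" "${3:-default}" "$1" "$2"
}

# With kubectl proxy running in another terminal:
#   curl -s "$(proxy_url flink-jobmanager ui)/overview"
```

The /overview endpoint returns a JSON summary of the cluster, including the number of TaskManagers and slots.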

5.2 Method 2

  • Edit jobmanager-rest-service.yaml:
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081 # port the Service listens on at its cluster IP
        targetPort: 8081 # port the Pod listens on
        nodePort: 30081 # port opened on every Kubernetes node
      selector:
        app: flink
        component: jobmanager
    
  • Create the jobmanager-rest Service:
    kubectl create -f jobmanager-rest-service.yaml
    
  • Look up the externally exposed port:
    kubectl get svc
    
  • Open http://<node-ip>:<nodePort> in a browser, using the port found above (30081 with this manifest).
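  • The node IP and port can also be read with jsonpath instead of by eye. This is a sketch under assumptions: the three function names are hypothetical, the first node in the list is used, and that node's InternalIP must be reachable from where the browser runs.

```shell
# NodePort assigned to the first port of the REST Service.
rest_node_port() {
  kubectl get svc flink-jobmanager-rest -o 'jsonpath={.spec.ports[0].nodePort}'
}

# InternalIP of the first node returned by the API server.
node_internal_ip() {
  kubectl get nodes \
    -o 'jsonpath={.items[0].status.addresses[?(@.type=="InternalIP")].address}'
}

# Combine both into the URL of the Flink UI.
ui_url() {
  printf 'http://%s:%s\n' "$(node_internal_ip)" "$(rest_node_port)"
}

# Example: ui_url
```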

6. Stopping the cluster

kubectl delete -f jobmanager-rest-service.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f flink-configuration-configmap.yaml
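
If Method 2 was skipped, the first delete above fails because the REST Service does not exist. A sketch of a more tolerant teardown that loops over the same files and passes --ignore-not-found (teardown_flink is a hypothetical name):

```shell
# Delete every manifest created in these notes, skipping ones never applied.
teardown_flink() {
  for f in jobmanager-rest-service.yaml jobmanager-service.yaml \
           jobmanager-deployment.yaml taskmanager-deployment.yaml \
           flink-configuration-configmap.yaml; do
    kubectl delete -f "$f" --ignore-not-found
  done
}

# Example: teardown_flink
```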