背景

最近业务需要使用Flink, 于是把之前Flink的相关技术拿出来重新回顾一下, 于是想起这个之前一直没有去解决的问题. 本文主要讲解如何解决这一问题以及发生这个问题的根本原因.

运行Flink 官方docker image

此处不多说,访问docker hub flink官方的Image. 选择自己需要版本的flink官方镜像(此处我选的是flink:scala_2.11 因为要使用到scala shell所以选的scala版本不是最新的) 然后按照官方给的docker-compose 文件简单改动一下启动即可

version: "3"
services:
  jobmanager:
    image: flink:scala_2.11
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - "JOB_MANAGER_RPC_ADDRESS=jobmanager"

  taskmanager:
    image: flink:scala_2.11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - "JOB_MANAGER_RPC_ADDRESS=jobmanager"

此时访问web ui localhost:8081 查看 jobmanager 的 logs 和 stdout
在这里插入图片描述
file unavailable 无法查看logs 和 stdout. 提交一个wordcount程序

object StreamingJob {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textStream = env.fromElements(
      "hello word",
            "this is flink demo"
    )

    val counts = textStream.flatMap { _.toLowerCase().split("\\W+") } .map { (_, 1) }.keyBy(0).sum(1)

    counts.print()
    env.execute("word count")
  }
}

查看结果, 运行结束且正常.
在这里插入图片描述
查看taskmanager的logs 和 stdout 依然是什么都没有, 此程序有print运行结果的,但是实际上并未查看到.
针对taskmanger容器 使用docker logs 查看日志

docker logs 18a4a82ad9fe(taskmanager container id)

看到日志里面有print出来结果

Source: Collection Source -> Flat Map -> Map (1/1) 13fcdc91c88d72a6785c2b3d6f6e23c1.
(hello,1)
(word,1)
(this,1)
(is,1)
(flink,1)
(demo,1)

只是这个结果目前不能在web ui 的 taskmanager stdout上面查看.

解决方案

遇到此类配置相关的问题, 想必大家第一时间就是去stackoverflow上面去找答案. 上面确实有这个问题的答案,我把连接贴到这里:Apache Flink: The file STDOUT is not available on the TaskExecutor

解决方案里面更改了docker-entrypoint.sh 文件将jobmanager和taskmanager 的启动相关参数, 下面是解决方案里面的改动
在这里插入图片描述

下面是改动之前的脚本:

if [ "$1" = "help" ]; then
    echo "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|help)"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    shift 1
    prepare_job_manager_start

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    shift 1
    prepare_job_manager_start

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
elif [ "$1" = "taskmanager" ]; then
    shift 1
    echo "Starting Task Manager"
    copy_plugins_if_required

    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

    set_common_options
    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$@"
fi

exec "$@"

对比一下两者的区别, 改动前为start-foreground 应该是前台启动的意思 改动之后是直接start 看样子像是变成了后台启动. 然后在脚本最后执行了一个tail -f XXXX的命令 表示持续输出某个文件的内容到console.
懂docker 容器技术的人看到这个tail -f XXX 应该瞬间明白了为什么作者要这样处理. 容器中运行的程序至少一个前台进程, 如果全是后台进程,那么容器一启动就会立即停止. 所以当我们容器中的程序一定要以后台进程方式运行的时候,一般容器的启动脚本中都会加一个tail -f XXXXX保证有一个前台程序一直在run, 这样容器不会被关闭.

所以将docker-entrypoint.sh 执行jobmanager.sh 以及 执行taskmanager.sh的地方更改并在最后加上

exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"

保证容器中一直有前台进程运行,改动后的docker-entrypoint.sh 部分如下:

if [ "$1" = "help" ]; then
    echo "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|help)"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    shift 1
    prepare_job_manager_start

    $FLINK_HOME/bin/jobmanager.sh start "$@"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    shift 1
    prepare_job_manager_start

    $FLINK_HOME/bin/standalone-job.sh start "$@"
elif [ "$1" = "taskmanager" ]; then
    shift 1
    echo "Starting Task Manager"
    copy_plugins_if_required

    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

    set_common_options
    set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"

    $FLINK_HOME/bin/taskmanager.sh start "$@"
fi

sleep 1
exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"

用这个文件替换掉官方的docker-entrypoint.sh 构建自己的flink image 替换掉官方的image
运行docker-compose.yml 访问localhost:8081查看jobmanager的log
在这里插入图片描述
发现通过web ui可以查看到日志了.
同样提交word count的程序查看stdout运行结果
在这里插入图片描述
至此,这个问题已经解决了.

问题根本原因

按照上面的步骤可以解决遇到的问题,但是需要找到问题的根本,是什么原因导致官方的镜像不能够通过web ui查看log和 stdout, 只能够通过docker logs {container_id}的方式去查看日志和运行结果. 追本溯源还是 jobmanager.sh 和 taskmanager.sh 两个脚本. 改前与改后的区别就是传参不同, 一个是start-foreground 一个是start 查看jobmanager.sh源码发现传入这两个参数之后真正执行的脚本是不同的

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi

官方镜像传入的start-foreground 后执行的脚本是flink-console.sh
而改动之后执行的是flink-daemon.sh. 其实看到这里,大概就明白七八成了. 官方的方式运行的是flink-console.sh 即是命令行模式, 凭经验命令行模式内容都打印在命令行里, 属于前台执行进程. 而改动之后执行的是flink-daemnon.sh 表示flink将以后台进程执行. 这也能解释在改动的docker-entrypoint.sh 为什么最后会加一个tail -f XXXX的命令了. 因为此处让flink后台执行了, 如果不加tail命令, 那么容器中只有flink这个后台进程,一启动会立刻关闭.

在继续深入下去看flink-console.sh 和 flink-daemon.sh 发现有一段日志设置相关的代码比较有意思:
flink-console.sh

log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

以及 flink-daemon.sh

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

这里在flink-daemon.sh 中有一个 -Dlog.file的配置 log.file 非常关键的系统变量.而在flink-console.sh里面没有设置这个系统变量.
此时可以查看一下flink下conf里面关于日志的配置文件:
比如看log4j.properties

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

发现里面log4j.appender.file.file正好使用的是 log.file 这个系统变量.
所以这个问题的根源就是这个log.file系统变量配置问题. 在官方镜像执行过程中, 由于运行的是flink-console.sh 脚本, 没有设置这个log.file 因此不会再容器中{FLINK_HOME}/log 下面生成log以及stdout(可以自己启动官方镜像,然后进入容器去check,确实没有log和out文件生成). 而web ui 正好是从这个目录下获取log和stdout. 改动之后使用flink-daemon.sh 启动时, 会去设置这个log.file 系统变量, flink启动后, log以及out都会存放在{FLINK_HOME}/log, 使用web ui 就能看到这些结果了. 其实当flink不以容器启动的时候,都能够看到log文件夹下面有log和out的文件, 官方镜像启动使用flink-console.sh 让flink前台启动,保证容器启动时有前台进程,所以导致log和out没有写到log文件夹中, 进而也无法通过web ui 去查看log和stdout了.这就是这个问题的根本原因.

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐