Background

Kafka is deployed in containers on Kubernetes (K8S). After a broker restart, clusters with a large number of partitions start extremely slowly; at some customer sites it takes several hours before the cluster is healthy again.

On top of that, the Kafka liveness probe allows at most n minutes; if the service does not come up within n minutes, the Kafka Pod is restarted over and over again and never recovers.
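
On the Kubernetes side, one way to keep the probe from killing a broker that is merely slow to recover is to pair the livenessProbe with a startupProbe that tolerates a long recovery phase. The sketch below is only an illustration under assumed values; the container name, port and thresholds are not from our actual deployment, and startupProbe requires a Kubernetes version that supports it:

spec:
  containers:
    - name: kafka                  # name and port are illustrative
      startupProbe:                # tolerate a long log-recovery phase before liveness applies
        tcpSocket:
          port: 9092
        periodSeconds: 30
        failureThreshold: 360      # 360 x 30s: up to 3 hours for startup to complete
      livenessProbe:               # normal liveness checking once startup has succeeded
        tcpSocket:
          port: 9092
        periodSeconds: 10
        failureThreshold: 6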

Looking at the Kafka startup logs, we can see the following:

2022-11-10 15:05:43.367 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-19, dir=/var/lib/kafka] Loading producer state till offset 152509 with message format version 2
2022-11-10 15:05:43.368 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-19] Loading producer state from snapshot file '/var/lib/kafka/test1-19/00000000000000152509.snapshot'
2022-11-10 15:05:43.371 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-19, topic=test1, partition=19, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=152509) with 1 segments in 202021ms (2/80 loaded in /var/lib/kafka)
2022-11-10 15:05:43.375 [pool-6-thread-6] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-5, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:43.376 [pool-6-thread-6] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-5, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:05:43.416 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-18, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:43.417 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-18, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:05:50.287 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-16] Writing producer snapshot at offset 152586
2022-11-10 15:05:50.506 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-16, dir=/var/lib/kafka] Loading producer state till offset 152586 with message format version 2
2022-11-10 15:05:50.507 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-16] Loading producer state from snapshot file '/var/lib/kafka/test1-16/00000000000000152586.snapshot'
2022-11-10 15:05:50.511 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-16, topic=test1, partition=16, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=152586) with 1 segments in 209161ms (3/80 loaded in /var/lib/kafka)
2022-11-10 15:05:50.568 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-24, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:50.569 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-24, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:05:55.771 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-4] Writing producer snapshot at offset 150762
2022-11-10 15:05:56.598 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-4, dir=/var/lib/kafka] Loading producer state till offset 150762 with message format version 2
2022-11-10 15:05:56.600 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-4] Loading producer state from snapshot file '/var/lib/kafka/test1-4/00000000000000150762.snapshot'
2022-11-10 15:05:56.604 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-4, topic=test1, partition=4, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=150762) with 1 segments in 215254ms (4/80 loaded in /var/lib/kafka)
2022-11-10 15:05:56.678 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-6, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:56.679 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-6, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:06:00.545 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-10] Writing producer snapshot at offset 151632
2022-11-10 15:06:00.886 [pool-6-thread-5] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-7] Writing producer snapshot at offset 152082
2022-11-10 15:06:01.013 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-10, dir=/var/lib/kafka] Loading producer state till offset 151632 with message format version 2
2022-11-10 15:06:01.014 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-10] Loading producer state from snapshot file '/var/lib/kafka/test1-10/00000000000000151632.snapshot'
2022-11-10 15:06:01.019 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-10, topic=test1, partition=10, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=151632) with 1 segments in 219669ms (5/80 loaded in /var/lib/kafka)

From the logs we can see that during startup the partitions test1-19, test1-16, test1-4 and test1-10 are each loaded by a separate thread (pool-6-thread-2, pool-6-thread-9, pool-6-thread-1 and pool-6-thread-8), and every one of these loads takes roughly 200 s.

The broker has 80 partition directories to load in total, and at this point only 5 of them have finished (5/80).

For this problem, the solution suggested by the Kafka community is to raise the number of recovery threads (num.recovery.threads.per.data.dir) so that the cluster recovers faster.

  • Due to the increased number of index files, on some brokers with large amount the log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time.

References: Apache Kafka documentation; [KAFKA-6075] Kafka cannot recover after an unclean shutdown on Windows - ASF JIRA

In practice, however, we found that if the recovery thread count is pushed to a very large value such as 100, disk ioutil immediately hits 100% and the container load also stays extremely high.

The recovery speed is ultimately bound by disk I/O: a node holding about 700 GiB of log data and loading at roughly 70 MiB/s needs about 700 × 1024 / 70 ≈ 10,000 s, i.e. roughly 3 hours, to finish loading the data.
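
For reference, the sketch below shows where this knob would live if the broker reads its server.properties from a mounted ConfigMap; the object name and the value are purely illustrative, and the value should be chosen to match what the disks can actually sustain rather than an arbitrarily large number:

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config        # illustrative name
data:
  server.properties: |
    # defaults to 1; more threads parallelise log recovery across partition directories,
    # but beyond the disk's real throughput extra threads only push ioutil to 100%
    num.recovery.threads.per.data.dir=4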

Looking at the relevant source code (kafka.log.LogManager#loadLogs):

private def loadLogs(): Unit = {
  info(s"Loading logs from log dirs $liveLogDirs")
  val startMs = time.hiResClockMs()
  val threadPools = ArrayBuffer.empty[ExecutorService]
  val offlineDirs = mutable.Set.empty[(String, IOException)]
  val jobs = mutable.Map.empty[File, Seq[Future[_]]]
  var numTotalLogs = 0
 
  for (dir <- liveLogDirs) {
    val logDirAbsolutePath = dir.getAbsolutePath
    try {
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) // one thread pool per data dir, with num.recovery.threads.per.data.dir threads
      threadPools.append(pool)
 
      // Marker file that records whether the previous shutdown was clean.
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

      // If .kafka_cleanshutdown exists, log recovery is skipped for this dir; the marker
      // file itself is deleted later, once all the load jobs have completed.
      if (cleanShutdownFile.exists) {
        info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
        brokerState.newState(RecoveringFromUncleanShutdown)
      }
 
      // Read every topic-partition's recoveryPoint from the recovery-point-offset-checkpoint file
      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
      }
 
      var logStartOffsets = Map[TopicPartition, Long]()
      try {
        logStartOffsets = this.logStartOffsetCheckpoints(dir).read
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
      }
 
      // Main load/recovery flow: run loadLog concurrently for every topic-partition directory
      val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
      val numLogsLoaded = new AtomicInteger(0)
      numTotalLogs += logsToLoad.length
 
      val jobsForDir = logsToLoad.map { logDir =>
        val runnable: Runnable = () => {
          try {
            debug(s"Loading log $logDir")
 
            val logLoadStartMs = time.hiResClockMs()
            val log = loadLog(logDir, recoveryPoints, logStartOffsets)
            val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
            val currentNumLoaded = numLogsLoaded.incrementAndGet()
 
            info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
              s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
          } catch {
            case e: IOException =>
              offlineDirs.add((logDirAbsolutePath, e))
              error(s"Error while loading log dir $logDirAbsolutePath", e)
          }
        }
        runnable
      }
 
      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
    } catch {
      case e: IOException =>
        offlineDirs.add((logDirAbsolutePath, e))
        error(s"Error while loading log dir $logDirAbsolutePath", e)
    }
  }
 
  try {
    for ((cleanShutdownFile, dirJobs) <- jobs) {
      dirJobs.foreach(_.get)
      try {
        cleanShutdownFile.delete()
      } catch {
        case e: IOException =>
          offlineDirs.add((cleanShutdownFile.getParent, e))
          error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
      }
    }
 
    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
    }
  } catch {
    case e: ExecutionException =>
      error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
      throw e.getCause
  } finally {
    threadPools.foreach(_.shutdown())
  }
 
  info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
}

As the code shows, the recovery process checks the cleanShutdownFile and then loads/recovers each partition directory via loadLog(logDir, recoveryPoints, logStartOffsets).

In other words, whether the broker was shut down cleanly (clean shutdown) is the key to whether the cluster can start quickly.

When the Pod is restarted, it goes through the Kubernetes Pod termination flow.

For details, see the cnblogs post "k8s---pod的优雅退出流程(prestop和terminationGracePeriodSeconds)" by du-z.

We found that the cluster's terminationGracePeriodSeconds was still at the default of 30 s. Suspecting this was too short for the Kafka process to shut down cleanly, we raised the grace period to 300 s (the preStop hook and the shutdown triggered by SIGTERM both have to finish within this window) and added a preStop hook to the Pods:

spec:
  terminationGracePeriodSeconds: 300   # raised from the default 30s
  containers:
    - name: kafka                      # container name is illustrative
      lifecycle:
        preStop:                       # send SIGTERM to PID 1 and wait until the Kafka process exits
          exec:
            command: ["sh", "-ce", "kill -s TERM 1; while $(kill -0 1 2>/dev/null); do sleep 1; done"]

Even after these adjustments the cluster still could not perform a clean shutdown. Looking at the logs:

2022-11-16 15:03:36.003 [TxnMarkerSenderThread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Transaction Marker Channel Manager 2]: Starting
2022-11-16 15:03:36.003 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [TransactionCoordinator id=2] Startup complete.
2022-11-16 15:03:36.043 [ExpirationReaper-2-AlterAcls] INFO kafka.utils.Logging.info(Logging.scala:66) - [ExpirationReaper-2-AlterAcls]: Starting
2022-11-16 15:03:36.072 [/config/changes-event-process-thread] INFO kafka.utils.Logging.info(Logging.scala:66) - [/config/changes-event-process-thread]: Starting
2022-11-16 15:03:36.116 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Starting socket server acceptors and processors
2022-11-16 15:03:36.123 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT)
2022-11-16 15:03:36.124 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Started socket server acceptors and processors
2022-11-16 15:03:36.132 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:117) - Kafka version: 2.6.2
2022-11-16 15:03:36.133 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:118) - Kafka commitId: da65af02e5856e34
2022-11-16 15:03:36.133 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:119) - Kafka startTimeMs: 1668582216125
2022-11-16 15:03:36.136 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] started
2022-11-16 15:05:36.204 [main-SendThread(dol-zookeeper:12181)] WARN org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1190) - Client session timed out, have not heard from server in 12001ms for sessionid 0x100000463b00004
2022-11-16 15:05:36.205 [main-SendThread(dol-zookeeper:12181)] INFO org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1238) - Client session timed out, have not heard from server in 12001ms for sessionid 0x100000463b00004, closing socket connection and attempting reconnect
2022-11-16 15:05:54.650 [SIGTERM handler] INFO org.apache.kafka.common.utils.LoggingSignalHandler$1.invoke(LoggingSignalHandler.java:89) - Terminating process due to signal SIGTERM
2022-11-16 15:05:54.654 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] shutting down
2022-11-16 15:05:54.655 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] Starting controlled shutdown
2022-11-16 15:05:57.328 [main-SendThread(dol-zookeeper:12181)] ERROR org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:154) - Unable to resolve address: dol-zookeeper:12181
java.net.UnknownHostException: dol-zookeeper: Temporary failure in name resolution
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:929)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1529)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:848)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at org.apache.zookeeper.client.StaticHostProvider$1.getAllByName(StaticHostProvider.java:92)
at org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:147)
at org.apache.zookeeper.client.StaticHostProvider.next(StaticHostProvider.java:375)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1137)
2022-11-16 15:05:57.510 [main-SendThread(dol-zookeeper:12181)] WARN org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1246) - Session 0x100000463b00004 for server dol-zookeeper:12181, unexpected error, closing socket connection and attempting reconnect
java.lang.IllegalArgumentException: Unable to canonicalize address dol-zookeeper:12181 because it's not resolvable
at org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:71)
at org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:39)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1087)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
2022-11-16 15:05:57.613 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [ZooKeeperClient Kafka server] Waiting until connected.
2022-11-16 15:05:58.612 [main-SendThread(dol-zookeeper:12181)] ERROR org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:154) - Unable to resolve address: dol-zookeeper:12181
java.net.UnknownHostException: dol-zookeeper
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at org.apache.zookeeper.client.StaticHostProvider$1.getAllByName(StaticHostProvider.java:92)
at org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:147)
at org.apache.zookeeper.client.StaticHostProvider.next(StaticHostProvider.java:375)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1137)

The broker receives SIGTERM as expected and starts shutting down through the kafka-shutdown-hook, but it can no longer reach dol-zookeeper through the Service and keeps logging "Waiting until connected.". Looking at the relevant Kafka code:

kafka.server.KafkaServer#shutdown


 def shutdown(): Unit = {
    try {
      info("shutting down")

      if (isStartingUp.get)
        throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")

      // To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
      // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
      // `true` at the end of this method.
      if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
        CoreUtils.swallow(controlledShutdown(), this)
        brokerState.newState(BrokerShuttingDown)

        if (dynamicConfigManager != null)
          CoreUtils.swallow(dynamicConfigManager.shutdown(), this)

        // Stop socket server to stop accepting any more connections and requests.
        // Socket server will be shutdown towards the end of the sequence.
        if (socketServer != null)
          CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
    ...

        if (logManager != null)
          CoreUtils.swallow(logManager.shutdown(), this)

        if (kafkaController != null)
          CoreUtils.swallow(kafkaController.shutdown(), this)
    ...

        // Clear all reconfigurable instances stored in DynamicBrokerConfig
        config.dynamicConfig.clear()

        brokerState.newState(NotRunning)

        startupComplete.set(false)
        isShuttingDown.set(false)
        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.brokerId.toString, metrics), this)
        shutdownLatch.countDown()
        info("shut down completed")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer shutdown.", e)
        isShuttingDown.set(false)
        throw e
    }
  }

The logic that closes the log files cleanly lives in logManager.shutdown(), but the shutdown hook blocks on ZooKeeper earlier, inside controlledShutdown():

 private def controlledShutdown(): Unit = {

    def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)

    val socketTimeoutMs = config.controllerSocketTimeoutMs
   
...

      var shutdownSucceeded: Boolean = false

      try {

        var remainingRetries = retries
        var prevController: Broker = null
        var ioException = false

        while (!shutdownSucceeded && remainingRetries > 0) {
          remainingRetries = remainingRetries - 1

          // 1. Find the controller and establish a connection to it.

          // Get the current controller info. This is to ensure we use the most recent info to issue the
          // controlled shutdown request.
          // If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
          zkClient.getControllerId match {
            case Some(controllerId) =>
              zkClient.getBroker(controllerId) match {
                case Some(broker) =>
                  // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
                  // attempt, connect to the most recent controller
                  if (ioException || broker != prevController) {

                    ioException = false

                    if (prevController != null)
                      networkClient.close(node(prevController).idString)

                    prevController = broker
                    metadataUpdater.setNodes(Seq(node(prevController)).asJava)
                  }
                case None =>
                  info(s"Broker registration for controller $controllerId is not available (i.e. the Controller's ZK session expired)")
              }
            case None =>
              info("No controller registered in ZooKeeper")
          }

          // 2. issue a controlled shutdown to the controller
          if (prevController != null) {
            try {

              if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs))
                throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")

              // send the controlled shutdown request
              val controlledShutdownApiVersion: Short =
                if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0
                else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1
                else if (config.interBrokerProtocolVersion < KAFKA_2_4_IV1) 2
                else 3

              val controlledShutdownRequest = new ControlledShutdownRequest.Builder(
                  new ControlledShutdownRequestData()
                    .setBrokerId(config.brokerId)
                    .setBrokerEpoch(kafkaController.brokerEpoch),
                    controlledShutdownApiVersion)
              val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
                time.milliseconds(), true)
              val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)

              val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
              if (shutdownResponse.error == Errors.NONE && shutdownResponse.data.remainingPartitions.isEmpty) {
                shutdownSucceeded = true
                info("Controlled shutdown succeeded")
              }
              else {
                info(s"Remaining partitions to move: ${shutdownResponse.data.remainingPartitions}")
                info(s"Error from controller: ${shutdownResponse.error}")
              }
            }
            catch {
              case ioe: IOException =>
                ioException = true
                warn("Error during controlled shutdown, possibly because leader movement took longer than the " +
                  s"configured controller.socket.timeout.ms and/or request.timeout.ms: ${ioe.getMessage}")
                // ignore and try again
            }
          }
          if (!shutdownSucceeded) {
            Thread.sleep(config.controlledShutdownRetryBackoffMs)
            warn("Retrying controlled shutdown after the previous attempt failed...")
          }
        }
      }
      finally
        networkClient.close()

      shutdownSucceeded
    }

    if (startupComplete.get() && config.controlledShutdownEnable) {
      // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
      // of time and try again for a configured number of retries. If all the attempt fails, we simply force
      // the shutdown.
      info("Starting controlled shutdown")

      brokerState.newState(PendingControlledShutdown)

      val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)

      if (!shutdownSucceeded)
        warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
    }
  }

controlledShutdown first asks ZooKeeper for the current controllerId. That request, however, is retryRequestUntilConnected, so with ZooKeeper unreachable the whole shutdown hook gets stuck and can never make progress.

  def getControllerId: Option[Int] = {
    val getDataRequest = GetDataRequest(ControllerZNode.path)
    val getDataResponse = retryRequestUntilConnected(getDataRequest)
    getDataResponse.resultCode match {
      case Code.OK => ControllerZNode.decode(getDataResponse.data)
      case Code.NONODE => None
      case _ => throw getDataResponse.resultException.get
    }
  }

The reason ZooKeeper could not be resolved is that while the Pod was being restarted, calico had already removed the Pod's endpoint, so the Pod could no longer make any network requests. (Testing later showed this was a bug in the calico version in use; upgrading calico fixed it.)

2022-11-16 03:55:26.662 [INFO][93] felix/int_dataplane.go 1447: Received *proto.WorkloadEndpointUpdate update from calculation graph msg=id:<orchestrator_id:"k8s" workload_id:"kedacom-project-namespace/dol-kafka-2" endpoint_id:"eth0" > endpoint:<state:"active" name:"cali607076d309b" profile_ids:"kns.kedacom-project-namespace" profile_ids:"ksa.kedacom-project-namespace.default" ipv4_nets:"10.244.133.146/32" > 
2022-11-16 05:07:11.723 [INFO][93] felix/endpoint_mgr.go 476: Re-evaluated workload endpoint status adminUp=false failed=false known=false operUp=false status="" workloadEndpointID=proto.WorkloadEndpointID{OrchestratorId:"k8s", WorkloadId:"namespace/dol-kafka-2", EndpointId:"eth0"}
2022-11-16 05:07:11.723 [INFO][93] felix/status_combiner.go 58: Storing endpoint status update ipVersion=0x4 status="" workload=proto.WorkloadEndpointID{OrchestratorId:"k8s", WorkloadId:"namespace/dol-kafka-2", EndpointId:"eth0"}
2022-11-16 05:07:11.724 [INFO][93] felix/conntrack.go 90: Removing conntrack flows ip=10.244.133.146

Given the above, we tried forcing remainingRetries to 0 so that the ZooKeeper request is skipped and logManager can still shut down cleanly. The corresponding broker configuration is controlled.shutdown.max.retries=0.
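
In the hypothetical server.properties sketched earlier, the workaround is a single line (the default is 3 retries):

data:
  server.properties: |
    # 0 retries skips the controller-shutdown loop entirely, so the shutdown hook can
    # no longer block on ZooKeeper; see the trade-off discussed below
    controlled.shutdown.max.retries=0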

After writing test data again and restarting, the cluster came back up within 2 minutes.

        while (!shutdownSucceeded && remainingRetries > 0) {
          remainingRetries = remainingRetries - 1

          // 1. Find the controller and establish a connection to it.

Looking at the logic, the downside of this change is that the broker no longer updates cluster metadata when it restarts: the controlled shutdown request is never sent, and if the node is currently the controller it cannot notify the other brokers to re-elect a controller.

metadataUpdater.setNodes(Seq(node(prevController)).asJava)

...

val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest, time.milliseconds(), true)

Since this workaround is based purely on our own experience, it may have unknown side effects; apply it with appropriate caution.

Summary

1. Cluster recovery can be accelerated by tuning num.recovery.threads.per.data.dir, but the maximum speed is bounded by disk throughput.

2. Unless there is a special reason, always shut the Kafka cluster down cleanly. An unclean shutdown (e.g. powering off the host directly) may force all data files to be reloaded on the next start.

3. Increasing terminationGracePeriodSeconds and adding a preStop hook gives Kafka enough time to run its shutdown-hook logic.

4. Setting controlled.shutdown.max.retries=0 works around the shutdown hook being blocked when ZooKeeper is unreachable. (Testing showed the root cause was a calico bug; after upgrading calico the problem no longer occurs.)
