[Original] Uncle's Troubleshooting Notes (4): Kafka broker nodes disappear from ZooKeeper


kafka_2.8.0-0.8.1

1 Symptoms

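One of our production Kafka clusters failed repeatedly. The symptom: the broker nodes disappear from ZooKeeper while the Kafka processes and ports are still up, and every broker then logs errors, mainly: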

1)

[2017-01-09 12:40:53,832] INFO Partition [topic1,3] on broker 1361: Shrinking ISR for partition [topic1,3] from 1351,1361,1341 to 1361 (kafka.cluster.Partition)

2)

[2017-01-09 12:33:53,858] ERROR Conditional update of path /brokers/topics/topic2/partitions/0/state with data {"controller_epoch":20,"leader":1361,"version":1,"leader_epoch":13,"isr":[1361]} and expected version 23 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/topic2/partitions/0/state (kafka.utils.ZkUtils$)

Clients were reporting errors at the same time.

 

2 Diagnosis

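The most recent incident happened around noon on January 9, 2017. The three brokers (134/135/136) disappeared from ZooKeeper one after another until the service became unavailable. The sequence reconstructed from the logs is: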

136 is controller
2017-01-09 12:33:33 134 zk disconnect
2017-01-09 12:33:33 134 zk connect
2017-01-09 12:33:36 135 zk disconnect
2017-01-09 12:33:36 136 zk disconnect[disappear]
2017-01-09 12:33:36 134 become controller
2017-01-09 12:33:37 135 zk connect
2017-01-09 12:33:42 134 zk disconnect[disappear]
2017-01-09 12:33:44 135 become controller
135 zk disconnect[disappear]

134 restart
2017-01-09 12:37:32 134 zk connect
2017-01-09 12:37:32 134 become controller
2017-01-09 12:37:39 134 zk disconnect[disappear]

135 restart
2017-01-09 12:38:14 135 zk connect[ok]
2017-01-09 12:38:14 135 become controller[ok]

134 restart
2017-01-09 12:39:41 134 zk connect[ok]

136 restart
2017-01-09 12:41:19 136 zk connect[ok]

While the service was unavailable between 12:33 and 12:37, the jstack output of all three brokers contained the same two stack traces:

1)

"delete-topics-thread" prio=10 tid=0x00007f52ec06c800 nid=0x18b8 waiting on condition [0x00007f52b59dd000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000c55a2140> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
        at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
        at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
        at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
        at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

2)

"ZkClient-EventThread-12-kafka-common1.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common2.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common3.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common4.hangzhou-1.zookeeper.internal.lede.com:2182,kafka-common5.hangzhou-1.zookeeper.internal.lede.com:2182" daemon prio=10 tid=0x00007f533435f800 nid=0xbb4 waiting on condition [0x00007f531a8be000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000e41c5cf8> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
        at java.util.concurrent.CountDownLatch.await(Unknown Source)
        at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
        at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
        at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
        at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
        at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
        at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
        at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
        at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
        at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

 

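Start with the second stack trace. A Kafka cluster has exactly one controller at a time; the current one can be read from /controller in ZooKeeper. The controller starts a DeleteTopicsThread and also registers a SessionExpirationListener. After the ZooKeeper session has expired and a new session has been created, handleNewSession (below) is called back. Its main job is to clean up the current controller state and re-elect: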

    /**
     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
     * any ephemeral nodes here.
     *
     * @throws Exception
     *             On any error.
     */
    @throws(classOf[Exception])
    def handleNewSession() {
      info("ZK expired; shut down all controller components and try to re-elect")
      inLock(controllerContext.controllerLock) {
        onControllerResignation()
        controllerElector.elect
      }
    }
  }

 

onControllerResignation is shown below:

  /**
   * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
   * required to clean up internal controller data structures
   */
  def onControllerResignation() {
    inLock(controllerContext.controllerLock) {
      if (config.autoLeaderRebalanceEnable)
        autoRebalanceScheduler.shutdown()
      deleteTopicManager.shutdown()
      Utils.unregisterMBean(KafkaController.MBeanName)
      partitionStateMachine.shutdown()
      replicaStateMachine.shutdown()
      if(controllerContext.controllerChannelManager != null) {
        controllerContext.controllerChannelManager.shutdown()
        controllerContext.controllerChannelManager = null
      }
    }
  }

 

It calls deleteTopicManager.shutdown(), that is, TopicDeletionManager.shutdown (below):

  /**
   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
   */
  def shutdown() {
    deleteTopicsThread.shutdown()
    topicsToBeDeleted.clear()
    topicsIneligibleForDeletion.clear()
  }

That in turn calls deleteTopicsThread.shutdown(), that is, ShutdownableThread.shutdown (below):

  def shutdown(): Unit = {
    info("Shutting down")
    isRunning.set(false)
    if (isInterruptible)
      interrupt()
    shutdownLatch.await()
    info("Shutdown completed")
  }

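The thread in the second stack trace is blocked on the shutdownLatch.await() line. Besides the SessionExpirationListener mentioned above, Kafka also creates a KafkaHealthcheck at startup: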

    kafkaController = new KafkaController(config, zkClient)

    /* start processing requests */
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

    Mx4jLoader.maybeLoad()

    replicaManager.startup()
    kafkaController.startup()

    topicConfigManager = new TopicConfigManager(zkClient, logManager)
    topicConfigManager.startup()

    /* tell everyone we are alive */
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
    kafkaHealthcheck.startup()

KafkaHealthcheck also registers a session expiry listener (SessionExpireListener, below), which registers the broker node under /brokers/ids in ZooKeeper:

    /**
     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
     * any ephemeral nodes here.
     *
     * @throws Exception
     *             On any error.
     */
    @throws(classOf[Exception])
    def handleNewSession() {
      info("re-registering broker info in ZK for broker " + brokerId)
      register()
      info("done re-registering broker")
      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
    }

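However, this listener is registered after the controller's. ZkClient keeps these listeners in a list and invokes them one by one, serially. So on a broker that hosts the controller, KafkaHealthcheck's handleNewSession is called only after the controller's handleNewSession has returned. When the controller's handleNewSession blocks, the broker node is never re-registered under /brokers/ids, which looks as if the broker had disappeared from ZooKeeper.

It blocks because the controller is waiting for DeleteTopicsThread to shut down. The run loop of DeleteTopicsThread is: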

  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
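To make the listener ordering problem concrete, here is a minimal standalone sketch. It is not ZkClient code: `SessionListener`, `brokerReRegistered` and the two latches are hypothetical names, and the only behavior assumed is the one described above, namely that listeners are kept in a list and called serially on a single event thread.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object SerialListenerSketch {
  // Hypothetical stand-in for a ZkClient session state listener
  trait SessionListener { def handleNewSession(): Unit }

  /** Returns true if the broker re-registration listener ran within timeoutMs. */
  def brokerReRegistered(controllerBlocks: Boolean, timeoutMs: Long): Boolean = {
    val neverReleased = new CountDownLatch(1) // stands in for the stuck shutdownLatch
    val registered = new CountDownLatch(1)    // stands in for the /brokers/ids node

    val controllerListener = new SessionListener {
      def handleNewSession(): Unit = if (controllerBlocks) neverReleased.await()
    }
    val healthcheckListener = new SessionListener {
      def handleNewSession(): Unit = registered.countDown()
    }
    // Registration order: controller first, healthcheck second
    val listeners = List(controllerListener, healthcheckListener)

    val eventThread = new Thread(new Runnable {
      def run(): Unit =
        try listeners.foreach(_.handleNewSession())  // serial callbacks on one thread
        catch { case _: InterruptedException => () } // only used to end the demo
    }, "event-thread-sketch")
    eventThread.setDaemon(true)
    eventThread.start()

    val ok = registered.await(timeoutMs, TimeUnit.MILLISECONDS)
    eventThread.interrupt() // release the blocked listener so the thread can exit
    ok
  }

  def main(args: Array[String]): Unit = {
    println("controller returns: " + brokerReRegistered(controllerBlocks = false, timeoutMs = 2000))
    println("controller blocks:  " + brokerReRegistered(controllerBlocks = true, timeoutMs = 300))
  }
}
```

When the first listener blocks, the second one is never reached, which corresponds to the broker node never being written back under /brokers/ids.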

The first stack trace is blocked inside doWork (below):

  class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
    val zkClient = controllerContext.zkClient
    override def doWork() {
      inLock(controllerContext.controllerLock) {
        awaitTopicDeletionNotification()
        val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

awaitTopicDeletionNotification is shown below:

  /**
   * Invoked by the delete-topic-thread to wait until events that either trigger, restart or halt topic deletion occur.
   * controllerLock should be acquired before invoking this API
   */
  private def awaitTopicDeletionNotification() {
    while(!deleteTopicStateChanged) {
      info("Waiting for signal to start or continue topic deletion")
      deleteTopicsCond.await()
    }
    deleteTopicStateChanged = false
  }

This condition is signaled only in resumeTopicDeletionThread:

  /**
   * Signals the delete-topic-thread to process topic deletion
   */
  private def resumeTopicDeletionThread() {
    deleteTopicStateChanged = true
    deleteTopicsCond.signal()
  }

resumeTopicDeletionThread is called in only four situations; until one of them occurs, doWork stays blocked.
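The blocking pattern can be reproduced in a small standalone model. This is a simplified sketch, not Kafka code: the names are hypothetical, the worker uses its own lock rather than controllerLock, and the interrupt path of the real shutdown is deliberately left out, so it only illustrates "flag cleared, but nobody signals the condition".

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

object BlockedShutdownSketch {
  /** Returns true if the worker exited within timeoutMs after a 0.8-style shutdown. */
  def shutdownCompletes(timeoutMs: Long): Boolean = {
    val lock = new ReentrantLock()
    val stateChanged = lock.newCondition() // plays the role of deleteTopicsCond
    var changed = false                    // plays the role of deleteTopicStateChanged
    val isRunning = new AtomicBoolean(true)
    val inDoWork = new CountDownLatch(1)   // demo only: worker has entered doWork
    val shutdownLatch = new CountDownLatch(1)

    val worker = new Thread(new Runnable {
      def run(): Unit = {
        try {
          while (isRunning.get()) {
            lock.lock()
            try {
              inDoWork.countDown()
              while (!changed) stateChanged.await() // never signaled in this model
              changed = false
            } finally lock.unlock()
          }
        } catch { case _: InterruptedException => () } // demo cleanup only
        shutdownLatch.countDown()
      }
    }, "delete-topics-thread-sketch")
    worker.setDaemon(true)
    worker.start()
    inDoWork.await() // make sure the worker is already inside doWork

    // 0.8-style shutdown: clear the flag, then wait for the worker to exit
    isRunning.set(false)
    val exited = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
    worker.interrupt() // demo cleanup so the thread does not linger
    exited
  }

  def main(args: Array[String]): Unit =
    println("worker exited in time: " + shutdownCompletes(300))
}
```

Clearing `isRunning` has no effect here because the worker only checks the flag between `doWork` calls, and it never leaves the current one.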

 

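In short: after its ZooKeeper session expires and a new one is created, the controller runs onControllerResignation (cleaning up its previous controller state) and then re-elects. onControllerResignation waits for DeleteTopicsThread to exit, but DeleteTopicsThread is blocked in doWork, so the controller's callback is stuck. doWork completes only in those few situations, which is why the problem appears at random. This has been fixed in newer Kafka versions (0.9 and later), where the shutdown method of TopicDeletionManager was changed to: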

  /**
   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
   */
  def shutdown() {
    // Only allow one shutdown to go through
    if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
      // Resume the topic deletion so it doesn't block on the condition
      resumeTopicDeletionThread()
      // Await delete topic thread to exit
      deleteTopicsThread.awaitShutdown()
      topicsToBeDeleted.clear()
      partitionsToBeDeleted.clear()
      topicsIneligibleForDeletion.clear()
    }
  }

initiateShutdown and awaitShutdown are:

  def initiateShutdown(): Boolean = {
    if(isRunning.compareAndSet(true, false)) {
      info("Shutting down")
      isRunning.set(false)
      if (isInterruptible)
        interrupt()
      true
    } else
      false
  }

  /**
   * After calling initiateShutdown(), use this API to wait until the shutdown is complete
   */
  def awaitShutdown(): Unit = {
    shutdownLatch.await()
    info("Shutdown completed")
  }
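As a rough illustration of why signaling between the two phases helps, here is a standalone model of the pattern. It is only a sketch under assumptions: `Worker`, `resume` and the timeout parameter are hypothetical, the worker has its own lock, and nothing here is taken from the Kafka source beyond the initiate / resume / await order shown in the listings.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

object TwoPhaseShutdownSketch {
  final class Worker {
    private val lock = new ReentrantLock()
    private val stateChanged = lock.newCondition()
    private var changed = false // guarded by lock
    private val isRunning = new AtomicBoolean(true)
    private val shutdownLatch = new CountDownLatch(1)

    private val thread = new Thread(new Runnable {
      def run(): Unit = {
        while (isRunning.get()) doWork()
        shutdownLatch.countDown()
      }
    }, "worker-sketch")
    thread.setDaemon(true)

    def start(): Unit = thread.start()

    // Blocks until resume() reports a state change
    private def doWork(): Unit = {
      lock.lock()
      try {
        while (!changed) stateChanged.await()
        changed = false
      } finally lock.unlock()
    }

    // Phase 1: flip the flag; only the first caller gets true
    def initiateShutdown(): Boolean = isRunning.compareAndSet(true, false)

    // Wake the worker so it can leave doWork and re-check isRunning
    def resume(): Unit = {
      lock.lock()
      try { changed = true; stateChanged.signal() } finally lock.unlock()
    }

    // Phase 2: wait (with a timeout, for the demo) until the worker has exited
    def awaitShutdown(timeoutMs: Long): Boolean =
      shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS)

    def shutdown(timeoutMs: Long): Boolean =
      if (initiateShutdown()) { resume(); awaitShutdown(timeoutMs) } else false
  }

  def main(args: Array[String]): Unit = {
    val w = new Worker
    w.start()
    println("shutdown completed: " + w.shutdown(2000))
  }
}
```

Because the flag is cleared before the signal, the woken worker sees `isRunning == false` as soon as it leaves `doWork`, and a second `shutdown` call returns immediately without waiting.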

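As the code shows, the 0.8 shutdown method was split into initiateShutdown and awaitShutdown, with a call to resumeTopicDeletionThread in between so that the thread does not stay blocked in doWork. Upgrading to at least 0.9 resolves the problem. The official upgrade procedure is: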

Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0

0.9.0.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker downstream clusters should be upgraded first as well.

For a rolling upgrade:

  1. Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
  2. Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
  3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.
  4. Restart the brokers one by one for the new protocol version to take effect

Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

 

See: http://kafka.apache.org/documentation/#upgrade_9

 

