Kafka 0.8源碼分析—ZookeeperConsumerConnector


1.HighLevelApi

High Level Api是多線程的應用程序,以Topic的Partition數量為中心。消費的規則如下:

  • 一個partition只能被同一個ConsumersGroup的一個線程所消費.
  • 線程數小於partition數,某些線程會消費多個partition.
  • 線程數等於partition數,一個線程正好消費一個線程.
  • 當添加消費者線程時,會觸發rebalance,partition的分配發送變化.
  • 同一個partition的offset保證消費有序,不同的partition消費不保證順序.

image

關於與ZK的幾個參數意思解釋

  • zookeeper.connect: ZK連接。
  • group.id: Consumer消費ID。
  • zookeeper.session.timeout.ms: kafka節點與ZK會話的超時時間。
  • zookeeper.sync.time.ms: zk的follower與leader的同步時間間隔。
  • auto.commit.interval.ms: Consumer offset自動提交給Zookeeper的時間。

Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.(由於記錄Offset是基於時間的,所以當Consumer發生錯誤的時候,有可能會收到重復的消息。)

消費者的代碼

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threads));

Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(threads);

for (KafkaStream<byte[], byte[]> stream : streams) {
    executor.submit(new KafkaConsumerWorker(stream));
}

2.ZookeeperConsumerConnector

一個Consumer會創建一個ZookeeperConsumerConnector,代表一個消費者進程.

  • fetcher: 消費者獲取數據, 使用ConsumerFetcherManager fetcher線程抓取數據
  • zkUtils: 消費者要和ZK通信, 除了注冊自己,還有其他信息也會寫到ZK中
  • topicThreadIdAndQueues: 消費者會指定自己消費哪些topic,並指定線程數, 所以topicThreadId都對應一個隊列
  • messageStreamCreated: 消費者會創建消息流, 每個隊列都對應一個消息流
  • offsetsChannel: offset可以存儲在ZK或者kafka中,如果存在kafka里,像其他請求一樣,需要和Broker通信
  • 還有其他幾個Listener監聽器,分別用於topicPartition的更新,負載均衡,消費者重新負載等

當Broker掛掉的時候,在這個Broker上的所有Partition都丟失了,而Partition是給消費者服務的.
所以Broker掛掉后在做遷移的時候,會將其上的Partition轉移到其他Broker上,因此消費者要消費的Partition也跟着變化.

2.1 init

在創建ZookeeperConsumerConnector時,有幾個初始化方法需要事先執行.

  • 消費者要和ZK通信,所以connectZk會確保連接上ZooKeeper
  • 消費者要消費數據,需要有抓取線程,所有的抓取線程交給ConsumerFetcherManager統一管理
  • 由消費者客戶端自己保存offset,而消費者會消費多個topic的多個partition.
  • 多個partition的offset管理類OffsetManager是一個GroupCoordinator
  • 定時提交線程會使用OffsetManager建立的通道定時提交offset到zk或者kafka.
    image

2.2 createMessageStreams

ConsumerConnector創建消息流,需要指定解碼器,因為要將日志反序列化(生產者寫消息時對消息序列化到日志文件).

在kafka的運行過程中,會有其他的線程將數據放入partition對應的queue中. 而queue是用於KafkaStream的.
一旦數據添加到queue后,KafkaStream的阻塞隊列就有數據了,消費者就可以從隊列中消費消息.

  • createMessageStreams: 返回KafkaStream, 每個Topic都對應了多個KafkaStream. 數量和topicCount中的count一樣.
例子解釋

假設消費者C1聲明了topic1:2, topic2:3. topicThreadIds=consumerThreadIdsPerTopicMap.
topicThreadIds.values = [ (C1_1,C1_2), (C1_1,C1_2,C1_3)]一共有5個線程,queuesAndStreams也有5個元素.

consumerThreadIdsPerTopicMap = {
    topic1: [C1_1, C1_2],
    topic2: [C1_1, C1_2, C1_3]
}
topicThreadIds.values = [
    [C1_1, C1_2],
    [C1_1, C1_2, C1_3]
]
threadIdSet循環[C1_1, C1_2]時, 生成兩個queue->stream pair. 
threadIdSet循環[C1_1, C1_2, C1_3]時, 生成三個queue->stream pair. 
queuesAndStreams = [
    (LinkedBlockingQueue_1,KafkaStream_1),      //topic1:C1_1
    (LinkedBlockingQueue_2,KafkaStream_2),      //topic1:C1_2
    (LinkedBlockingQueue_3,KafkaStream_3),      //topic2:C1_1
    (LinkedBlockingQueue_4,KafkaStream_4),      //topic2:C1_2
    (LinkedBlockingQueue_5,KafkaStream_5),      //topic2:C1_3
]
  • 客戶端關注的是我的每個線程都對應了一個隊列,每個隊列都是一個消息流就可以了.
  • 客戶端的每個線程實際上是針對Partition級別的,一個線程對應一個或多個partition。

2.3 registerConsumerInZK

消費者需要向ZK注冊一個臨時節點,路徑為:/consumers/[group_id]/ids/[consumer_id],內容為訂閱的topic.

問題:什么時候這個節點會被刪除掉呢? Consumer進程掛掉時,或者Session失效時刪除臨時節點. 重連時會重新創建.
由於是臨時節點,一旦創建節點的這個進程掛掉了,臨時節點就會自動被刪除掉. 這是由zk機制決定的,不是由消費者完成的.

2.4 reinitializeConsumer listener

當前Consumer在ZK注冊之后,需要重新初始化Consumer.對於全新的消費者,注冊多個監聽器,在zk的對應節點的注冊事件發生時,會回調監聽器的方法.

  • 將topic對應的消費者線程id及對應的LinkedBlockingQueue放入topicThreadIdAndQueues中,LinkedBlockingQueue是真正存放數據的queue
  1. 注冊sessionExpirationListener,監聽狀態變化事件.在session失效重新創建session時調用
  2. 向/consumers/[group_id]/ids注冊Child變更事件的loadBalancerListener,當消費組下的消費者發生變化時調用
  3. 向/brokers/topics/[topic]注冊Data變更事件的topicPartitionChangeListener,在topic數據發生變化時調用
  • 顯式調用loadBalancerListener.syncedRebalance(), 會調用reblance方法進行consumer的初始化工作
private def reinitializeConsumer[K,V](topicCount: TopicCount, 
  queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
  val dirs = new ZKGroupDirs(config.groupId)
  // ② listener to consumer and partition changes
  if (loadBalancerListener == null) {
    val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString, 
      topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }
  // ① create listener for session expired event if not exist yet
  if (sessionExpirationListener == null) sessionExpirationListener = 
    new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)
  // ③ create listener for topic partition change event if not exist yet
  if (topicPartitionChangeListener == null) 
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)

  // listener to consumer and partition changes
  zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
  zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
  // register on broker partition path changes.
  topicStreamsMap.foreach { topicAndStreams => 
    zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)
  }

  // explicitly trigger load balancing for this consumer
  loadBalancerListener.syncedRebalance()
}

ZKRebalancerListener傳入ZKSessionExpireListener和ZKTopicPartitionChangeListener.它們都會使用ZKRebalancerListener完成自己的工作.

2.5 ZKSessionExpireListener

當Session失效時,新的會話建立時,立即進行rebalance操作.

2.6 ZKTopicPartitionChangeListener

當topic的數據變化時,通過觸發的方式啟動rebalance操作.

2.7 ZKRebalancerListener watcher

image

image

class ZKRebalancerListener(val group: String, val consumerIdString: String,
                           val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
  extends IZkChildListener {
  private var isWatcherTriggered = false
  private val lock = new ReentrantLock
  private val cond = lock.newCondition()

  private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
    override def run() {
      var doRebalance = false
      while (!isShuttingDown.get) {
          lock.lock()
          try {
            // 如果isWatcherTriggered=false,則不會觸發syncedRebalance. 等待1秒后,繼續判斷
            if (!isWatcherTriggered)
              cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
          } finally {
            // 不管isWatcherTriggered值是多少,在每次循環時,都會執行. 如果isWatcherTriggered=true,則會執行syncedRebalance
            doRebalance = isWatcherTriggered
            // 重新設置isWatcherTriggered=false, 因為其他線程觸發一次后就失效了,想要再次觸發,必須再次設置isWatcherTriggered=true
            isWatcherTriggered = false
            lock.unlock()
          }
          if (doRebalance) syncedRebalance        // 只有每次rebalanceEventTriggered時,才會調用一次syncedRebalance
      }
    }
  }
  watcherExecutorThread.start()

  // 觸發rebalance開始進行, 修改isWatcherTriggered標志位,觸發cond條件運行
  def rebalanceEventTriggered() {
    inLock(lock) {
      isWatcherTriggered = true
      cond.signalAll()
    }
  }

3.ZKRebalancerListener rebalance

因為消費者加入/退出時,消費組的成員會發生變化,而消費組中的所有存活消費者負責消費可用的partitions.
可用的partitions或者消費組中的消費者成員一旦發生變化,都要重新分配partition給存活的消費者.下面是一個示例.

當然分配partition的工作絕不僅僅是這么簡單的,還要處理與之相關的線程,並重建必要的數據:

  1. 關閉數據抓取線程,獲取之前為topic設置的存放數據的queue並清空該queue
  2. 釋放partition的ownership,刪除partition和consumer的對應關系
  3. 為各個partition重新分配threadid
    獲取partition最新的offset並重新初始化新的PartitionTopicInfo(queue存放數據,兩個offset為partition最新的offset)
  4. 重新將partition對應的新的consumer信息寫入zookeeper
  5. 重新創建partition的fetcher線程

image

private def rebalance(cluster: Cluster): Boolean = {
  val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, 
    zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
  val brokers = zkUtils.getAllBrokersInCluster()
  if (brokers.size == 0) {
    zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
    true
  } else {
    // ① 停止fetcher線程防止數據重復.如果當前調整失敗了,被釋放的partitions可能被其他消費者擁有.
    // 而沒有先停止fetcher的話,原先的消費者仍然會和新的擁有者共同消費同一份數據.  
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    // ② 釋放topicRegistry中topic-partition的owner
    releasePartitionOwnership(topicRegistry)
    // ③ 為partition重新分配消費者....
    // ④ 為partition添加consumer owner
    if(reflectPartitionOwnershipDecision(partitionAssignment)) {
        allTopicsOwnedPartitionsCount = partitionAssignment.size
        topicRegistry = currentTopicRegistry
        // ⑤ 創建拉取線程
        updateFetcher(cluster)
        true
    }
  }
}

rebalance操作涉及了以下內容:

  • PartitionOwnership: Partition的所有者(ownership)的刪除和重建
  • AssignmentContext: 分配信息上下文
  • PartitionAssignor: 為Partition分配Consumer的算法
  • PartitionAssignment: Partition分配之后的上下文
  • PartitionTopicInfo: Partition的最終信息
  • Fetcher: 完成了rebalance,消費者就可以重新開始抓取數據

3.1 核心:PartitionAssignor

將可用的partitions以及消費者線程排序, 將partitions處於線程數,表示每個線程(不是消費者數量)平均可以分到幾個partition.

如果除不盡,剩余的會分給前面幾個消費者線程. 比如有兩個消費者,每個都是兩個線程,一共有5個可用的partitions: (p0-p4).

每個消費者線程(一共四個線程)可以獲取到至少一共partition(5/4=1),剩余一個(5%4=1)partition分給第一個線程.
最后的分配結果為: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1

image

關閉Fetcher時要注意:

  • 先提交offset,然后才停止消費者. 因為在停止消費者的時候當前的數據塊中還會有點殘留數據.
  • 因為這時候還沒有釋放partiton的ownership(即partition還歸當前consumer所有),強制提交offset,
    這樣擁有這個partition的下一個消費者線程(rebalance后),就可以使用已經提交的offset了,確保不中斷.
  • 因為fetcher線程已經關閉了(stopConnections),這是消費者能得到的最后一個數據塊,以后不會有了,直到平衡結束,fetcher重新開始
  1. topicThreadIdAndQueues來自於topicThreadIds,所以它的topic應該都在relevantTopicThreadIdsMap的topics中.
    為什么還要過濾呢? 注釋中說到在本次平衡之后,只需要清理可能不再屬於這個消費者的隊列(部分的topicPartition抓取隊列).

  2. 問題:新創建的ZKRebalancerListener中kafkaMessageAndMetadataStreams(即這里的messageStreams)為空的Map.
    如何清空里面的數據? 實際上KafkaStream只是一個迭代器,在運行過程中會有數據放入到這個流中,這樣流就有數據了.

4.ConsumerFetcherManager

Fetcher線程要抓取數據關心的是PartitionTopicInfo,首先要找出Partition Leader(因為只向Leader Partition發起抓取請求).
初始時假設所有topicInfos(PartitionTopicInfo)都找不到Leader,即同時加入partitionMap和noLeaderPartitionSet.
在LeaderFinderThread線程中如果找到Leader,則從noLeaderPartitionSet中移除.

ConsumerFetcherManager管理了當前Consumer的所有Fetcher線程.

image

5.小結

high level的Consumer Rebalance的控制策略是由每一個Consumer通過在Zookeeper上注冊Watch完成的。
每個Consumer被創建時會觸發Consumer Group的Rebalance,具體的啟動流程是:

  1. (High Level)Consumer啟動時將其ID注冊到其Consumer Group下 (registerConsumerInZK)
  2. 在/consumers/[group_id]/ids上和/brokers/ids上分別注冊Watch (reinitializeConsumer->Listener)
  3. 強制自己在其Consumer Group內啟動Rebalance流程 (ZKRebalancerListener.rebalance)

在這種策略下,每一個Consumer或者Broker的增加或者減少都會觸發Consumer Rebalance。
因為每個Consumer只負責調整自己所消費的Partition,為了保證整個Consumer Group的一致性,
當一個Consumer觸發了Rebalance時,該Consumer Group內的其它所有其它Consumer也應該同時觸發Rebalance。

該方式有如下缺陷:

  • Herd effect(羊群效應): 任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance
  • Split Brain(腦裂): 每個Consumer分別單獨通過Zookeeper判斷哪些Broker和Consumer 宕機了,
    那么不同Consumer在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會造成不正確的Reblance嘗試。
  • 調整結果不可控: 所有的Consumer都並不知道其它Consumer的Rebalance是否成功,這可能會導致Kafka工作在一個不正確的狀態。

根據Kafka官方文檔,Kafka作者正在考慮在還未發布的0.9.x版本中使用中心協調器(coordinator)。大體思想是選舉出一個broker作為coordinator,由它watch Zookeeper,從而判斷是否有partition或者consumer的增減,然后生成rebalance命令,並檢查是否這些rebalance在所有相關的consumer中被執行成功,如果不成功則重試,若成功則認為此次rebalance成功.

參考這篇博文,將這個類好好總結一下。
涉及到元數據信息的不一致問題,還有rebalance的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM