Kafka 0.8 Consumer Rebalance


1 Rebalance時機

0.10kafka的rebalance條件

  • 條件1:有新的consumer加入
  • 條件2:舊的consumer掛了
  • 條件3:coordinator掛了,集群選舉出新的coordinator(0.10 特有的)
  • 條件4:topic的partition新加
  • 條件5:consumer調用unsubscrible(),取消topic的訂閱

當一個group中,有consumer加入或者離開時,會觸發partitions均衡.均衡的最終目的,是提升topic的並發消費能力.

當consumer啟動時,所觸發的操作:

  1. 首先進行"Consumer Id注冊",eg:/consumers/clientSearchBhvGp/ids/clientSearchBhvGp_yz4823.hadoop.data.sina.com.cn-1466668340717-ed3a5f41
  2. 然后在"Consumer id"節點下注冊一個watch用來監聽當前group中其他consumer的"退出"和"加入";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).
  3. 在"Broker id"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.

2 消費Partition

Consumer Group中各個consumer是根據先后啟動的順序有序消費一個topic的所有partitions的。

分配算法

  1. 假如topic1,具有如下partitions: P0,P1,P2,P3
  2. 加入group中,有如下consumer: C0,C1
  3. 首先根據partition索引號對partitions排序: P0,P1,P2,P3
  4. 根據(consumer.id + '-'+ thread序號)排序: C0,C1
  5. 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
  6. 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

舉例說明:80個partition,有12個線程,如何進行分配。

3 Consumer owner

zk路徑:/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引編號

可以查看每個partition屬於哪個Consumers線程。

4 Consumer offset

zk路徑:/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

這個是永久節點,用來跟蹤每個consumer目前所消費的partition中最大的offset。

通過這個可以排查不正常的Consumers線程,如果它的Offset不增長,而該topic的其它partition的Offset在增加,說明該partition的消費是不正常的。

5 Rebalance的策略

Consumer Rebalance的控制策略是由每一個Consumer通過在Zookeeper上注冊Watch完成的。每個Consumer被創建時會觸發 Consumer Group的Rebalance,具體啟動流程如下:

  • High Level Consumer啟動時將其ID注冊到其Consumer Group下,在Zookeeper上的路徑為/consumers/[consumer group]/ids/[consumer id]
  • 在/consumers/[consumer group]/ids上注冊Watch
  • 在/brokers/ids上注冊Watch
    如果Consumer通過Topic Filter創建消息流,則它會同時在/brokers/topics上也創建Watch
  • 強制自己在其Consumer Group內啟動Rebalance流程

該方式有如下缺陷:

  • Herd effect:
    任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance

  • Split Brain:
    每個Consumer分別單獨通過Zookeeper判斷哪些Broker和Consumer 宕機了,那么不同Consumer在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會造成不正確的Reblance嘗試。

  • 調整結果不可控:
    所有的Consumer都並不知道其它Consumer的Rebalance是否成功,這可能會導致Kafka工作在一個不正確的狀態。(這個就是目前出現的bug)

根據Kafka社區wiki,Kafka作者正在考慮在還未發布的0.9.x版本中使用中心協調器(Coordinator)。大體思想是為所有Consumer Group的子集選舉出一個Broker作為Coordinator,由它來管理Consumer的增減,然后生成Rebalance命令,並檢查是否這些Rebalance。

6 影響Rebalance失敗的因素

6.1 rebalance失敗官方解釋:

consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for “conflict in “).

If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try Increasing rebalance.max.retries and rebalance.backoff.ms.

Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won’t realize that consumer is gone after zookeeper.session.timeout.ms time. In the case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.

最后的這一條說,實際情況並不是如此:rebalance.max.retries(4) * rebalance.backoff.ms(2000) > zookeeper.session.timeout.ms(6000),集群原來是這樣的配置,仍然出現了rebalance失敗。

6.2 分析

  1. rebalance 重試的sleep時間:kafka/consumer/ZookeeperConsumerConnector.scala:393
  • "rebalance.backoff.ms","zookeeper.sync.time.ms", 2000
  1. rebalance 重試次數超過4次,syncedRebalance拋出的是RuntimeException,在下面的代碼過程中,將這個異常捕獲了,只記錄這個ERROR。

kafka/consumer/ZookeeperConsumerConnector.scala:328,正確的做法是捕獲到RunTimeException異常,通過exit(-1)讓JVM這個進程退出。對於OLS程序會讓它,重啟一個Container繼續運行。

6.3 解決

  • 加大重試時間:"rebalance.backoff.ms=5000"
  • 加大retry: "rebalance.max.retries=10"
  • 捕獲"ConsumerRebalanceFailedException",退出程序。

7 一個失敗的例子

consumer group:client_search_hbv(12個線程)消費topic:weibo_common_act2(80個partition),出現6個partition無線程消費,表現的日志是Consumers Owner Instances is None。

1.現象

該6個partition本該被一個進程id:90ac 占用消費,zookeeper上有該ids的節點。查看90ac的啟動日志,發現它rebalance 失敗了4次。

2016-06-23 15:52:31,473 ERROR kafka.consumer.ZookeeperConsumerConnector: [clientSearchBhvGp_yz4834.hadoop.data.sina.com.cn-1466668272656-90a8bbdc], error during syncedRebalance
kafka.common.ConsumerRebalanceFailedException: clientSearchBhvGp_yz4834.hadoop.data.sina.com.cn-1466668272656-90a8bbdc can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:326)

2.分析

44-49的partition沒有被消費,由48.34的進程消費;48.34在爭取44-49 partitions的過程中,節點的值沒有被48.37的進程release,導致一直創建臨時節點不成功。

  1. 48.34
  • 15:52:22,840 開始搶占partition 44 - 49
  • 15:52:31,473 重試四次失敗
  1. 48.37
  • 15:52:24,032 准備開始搶partition 50 - 55
  • 15:53:05,034 釋放Release partition 44-49 ownership

Consumers 實例 Rebalance的過程

  • Stop fetcher,停止向原有的partition拉取數據
  • 先釋放自己已有的partition owner
  • 去爭取自己被分配的新的partition,若創建臨時節點成功,則成功。
  • 重新啟動新的fetcher拉取數據

3.解決辦法

為Kafka支持ols.kafka.property.zookeeper.session.timeout.ms=30000等屬性。
在workflow.xml文件中,CfgString支持以 ols.kafka.property. 開頭的屬性.

用戶修改程序的2個步驟

  1. 修改pom.xml的OLS_Yarn依賴為 0.2.2.2
<dependency>
    <groupId>com.sina.mis</groupId>
    <artifactId>OLS_Yarn</artifactId>
    <version>0.2.2.2</version>
</dependency>
  1. 提交的workflow.xml在 添加
ols.kafka.property.rebalance.backoff.ms=5000,ols.kafka.property.rebalance.max.retries=10

8 總結

consumer rebalance失敗是0.8版本的bug,在0.9以后,這個模塊由組件Coordinator負責,能夠保證rebalance成功。

從0.10.0-src來看,ZookeeperConsumerConnector已經重構了,新增了ConsumerCoordinator。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM