什么是 Rebalance?
Rebalance 為什么會發生?
Rebalance 的過程
記得之前在一段時間密集面試的時候總會問候選人這些問題。
什么是 Rebalance
重平衡 Rebalance 就是讓整個 Consumer Group 下的所有的 Consumer 實例久如何消費訂閱主題的所有分區達成共識的過程。在 Rebalance 的過程中,所有 Consumer 實例都需要參與進來,在 Coordinator 的幫助下完成分配。所以可以很明顯的回答上面的第三個問題,在 Rebalance 的時候是無法進行消費的持續消費的。就可能會造成隊列的段時間阻塞。
Rebalance 為什么會發生
1. 我們發現線上處理能力不夠了,需要向 group 中增加新的 consumer ,就會觸發 Rebalance 。任何新成員的「進入」和「離開」都會觸發 Rebalance。消費會停下來重新給所有 Consumer 分配對應的 partiitons.
2.我們為 topic 增加分區,比如原本一個 topic 只有 10個分區,后因為性能問題需要擴展增加到 20 個分區就會發生 Rebanlace.
大的情況可以分為這兩種,其實第一種新成員 「進入」 和 「離開」還可以細講一下,因為 90% 以上的離開和進入都不是我們想要的結果。他有可能是消費超過超時時間被一腳踢出了 group 造成離開從而造成 Rebalance 。然后又因為踢出之后又去請求又意外的加入 group 從而繼續引發 Rebalance 往復循環。
我們的消費者在與 broker 進行溝通的時候都是與一個叫 Coordinator 的組件進行交互, Coodinator 是專門負責管理消費者組 加入離開和位移的組件。
那么什么情況下 Coordinator 會認為某個 Consumer 實例掛了需要退組呢?
當 Rebalance 完成之后,我們的每個 Consumer 都會向 Coordinator 定時發送心跳,以表明客戶端還活着。
如果某個 Consumer 沒有按照約定好的規則發送請求給 Coordinator ,Coordinator就會認為這個 Consumer 已經掛了,並將其一腳踢出 Group 然后通過心跳包 response 組內其他 Consumer 盡快開始 Rebalance。這里有一點需要注意,Consumer 心跳參數 heartbeat_interval_ms 會在一個 session_timeout 周期內決定是否 Consumer 已經離開。
打個比方,如果我們的 session_timeout_ms 設置為默認的 10s 那么如果心跳三次 heartbeat_interval_ms 都失敗那么就會讓 Coordinator 開啟重平衡。而 Coordinator 通知組內其他消費者的辦法也是通過消費者發送的心跳,在 broker 對他進行 response 的時候塞進 REBALANCE_NEEDED 標志位,通知他們進行 rebalance。
Kafka broker 端有個日志叫 server.log ,在這個日志中我們可以看到 GroupCoordinator 的身影
[2019-07-17 15:20:37,266] INFO [GroupCoordinator 0]: Preparing to rebalance group answer_action with old generation 741 (__consumer_offsets-9) (kafka.coordinator.group.GroupCoordinator) [2019-07-17 15:20:37,266] INFO [GroupCoordinator 0]: Group answer_action with generation 742 is now empty (__consumer_offsets-9) (kafka.coordinator.group.GroupCoordinator) [2019-07-17 15:21:37,662] INFO [GroupCoordinator 0]: Preparing to rebalance group answer_action with old generation 742 (__consumer_offsets-9) (kafka.coordinator.group.GroupCoordinator) [2019-07-17 15:22:07,663] INFO [GroupCoordinator 0]: Stabilized group answer_action generation 743 (__consumer_offsets-9) (kafka.coordinator.group.GroupCoordinator) [2019-07-17 15:22:07,664] INFO [GroupCoordinator 0]: Assignment received from leader for group answer_action for generation 743 (kafka.coordinator.group.GroupCoordinator) [2019-07-17 15:22:37,664] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-3feb5deb-de82-40a4-8da9-1c6ec7d1ca4f in group answer_action has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
從第一條開始可以看到是 preparing to rebalance ,group-id 是 answer_action
但是大家注意
[2019-07-17 15:22:37,664] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-3feb5deb-de82-40a4-8da9-1c6ec7d1ca4f in group answer_action has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
因為 group 中其中有一個 consumer 的異常,被 coordinator 踢出了 group 。
所以平時 Kafka 出現的 Rebalance 相關的異常我們可以很容易的在日志中發現,並且可以得到一些細節。
Consumer 端中有一個參數
session_timeout_ms (int): The timeout used to detect failures when using Kafka's group management facilities. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms. Default: 10000
Python 社區客戶端 kafka-python 該參數的默認值是 10s 。
如果 Coordinator 在 10s 內沒有收到 Group 下 Consumer 的實例心跳,就會認為這個 Consumer 已經掛了,就會觸發 Rebalance。
除了 session.timeout.ms 還有一個控制發送心跳周期的參數 heartbeat.interval.ms
heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session_timeout_ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000
heartbeat.interval.ms 的默認周期是 3s。這里社區客戶端作者也描述得非常清楚(社區客戶端的文檔真的很好!)應該設置這個參數小於 session_timeout_ms ,通常大家會推薦一個公式為
heartbet_inverval_ms * 3 = session_timeout_ms
為什么要這么做?
因為 session_timeout_ms 到期如果 coordinator 沒有收到心跳會認為客戶端死了,如果按照上述的配置,期間客戶端至少有三次時間訪問到 coordinator 並且刷新過期時間。這樣如果中間有 1 2 次因為網絡問題沒有發送成功的情況也可以一定程度避免。
心跳時間不超過 session_timeout_ms 的值,但是也不應該過長,過長可能會引起 Rebalance 的檢測緩慢且失去效果。如果有一個 consumer 失效了,如果他無法再恢復我們要做的迅速讓他被踢出 group 中。否則嚴重的話可能會引起 partitions 的數據傾斜,lag 也會越來越大。
除了
session.timeout.ms
heartbeat.interval.ms
還有兩個參數從客戶端角度去控制 Rebalance
max_poll_records (int): The maximum number of records returned in a single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
max_poll_interval_ms (int): The maximum delay between invocations of :meth:`~kafka.KafkaConsumer.poll` when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If :meth:`~kafka.KafkaConsumer.poll` is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. Default 300000
max_poll_records 控制我們一次從 broker 請求多少數據過來,默認是 500 條。
max_poll_interval_ms 控制兩次 poll 之間的時間間隔,如果我們請求過來的 500 條消息 300 s 都還沒有消費完,沒有繼續調用 poll 那么 consumer 會自動向 coordinator 發出消息要求把自己踢出組內。coordinator 收到消息會開始新一輪的 Rebalance。
這里關於提交的時間還有一個需要被注意的地方。在社區 Kafka-Python 1.4.0 以下的版本中(broker 0.10.1 之前),心跳是跟隨 poll 一起發送的。並沒有啟動一個 background 獨立的線程去發送心跳包。這會造成一個什么問題呢?
之前如果我們拉取一批消息開始處理,他如果超過了設置的 session.timeout.ms 也就是 默認的 10s ,那么就會被觸發 rebalance 。因為如果你不調用 poll 方法,你就無法發送心跳。Coodinator 無法收到心跳就會按照約定把你踢出 group 然后進行 rebalance .
Java 版本在發布了該功能之后,社區 kafka 版本從 1.4.0 之后開始支持了 background thread 處理心跳。詳情可以參閱 reference。
請注意 max_poll_inerval_ms 這個參數是1.4.0版本以上的參數 1.3 的最后一個版本 1.3.5 不會有該參數的存在。所以如果使用 1.3.5 版本及其以下版本需要自己保證 單次拉取數據 的處理時間 < session_timeout_ms ,否則就會不停的觸發 Rebalance 導致程序重復消費,嚴重可能引起死循環崩潰。
另外 1.3.5 及其以下版本中 session.timeout.ms 的默認值是 30s 並非現在版本的 10s ,時間比較長也是為了避免長時間處理引發的 Rebanlance ,老版本心跳的時間還是 3s 。
所以如果有要阻塞處理的任務,比如 retry 比如調用很多數據庫操作,我們可以把 max_poll_interval_ms 的時間設得長一些,或者把需要處理的消息 max_poll_records 設置得少一些。這樣至少不會因為正常的業務處理得慢而造成 kafka 頻繁 Rebalance 從而引起其他的問題。
Rebalance 的過程
現在我們倒頭回來聊聊 Rebalance 的過程。rebalance 的前提是當前消費者組 Coordinator 已經確定的情況。
在該 Consumer group 組有 Consumer 第一次請求的時候就會被分配 Coordinator broker .
計算方法為:
offset_partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
得到的分區的 leader 就是對應的 Coordinator
e.g.
public static void main(String[] args) {
String c = "illidan-c";
System.out.println(Math.abs(c.hashCode() % 10));
}
這里我的 nums.partitions 是 10 所以這里我得到的數是 9 然后查看 __consumer_offsets 9 的 leader 是 broker 0
然后我手動重啟進程對其進行 rebalance 得到日志輸出
[2020-01-06 12:11:23,808] INFO [GroupCoordinator 0]: Preparing to rebalance group illidan-c with old generation 158651 (__consumer_offsets-9) (kafka.coordinator.group.GroupCoordinator)
[2020-01-06 12:11:30,650] INFO [GroupCoordinator 0]: Member kafka-python-1.4.7-d03089db-9df1-4464-bf06-4968df80bfa6 in group illidan-c has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-01-06 12:11:30,960] INFO [GroupCoordinator 0]: Member kafka-python-1.4.7-50c01dbc-eab5-41aa-823b-3530d32f845d in group illidan-c has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-01-06 12:11:30,960] INFO [GroupCoordinator 0]: Stabilized group illidan-c generation 158652 (__consumer_offsets-9) (kafka.coordinator.group.GroupCoordinator)
[2020-01-06 12:11:30,979] INFO [GroupCoordinator 0]: Assignment received from leader for group illidan-c for generation 158652 (kafka.coordinator.group.GroupCoordinator)
[2020-01-06 12:11:40,980] INFO [GroupCoordinator 0]: Member kafka-python-1.4.7-0094bb59-7cc1-4c80-9020-7c10c61bae98 in group illidan-c has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-01-06 12:11:40,980] INFO [GroupCoordinator 0]: Preparing to rebalance group illidan-c with old generation 158652 (__consumer_offsets-9) (kafka.coordinator.group.GroupCoordinator)
可以很清楚的看到即 consumer_offsets-9 作為 coordinator 的機器 broker 0 重新開始 Rebalance
之后該 group 組內的消費者都與此 coordinator 進行通信。
這里要再次強調一下。 關於 coordinator 是分為 broker 端 group coordinator 組件和消費者組這邊的 coordinator leader 同步方案分配制定人的。
通常意義上我們說的 coordinator 一般指的是 broker 端的 group coordinator 組件。
Rebalance 的時候分為兩步
1. Join 加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求入組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——注意leader和coordinator不是一個概念。leader負責消費分配方案的制定。
2. Sync 這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之后會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。
上面提到的當 response 發回,那么我們需要等到所有消費者都確認了 SyncGroup 才會開始 response ,kafka 會將已經先收到的請求放到一個 purgatory 的地方。
總結一下就是當所有消費者成員都發送了 Join 加入組消息之后,broker 端 coordinator 會選舉一個 consumer 擔任 leader 角色。
這個 leader 要做的事情就是接收 coordinator 發送的成員信息及成員訂閱 topic 信息並為這個消費者組內的消費者消費什么 parititons 制定分配方案和策略。
當方案制定完成之后,ledaer 會將方案通過 SyncGroup 請求發送回 coordinator ,這時非 leader 的 consumer 也會發送 SyncGroup 請求給 coordinator 然后 coordinator 將接收到的 leader 發來的分配方案通過 SyncGroup 的 response 發送回各 consumer 從而來完成 rebalance。
最后附上一張 rebalance 的狀態機圖片。
Reference:
https://www.cnblogs.com/huxi2b/p/6223228.html Kafka消費組(consumer group)
https://matt33.com/2018/01/28/server-group-coordinator/ Kafka 源碼解析之 GroupCoordinator 詳解(十)
https://github.com/dpkp/kafka-python/releases
https://github.com/dpkp/kafka-python/pull/1266 KAFKA-3888 Use background thread to process consumer heartbeats
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread KIP-62: Allow consumer to send heartbeats from a background thread