在使用了最新版的 kafka-python 1.4.6 在 broker 對 topic 進行默認配置的情況下報出類似錯誤
CommitFailedError CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.
這里要申明一點,在 1.4.0 以上的 kafka-python 版本使用了獨立的心跳線程去上報心跳。
這里報錯大概表達的意思是 無法在默認 300000ms 中完成處理操作。我們通常會一次性 poll 拉默認 500 條數據下來。我們需要在 300s 中完成 500 條數據的處理。如果不能完成的話就可能會觸發這個問題。
因為這個報錯的提示寫得非常清楚,所以我們先按這個方向去嘗試處理這個問題。首先調高了我們的 max_poll_interval_ms 的時間,但是無效。
然后 records 的條數減少,依然無效,該報錯還是會報錯。這不禁讓我懷疑觸發這個問題的是否並非這里報錯建議的那些地方。
所以我把目光放到了 broker 日志那邊去,想看下到底是因為什么原因導致爆出類似錯誤。
在日志上發現了一些日志,對應的 consumer 在反復的 rebalance:
[2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-05ed83f1-aa90-4950-b097-4cf467598082 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-f7826720-fef7-4b02-8104-d1f38065c2fe in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Preparing to rebalance group sync_group_20180321 with old generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-ac5f6aff-3600-4e67-a529-31674c72b1e4 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,716] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1091 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,721] INFO [GroupCoordinator 0]: Assignment received from leader for group sync_group_20180321 for generation 1091 (kafka.coordinator.group.GroupCoordinator)
參考 sentry 打出來的錯誤,我們可以認為這和 sentry 爆出來的問題有直接關系。因此我們要從另外一個角度去思考一下為什么我的 max_poll_interval_ms 已經調高並且每次拉取處理條數降低卻依然會報出此問題,並且頻繁觸發 rebalance 。
kafka-python 在 1.4.0 版本分離了心跳線程和 poll 主線程。我的第一反應就是會不會因為 poll 線程阻塞了心跳線程的切換,或者引起了某種死鎖從而導致心跳線程無法正確的發起心跳。最后導致 broker 認為 group 已經死亡然后主動觸發了 rebalance .
然后我去 kafka-python 的 gihub 搜索了一下類似問題,馬上就發現了有不少人都有這個問題。
https://github.com/dpkp/kafka-python/issues/1418
從中找到一些有趣的信息,比如來自
I am seeing consumer rebalances even if there is no messages to consume. Start three consumers in a group and send some messages to topic and after that stop the producer. The consumer will start seeing rebalances after 5-6mins. Sample code here: https://stackoverflow.com/questions/54908902/kafka-consumer-rebalance-occurs-even-if-there-is-no-message-to-consume
他說即使在沒有消息可以消費的情況下,也可以看到 kafka consumer 在過了 5 - 6 mins 之后開啟了 rebalance 。
這就跟我們的問題非常相似,我們並不是 process 的過程消耗的時間過長而觸發了 rebalance 而是有可能是因為消費得太快,導致有些消費者處於 空 poll 的狀態從而阻塞了心跳線程。客觀來說,我目前還會報出這個問題的 topic 有多達 50 個partitions,我開啟了5個消費者對其進行消費,平均一個消費者需要消費 10 個parititons 。如果有某個 partitions 長期沒有消費過來我們可能會被阻塞在那里最終導致 heartbeat 超時。 1.4.6 的客戶端默認 10s 沒心跳就超時,而發送間隔僅為 3s 。也就是連續三個周期沒有發送就超時了。
下面看到 dpkp 的一個回復,表達了有可能就是被 poll 主線程阻塞,的問題,並且有 workaround 可以用來避免這種情況:
vimal: thanks for posting. I believe you may be hitting lock contention between an idle client.poll -- which can block and hold the client lock for the entire request_timeout_ms -- and the attempt by the heartbeat thread to send a new request. It seems to me that we may need to use KafkaClient.wakeup() to make sure that the polling thread drops the lock if/when we need to send a request from a different thread. This shouldn't be an issue when messages are flowing through your topics at a steady rate. If this is just a test environment, and you expect your production environment to have more steady live data, then you could just ignore the error in testing. But if you are managing a topic w/ very low traffic -- delays of minutes between consecutive messages, for example -- you might try to reduce the request_timeout_ms to something closer to the heartbeat_interval_ms, which should prevent the read side from blocking for much longer than the heartbeat timeout. But note that other timeouts may also need to change (max_poll_interval_ms and session_timeout_ms perhaps). Another workaround might be to reduce metadata_max_age_ms to something close / equal to your heartbeat_timeout_ms. This will cause more frequent metadata requests, but should unblock the send side when there is no socket data available for reads.
dpkp 的觀點在於,如果我們數據發送過來的頻率是穩定的,消費者是正好可以消費完隊列里面的信息的情況的時候,不應該出現這樣的問題。出現這樣的問題與我們預期和看到報錯的情況可能恰恰相反,不是我們消費得太慢,而是我們消費得太快,並且生產者發送消息的頻率過低導致的。在 poll 不到消息的時候,主線程可能會面臨阻塞,而無法及時切換到心跳線程進行心跳的發送,最終導致了這個問題。
他給到一個 trick 的方法來解決這個問題,當面臨這種情況的時候我們可以把 metadata_max_age_ms 調整到和心跳發送頻率差不多 close / equal to our heartbeat_timeout_ms.
發送 metadata_request 會解除我們發送端的阻塞,從而達到抑制死鎖的效果。
self.kafka = kafka.KafkaConsumer( auto_offset_reset=auto_offset_reset, bootstrap_servers=['10.171.97.1:9092', '10.163.13.219:9092', '10.170.249.122:9092'], group_id=group_id, metadata_max_age_ms=metadata_max_age_ms ) self.kafka.subscribe(topics)
嘗試補充了 metadata_max_age_ms 大約 3000 ms ,這個問題得到了很大程度的解決和緩解。
既然確定了可能是因為消費太快,然后生產慢導致的主線程鎖住的問題,剩下可以驗證一下是否真的是這樣。嘗試打日志看一下切換線程發送心跳的情況可以來確認該問題是否如此。
另外看代碼發現 poll 主線程在 poll 之前會默認會進行 wakeup() 但是在 1.4.6里面也因為之前的 某個 bug 而默認關閉了,不知道是否有影響,等后續測試之后補上。
Reference:
https://github.com/dpkp/kafka-python/issues/1418 Heartbeat failed for group xxxWorker because it is rebalancing
https://github.com/dpkp/kafka-python/issues/1760 [1.4.5] KafkaProducer raises KafkaTimeoutError when attempting wakeup()
https://www.cnblogs.com/huxi2b/p/6815797.html Kafka 0.11版本新功能介紹 —— 空消費組延時rebalance