前提
本文的分析基於kafka 0.9版本的client, 0.10.1.0
中已經修改心跳線程為后台線程,並支持設置max.poll.records
,參見ChangeLog。
使用場景
Kafka是一個高吞吐量的分布式消息系統,在APM的移動端請求數據的處理中,使用了Kafka。Kafka數據使用多線程阻塞的方式進行消費,即每個線程通過poll()
的形式消費一個或者多個partition
, 每次得到的消息集處理完成之后才會繼續進行下一次poll()
操作,同時使用了自動提交offset
的模式。Rebalance發生的原因有可能是集群的問題,但大部分都在客戶端,一旦服務端在設定的超時時間內沒有收到消費者發起的心跳,則認為這個消費者已經死掉,就會執行Rebalance動作。
從源碼上,我們一路從KafkaConsumer.poll(timeout)
跟進來可以看到
/**
* Do one round of polling. In addition to checking for new data, this does any needed
* heart-beating, auto-commits, and offset updates.
* @param timeout The maximum time to block in the underlying poll
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
...
// 上面是一些檢查動作
fetcher.initFetches(cluster);
client.poll(timeout);
return fetcher.fetchedRecords();
}
從注釋中,我們可以看出poll動作會發出一些列的心跳、自動offset提交和更新的動作。這是我們設定了自動提交的時候,我們的消費者發出心跳和offset的地方。
再進client.poll(timeout)
方法中可以看到
//ConsumerNetworkClient.java
private void poll(long timeout, long now, boolean executeDelayedTasks) {
...
//一些前置的判斷
// execute scheduled tasks
if (executeDelayedTasks)
delayedTasks.poll(now);
...
//其他動作
}
從源碼里面可以看到會吧delayedTask里面的所有任務執行掉,其中就有我們的心跳任務。 那么,很明顯,如果我們在兩次poll()
調用的間隔做了太多的事情,也就是消費拉取下來的數據花了過長的時間,而沒有及時發出心跳,則我們會被判定為死掉的節點,這個時候集群就會發起Rebalance。
Rebalance有什么影響
Rebalance本身是Kafka集群的一個保護設定,用於剔除掉無法消費或者過慢的消費者,然后由於我們的數據量較大,同時后續消費后的數據寫入需要走網絡IO,很有可能存在依賴的第三方服務存在慢的情況而導致我們超時。
Rebalance對我們數據的影響主要有以下幾點:
- 數據重復消費: 消費過的數據由於提交offset任務也會失敗,在partition被分配給其他消費者的時候,會造成重復消費,數據重復且增加集群壓力
- Rebalance擴散到整個ConsumerGroup的所有消費者,因為一個消費者的退出,導致整個Group進行了Rebalance,並在一個比較慢的時間內達到穩定狀態,影響面較大
- 頻繁的Rebalance反而降低了消息的消費速度,大部分時間都在重復消費和Rebalance
- 數據不能及時消費,會累積lag,在Kafka的TTL之后會丟棄數據
上面的影響對於我們系統來說,都是致命的。
我們遇到Rebalance的場景
首先為了看下我們的rebalance有多么嚴重,我們增加了ConsumerRebalanceListener
,並計算Rebalance發生的頻率,同時將Rebalance的信息上報到監控平台上。
我們可以看到,Rebalance出現的非常頻繁,一旦開始Rebalance則通常是多個機器多個消費線程同時開始Rebalance,並在一定時間后達到穩定。
同時加了一些日志看看每個partition rebalance需要多長的時間,每個partition rebalance完成都需要20秒左右(當然有些partition會被rebalance到其他消費者去,因為沒有響應partition的成對的開始和結束日志),可想而知很頻繁的rebalance會有很嚴重的問題。
2017-05-15 18:00:20,343-consumer:consumer-pool-3 ,topic:foreground.-_-.apm.online,partition:37 rebalance end, rebalance time:14818...
2017-05-15 18:00:20,343-consumer:consumer-pool-3 ,topic:foreground.-_-.apm.online,partition:36 rebalance end, rebalance time:14818...
2017-05-15 18:00:20,344-consumer:consumer-pool-9 ,topic:foreground.-_-.apm.interaction.online,partition:7 rebalance end, rebalance time:20035...
2017-05-15 18:00:20,344-consumer:consumer-pool-9 ,topic:foreground.-_-.apm.interaction.online,partition:6 rebalance end, rebalance time:20035...
2017-05-15 18:00:20,344-consumer:consumer-pool-8 ,topic:foreground.-_-.apm.interaction.online,partition:11 rebalance end, rebalance time:20322...
2017-05-15 18:00:20,344-consumer:consumer-pool-8 ,topic:foreground.-_-.apm.interaction.online,partition:10 rebalance end, rebalance time:20322...
2017-05-15 18:00:20,344-consumer:consumer-pool-1 ,topic:foreground.-_-.apm.web.online,partition:8 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-1 ,topic:foreground.-_-.apm.web.online,partition:9 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-6 ,topic:foreground.-_-.apm.diagnose.online,partition:22 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-6 ,topic:foreground.-_-.apm.diagnose.online,partition:23 rebalance end, rebalance time:15162...
常見且簡單的Rebalance場景
我們的業務數據會寫入Hbase,最經典的場景就是Hbase集群服務抖動或者我們寫入數據造成Hbase RegionServer過熱會造成消費到的消息過慢觸發心跳超時。這種場景下,我們可以在日志里面明顯看到Hbase寫入拋出的異常。例如:
-
由於集群的抖動,導致我們無法正常寫入數據,會造成Rebalance
2017-03-08 14:31:45,593 411615 [htable-pool1-t99] (AsyncProcess.java:713) INFO org.apache.hadoop.hbase.client.AsyncProcess - #5, table=mam:MobileNetData, attempt=2/10 failed 56 ops, last exception: org.apache.hadoop.hbase.RegionTooBusyException: org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit, regionName=mam:MobileNetData,5852818ca18dc5fdf63bec8eded1db3c_9223370556190336483,1483673115059.19745fc99b3370aab016af1c8cc70d69., server=hbase8.photo.163.org,60020,1488738168459, memstoreSize=1077613864, blockingMemStoreSize=1073741824 at org.apache.hadoop.hbase.regionserver.HRegion.checkResources(HRegion.java:2937) at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2249) at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2216) at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2220) at org.apache.hadoop.hbase.regionserver.HRegionServer.doBatchOp(HRegionServer.java:4478) at org.apache.hadoop.hbase.regionserver.HRegionServer.doNonAtomicRegionMutation(HRegionServer.java:3661) at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3550) at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29949) at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2027) at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108) at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:110) at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:90) at java.lang.Thread.run(Thread.java:662)
-
由於寫入熱點的問題導致的Rebalance
2017-04-13 13:18:09,809 35209 [htable-pool10-t16] INFO org.apache.hadoop.hbase.client.AsyncProcess - #9, table=mam:MobileDiagnoseData, attempt=3/10 failed 86 ops, last exception: org.apache.hadoop.hbase.NotServingRegionException: org.apache.hadoop.hbase.NotServingRegionException: mam:MobileDiagnoseData,7d56fba948c044a0c7b95709a1d9084e_9223370546420477958_487c7012778ee696320ac91264d33fd1,1491464751441.e2d1589b4b227a56789cf4a5e0d6ec21. is closing at org.apache.hadoop.hbase.regionserver.HRegion.startRegionOperation(HRegion.java:5906) at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2254) at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2216) at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2220) at org.apache.hadoop.hbase.regionserver.HRegionServer.doBatchOp(HRegionServer.java:4478) at org.apache.hadoop.hbase.regionserver.HRegionServer.doNonAtomicRegionMutation(HRegionServer.java:3661) at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3550) at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29949) at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2027) at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108) at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:110) at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:90) at java.lang.Thread.run(Thread.java:745) on hbase12.xs.163.org,60020,1491464637625, tracking started Thu Apr 13 13:18:06 CST 2017, retrying after 3004 ms, replay 86 ops.
上面兩種都是很容易解決的Rebalance,這種處理起來也簡單,從發生的源頭,要么查看是否集群出現問題,要么解決一下寫入熱點問題就可以了。
然而難解決的是沒有出錯日志,但是依舊會頻繁的Rebalance。
需要進一步分析的異常
為了減少消費Kafka后寫入到Hbase的數據不會產生明顯的峰谷,我們采取了限流的策略,在寫入Hbase的時候,使用RateLimiter
獲取令牌后寫入到Hbase中,這加劇了Rebalance問題的產生,因為每次消費的時間會加上等待令牌產生的時間。 從統計上來看,左邊是增加了限流后的Rebalance情況,右側這部分紅框中,則是去掉了限流之后的情況,21點左右Rebalance情況減少。
前面提到了Rebalance的原因就是同步消費poll()
操作得到的數據的時間過長導致的,我們解決了這些簡單的Case之后,發現還是很經常發生Rebalance,同時為什么限流會加劇Rebalance,只能增加日志來看poll()
動作得到的數據的消費時間到底是多長。
這個日志中,poll record size
是每次渠道的數據的條數,consumer time
是消費這部分數據的時間,poll interval
是消費線程兩次poll的間隔。
從日志中,我們可以看到每次poll,在數據較多時會poll到1w多條數據,消費時間是3秒多,兩次poll之間的間隔是12秒左右。而我們的參數中heartbeat.interval.ms
設置的是10秒,而session.timeout.ms
設置的是30秒,兩次poll間隔12秒顯然已經超出了心跳的10秒的間隔。從左邊的日志也可以看出,集群發生了rebalance。
上述的日志中,我們有兩個疑問:
- 能否調整每次拉取到的數據的條數,條數少一些,每次消費也會比較快一些
- 消費的時間只有3秒多,總的一次poll加上消費的時間竟然達到了12秒,
poll(timeout)
我們指定了超時時間為1秒,中間也沒有其他操作了,所以只能懷疑poll動作有問題。
對於第1個問題,kafka0.9版本沒有設置每次獲取的數據條數的參數,在0.10版本中新增了,但是因為集群是0.9,所以這個暫時也沒有辦法,對於第2個問題,我們先加一些日志看看poll動作
顯然poll()的超時時間已經超過1秒。
我們再看一下poll()和超時相關的代碼
@Override
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
//循環退出條件是超過了超時時間
do {
//傳入的也是剩余的超時時間
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
fetcher.initFetches(metadata.fetch());
client.quickPoll();
return new ConsumerRecords<>(records);
}
//減去這次poll花掉的時間
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}
poll()
會進行多次的pollOnce()
,直到時間用盡。從這個代碼片段來看,並不可能出現大於兩倍的超時時間的情況,所以我們設定超時時間1秒,但是實際的poll()
調用在2秒以內是正常的,可能是做了兩次pollOnce()
動作,但是日志中的7秒+和8秒+的調用時間是比較奇怪的。我們繼續跟蹤了代碼。最終跟蹤到了org.apache.kafka.common.network.Selector
,最終poll()的超時邏輯會走到
private int select(long ms) throws IOException {
if (ms < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (ms == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(ms);
}
可以看到這個超時時間只是nio中select的超時時間,並不包含讀取數據的時間,所以從kafka集群讀取數據的時間過長會導致單次poll的時間超長,感覺與超時的語義不符,應該保證在設定的timeout時間內返回設計才比較合理。這個受集群負載影響,不是我們所能控制的。
回到上面的問題,為什么正常消費也會產生Rebalance以及為什么限流會加劇Rebalance也就有了解釋,因為poll()的時間會受集群影響,導致單次poll的時間超長,限流則因為限流獲取令牌的等待時間會導致單次消費時間較長,加劇了Rebalance。
解決方案
那么我們如何解決這種問題呢,我們期待的結果是可以Rebalance,但是不應該因為一個消費者消費較慢或者突然的波動,而影響整個集群,導致整個集群Rebalance。很容易想到的方案就是單獨控制心跳任務,讓Kafka集群知道消費者還活着,但是如果依舊采用目前的消費模式,是做不到的。不過還是有解決方案的,那就是關閉offset的自動提交,由我們手動的管理offset的提交和心跳。方案網上有現成的,可以使用Spring-Kafka
,參考簡書上的文章, 當然也可以自己編碼實現,因為項目的緊迫性,我們使用了現成的方案。
我們在另一個業務里面先進行了驗證性的部署,修改后的效果可以如圖中所示:
上線前晚上的rebalance情況,高峰時期,數據量也較多,Kafka集群負載也較大
然后我們在第二天下午15點上線了新的Kafka消費者邏輯,可以看到晚上高峰時期也沒有Rebalance了
優缺點
陸陸續續經過好久,終於基本解決了Kafka消費者不斷的Rebalance的問題,好處很明顯,可以解決上述的會導致我們數據的各種問題,集群也不會因為某一個消費線程比較慢而影響整個集群,這是一個惡性循環的過程。但是也有缺點,比如出現過消費線程出現問題,心跳卻在繼續,數據不會被消費,也不會進行Rebalance。原因還在分析中,但是這種現象畢竟非常少數,可以通過報警和手動操作的方式進行重啟等操作。
網易雲新用戶大禮包:https://www.163yun.com/gift
本文來自網易實踐者社區,經作者丁偉偉授權發布。