kafka shutdown停止很慢問題
在數據量大的時候,consumer一次抓取數據的數據很多,進入到業務處理的數據可能有很多,
假設一次poll有1萬條數據進入業務程序,而且業務程序是和poll綁定在一起線程同步執行的,假設平均每條數據,執行業務程序花費100ms,
那么poll一次的數據,至少要執行 1w*0.1s = 1000s = 16.67分鍾。
所以,在數據量大的時候,停止一個線程(需要先等待業務程序處理完數據),可能要十幾分鍾。
shutdown問題解決方案
1、改成異步處理數據,consumer取出來的數據,放到BlockQueue中,由異步線程去處理,當異步線程處理不過來時,阻塞consumer,調用consumer.pause()方法avoid group management rebalance,代碼如下(來源於Spring-Kafka):
// avoid group management rebalance due to a slow consumer this.consumer.pause(this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()])); public void onPartitionsAssigned(Collection<TopicPartition> partitions) { this.assignedPartitions = partitions; }
2、如果是同步執行數據處理,考慮提高業務程序 處理數據的速度。
3、同步處理數據,但是改成手動提交offset,當shutdown的時候,poll的數據不需要全部處理,只需要記錄處理的位置即可。代碼示例如下:
list data = consumer.poll(); for(record: data) { if(shutdown) { // 收到shutdown命令后立即停止,未處理的數據將丟棄 break; } deal(record); saveTopicOffset(record); } submitDealtDataOffset();
另外,
Kafka停不掉shutdown關閉不了問題
原因是卡在了consumer.close()方法里面,它會提交offset信息,如果網絡中斷或者kafka服務器有問題導致提交不了offset,則consumer.close方法會一直卡住(不停的循環嘗試提交offset,永不中斷)。
參見:Kafka poll一直等待的bug:
https://issues.apache.org/jira/browse/KAFKA-4189?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC
https://issues.apache.org/jira/browse/KAFKA-3172?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC
解決方法:目前還沒有好的辦法,只能將offset的自動提交改成手動提交offset。但是,我寫了一個程序可以在調用consumer.close后將線程強行殺死,作為臨時解決方案。