kafka shutdown停止關閉很慢問題的解決方案



kafka shutdown停止很慢問題


在數據量大的時候,consumer一次抓取數據的數據很多,進入到業務處理的數據可能有很多,

假設一次poll有1萬條數據進入業務程序,而且業務程序是和poll綁定在一起線程同步執行的,假設平均每條數據,執行業務程序花費100ms,

那么poll一次的數據,至少要執行 1w*0.1s = 1000s = 16.67分鍾。

所以,在數據量大的時候,停止一個線程(需要先等待業務程序處理完數據),可能要十幾分鍾。

 

shutdown問題解決方案

1、改成異步處理數據,consumer取出來的數據,放到BlockQueue中,由異步線程去處理,當異步線程處理不過來時,阻塞consumer,調用consumer.pause()方法avoid group management rebalance,代碼如下(來源於Spring-Kafka):

// avoid group management rebalance due to a slow consumer
this.consumer.pause(this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()]));

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    this.assignedPartitions = partitions;
}


2、如果是同步執行數據處理,考慮提高業務程序 處理數據的速度。


3、同步處理數據,但是改成手動提交offset,當shutdown的時候,poll的數據不需要全部處理,只需要記錄處理的位置即可。代碼示例如下:

list data = consumer.poll();
for(record: data) { 
    if(shutdown) {  // 收到shutdown命令后立即停止,未處理的數據將丟棄
        break;
    }
   deal(record);
   saveTopicOffset(record);
}
submitDealtDataOffset();



另外,

Kafka停不掉shutdown關閉不了問題

原因是卡在了consumer.close()方法里面,它會提交offset信息,如果網絡中斷或者kafka服務器有問題導致提交不了offset,則consumer.close方法會一直卡住(不停的循環嘗試提交offset,永不中斷)。


參見:Kafka poll一直等待的bug:

https://issues.apache.org/jira/browse/KAFKA-4189?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC


https://issues.apache.org/jira/browse/KAFKA-3172?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC



解決方法:目前還沒有好的辦法,只能將offset的自動提交改成手動提交offset。但是,我寫了一個程序可以在調用consumer.close后將線程強行殺死,作為臨時解決方案。




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM