kafka-mirror Instability: Analysis and Fix


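     Some time ago, our production multi-cluster Kafka deployment, which uses the mirror component (mirrormaker) for cross-datacenter data synchronization, would occasionally hang and become unstable: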
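1. Symptoms
   a. In production, response correlation IDs stopped matching their requests: "Correlationid for response (13502150) does not match request". The process then hung, CPU usage spiked, and the sync service stopped working.
   b. Average frequency: production runs 3 groups × 2, 6 instances in total; on average 2 instances were affected per day.
   c. For similar reported issues, see
      the kafka-mirrormaker issue (https://issues.apache.org/jira/browse/KAFKA-1257) and the kafka-producer issue (https://issues.apache.org/jira/browse/KAFKA-4669)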

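2. Cause
   a. Background: the Kafka network protocol
     The Kafka network protocol is designed so that requests and responses on a connection are ordered: on each individual TCP connection, requests are sent and responses are received in sequence, and every request is tied to its response by a unique, increasing correlation ID: "The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well." (from the official Kafka network protocol documentation);
   b. How mirrormaker decides whether a sync succeeded
     Every data request that mirrormaker sends to the target Kafka cluster is held in a local memory pool until the response with the same CorrelationId arrives. One of two outcomes is then handled: (1) the send succeeded, so the request is released from the memory pool; (2) the send failed, so the data is put back on the queue and resent;
   c. Cause of the production bug in mirrormaker
     The function handleCompletedReceives relies on the guarantee in (a): it assumes that the request ID and the response ID always match and has no handling for a mismatch. Once a mismatch occurs, the exception keeps propagating upward, so the current in-flight request never gets any response handling and its memory is never released, which eventually causes a leak;
   d. Versions 0.8.x and 0.9.x are both confirmed to have the same problem in production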
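
The failure mode in 2.c can be modelled with a small stand-alone sketch. It is not the Kafka client source: the class `StrictCorrelationDemo` and its methods are hypothetical, and the exception text is only modelled on the error quoted in section 1. It assumes the unpatched client removes the oldest in-flight request first and then insists that the correlation IDs match.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the unpatched behaviour; not the real Kafka client classes.
class StrictCorrelationDemo {
    // Correlation IDs of requests sent on one connection, oldest first.
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    void send(int correlationId) {
        inFlight.addLast(correlationId);
    }

    // Pop the oldest request, then require that the response carries the same ID.
    void onResponse(int responseCorrelationId) {
        Integer requestCorrelationId = inFlight.pollFirst();
        if (requestCorrelationId == null) {
            throw new IllegalStateException("No in-flight request for response " + responseCorrelationId);
        }
        if (requestCorrelationId != responseCorrelationId) {
            // The popped request is neither completed nor retried from here on.
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + requestCorrelationId + ")");
        }
    }

    int pending() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        StrictCorrelationDemo client = new StrictCorrelationDemo();
        client.send(1);
        client.send(2);
        client.send(3);
        client.onResponse(1); // in order: accepted
        try {
            client.onResponse(3); // the response for request 2 never arrived
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("still pending: " + client.pending());
    }
}
```

In this model request 2 is removed from the queue but never completed or resent, and request 3 stays queued although its response has already been consumed. That is the kind of stranded request described in 2.c.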

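3. Fix
     Patch the kafka-client-0.8.2 source used by mirror-maker: add handling for out-of-sequence responses, so that the data requests affected by the mismatch are put back on the in-memory queue and resent. The patched source is as follows: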

/**
 * Handle any completed receives and update the response list with the responses received.
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        int source = receive.source();
        ResponseHeader header = ResponseHeader.parse(receive.payload());
        int compared = 0;
        ClientRequest req = null;
        short apiKey = -1;
        // Walk this node's in-flight requests, oldest first, until the one this response answers.
        // fristSent and compareCorrelationId are helpers added by this patch (names as in the patch).
        do {
            req = inFlightRequests.fristSent(source);
            if (req == null) {
                break;
            }
            apiKey = req.request().header().apiKey();
            compared = compareCorrelationId(req.request().header(), header, source);
            if (compared < 0 && apiKey != ApiKeys.METADATA.id) {
                // The request precedes this response, so its own response is missing:
                // report it as disconnected (third argument) so that the caller can resend it.
                responses.add(new ClientResponse(req, now, true, null));
            }
            if (compared <= 0) {
                // Remove the skipped or matched request from the in-flight queue.
                req = inFlightRequests.completeNext(source);
            }
        } while (compared < 0);
        if (req == null || compared > 0) {
            // No in-flight request matches this response; this is not expected to happen.
            log.error("Unexpected response with correlation id {} from node {}: no matching in-flight request",
                      header.correlationId(), source);
            continue;
        }
        Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
        if (apiKey == ApiKeys.METADATA.id) {
            handleMetadataResponse(req.request().header(), body, now);
        } else {
            // need to add body/header to response here
            responses.add(new ClientResponse(req, now, false, body));
        }
    }
}
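
The listing above does not show the compareCorrelationId helper, so the following is only a stand-alone sketch of how the resynchronization loop might behave. It assumes the helper returns a negative value when the in-flight request is older than the response, zero on a match, and a positive value otherwise. The class `ResyncDemo`, the method `resync`, and the use of plain integers in place of request objects are all hypothetical. The special case for metadata requests is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Stand-alone model of the resynchronization loop; all names are illustrative.
class ResyncDemo {
    // Assumed semantics of the patch's helper:
    // negative = request older than response, 0 = match, positive = request newer.
    static int compareCorrelationId(int requestId, int responseId) {
        return Integer.compare(requestId, responseId);
    }

    /**
     * Drains in-flight request IDs up to and including the one the response answers.
     * Skipped (older) IDs are appended to toRetry so that they can be resent.
     * Returns the matched ID, or -1 if no in-flight request matches.
     */
    static int resync(Deque<Integer> inFlight, int responseId, List<Integer> toRetry) {
        while (!inFlight.isEmpty()) {
            int oldest = inFlight.peekFirst();
            int compared = compareCorrelationId(oldest, responseId);
            if (compared > 0) {
                return -1; // the response is older than anything still in flight
            }
            inFlight.pollFirst();
            if (compared == 0) {
                return oldest; // found the request this response answers
            }
            toRetry.add(oldest); // its response was lost; hand it back for resending
        }
        return -1;
    }

    public static void main(String[] args) {
        Deque<Integer> inFlight = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4));
        List<Integer> toRetry = new ArrayList<>();
        int matched = resync(inFlight, 3, toRetry);
        // prints "matched=3 retry=[1, 2] inFlight=[4]"
        System.out.println("matched=" + matched + " retry=" + toRetry + " inFlight=" + inFlight);
    }
}
```

The sketch compares IDs with a plain integer comparison and ignores wraparound of the correlation counter; whether the real helper handles that case is not shown in the source.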

