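A while back, our production multi-cluster Kafka deployment, which uses the mirror component (MirrorMaker) for cross-datacenter data synchronization, would occasionally hang and become unstable: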
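1. Symptoms
a. Production logged mismatched response sequence numbers: "Correlation id for response (13502150) does not match request". The process then hung, CPU usage spiked, and the sync service stopped working.
b. Average frequency: production runs 3 groups × 2, for 6 instances in total; on average 2 instances hit the problem each day.
c. For similar production issues, see
the kafka-mirrormaker issue (https://issues.apache.org/jira/browse/KAFKA-1257) and the kafka-producer issue (https://issues.apache.org/jira/browse/KAFKA-4669)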
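2. Cause
a. Background on the Kafka network protocol
The Kafka network protocol is designed so that requests and responses on a connection are ordered: on each individual TCP connection, both the sequence of requests sent and the sequence of responses received are guaranteed to be in order. In addition, every request and its response are tied together by a unique, increasing correlation id: "The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well." (from the Kafka network protocol introduction on the official site);
b. How mirrormaker decides whether a sync succeeded
Every data request that mirrormaker sends to the target Kafka cluster is kept in a local in-memory pool until a response with the same CorrelationId arrives. It then takes one of two paths: if the send succeeded, the request is removed from the memory pool; if the send failed, the data is put back on the queue and sent again;
c. Why this check fails in production
The check lives in the function handleCompletedReceives. Because of the ordering guarantee in point a, it assumes by default that the id of each request and the id of the response being processed are the same, and it does not handle the abnormal case where the two ids differ. So once an id mismatch occurs, the exception keeps propagating upward, the current in-flight request never gets any response handling, and its memory is never released, which eventually leads to a leak;
d. So far we have confirmed that the 0.8.x and 0.9.x versions both have the same problem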
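The failure mode described in points a to c can be reproduced with a small self-contained model. This is only a sketch and not the Kafka client code: the class StrictCorrelationDemo, the PendingRequest type, and the send/onResponse methods are hypothetical names made up for illustration. It assumes one connection, correlation ids that increase by one, and a strict "response must match the oldest pending request" rule.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class StrictCorrelationDemo {
    /** Hypothetical stand-in for a request that stays buffered until its response arrives. */
    static final class PendingRequest {
        final int correlationId;
        final String payload;

        PendingRequest(int correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // Requests sent on one connection, oldest first.
    private final Deque<PendingRequest> inFlight = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    /** Buffers the request and returns the correlation id assigned to it. */
    int send(String payload) {
        int id = nextCorrelationId++;
        inFlight.addLast(new PendingRequest(id, payload));
        return id;
    }

    /** Strict matching: the response must belong to the oldest pending request. */
    String onResponse(int responseCorrelationId) {
        PendingRequest oldest = inFlight.peekFirst();
        if (oldest == null || oldest.correlationId != responseCorrelationId) {
            // Nothing is completed or retried on this path, so the buffer never drains.
            throw new IllegalStateException(
                "Correlation id for response (" + responseCorrelationId + ") does not match request");
        }
        inFlight.pollFirst();
        return oldest.payload;
    }

    int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        StrictCorrelationDemo conn = new StrictCorrelationDemo();
        conn.send("batch-0"); // correlation id 0
        conn.send("batch-1"); // correlation id 1
        conn.send("batch-2"); // correlation id 2

        System.out.println("completed: " + conn.onResponse(0));
        try {
            conn.onResponse(2); // the response for id 1 never showed up
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + "; still buffered: " + conn.inFlightCount());
        }
    }
}
```

In this model, a single skipped response is enough to leave every later request stuck in the buffer, which is the leak-and-hang pattern described above.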
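3. Fix
Modify the kafka-client-0.8.2 source used by mirror-maker: add handling for out-of-order responses, so that the requests affected by a mismatch are put back on the in-memory queue and resent. The modified source is as follows: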
/**
 * Handle any completed receives and update the response list with the responses received.
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        int source = receive.source();
        ResponseHeader header = ResponseHeader.parse(receive.payload());
        int compared = 0;
        ClientRequest req = null;
        short apiKey = -1;
        // Walk the in-flight queue for this node until we reach the request that
        // matches the response header. fristSent (sic) and compareCorrelationId
        // appear to be helpers added alongside this patch; their definitions are
        // not shown here.
        do {
            // Look at the oldest in-flight request without removing it.
            req = inFlightRequests.fristSent(source);
            if (req == null) {
                break;
            }
            apiKey = req.request().header().apiKey();
            // < 0: request is older than the response; 0: match; > 0: request is newer.
            compared = compareCorrelationId(req.request().header(), header, source);
            if (compared < 0 && apiKey != ApiKeys.METADATA.id) {
                // The response for this older request was skipped: report it as
                // disconnected (third argument) so that the caller resends it.
                responses.add(new ClientResponse(req, now, true, null));
            }
            if (compared < 0 || compared == 0) {
                // Remove the request from the in-flight queue.
                req = inFlightRequests.completeNext(source);
            }
        } while (compared < 0);
        if (req == null || compared > 0) {
            // No in-flight request corresponds to this response; skip it.
            log.error("never go to this line");
            continue;
        }
        Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
        if (apiKey == ApiKeys.METADATA.id) {
            handleMetadataResponse(req.request().header(), body, now);
        } else {
            // need to add body/header to response here
            responses.add(new ClientResponse(req, now, false, body));
        }
    }
}
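
The skip-ahead logic of the patched handleCompletedReceives can be tried out in isolation with the sketch below. It is a simplified model, not the patch itself: SkipAheadCorrelationDemo, Outcome, compare, and handleResponse are hypothetical names, the in-flight queue holds bare correlation ids, and the comparison assumes ids increase monotonically without wrapping around.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SkipAheadCorrelationDemo {
    /** Hypothetical record of what happened to one pending request. */
    static final class Outcome {
        final int correlationId;
        final boolean retry; // true = no response was seen, the request should be resent

        Outcome(int correlationId, boolean retry) {
            this.correlationId = correlationId;
            this.retry = retry;
        }
    }

    /** Assumed comparison: < 0 request older than response, 0 match, > 0 request newer. */
    static int compare(int requestId, int responseId) {
        return Integer.compare(requestId, responseId);
    }

    /** Drains the queue up to the matching request and marks skipped requests for retry. */
    static List<Outcome> handleResponse(Deque<Integer> inFlight, int responseId) {
        List<Outcome> outcomes = new ArrayList<>();
        while (!inFlight.isEmpty()) {
            int cmp = compare(inFlight.peekFirst(), responseId);
            if (cmp > 0) {
                break; // response is older than everything pending: ignore it
            }
            int requestId = inFlight.pollFirst();
            outcomes.add(new Outcome(requestId, cmp < 0));
            if (cmp == 0) {
                break; // found the request this response belongs to
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        Deque<Integer> inFlight = new ArrayDeque<>();
        for (int id = 5; id <= 8; id++) {
            inFlight.addLast(id);
        }
        // The responses for ids 5 and 6 were lost; the response for id 7 arrives.
        for (Outcome o : handleResponse(inFlight, 7)) {
            System.out.println(o.correlationId + (o.retry ? " -> retry" : " -> completed"));
        }
        System.out.println("still pending: " + inFlight);
    }
}
```

Compared with the strict check, a mismatch here no longer aborts processing: older requests are released for resending, and the queue keeps draining.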
