面試官:Kafka 如何優化內存緩沖機制造成的頻繁 GC 問題?


Jusfr 原創,轉載請注明來自博客園

Request 與 Response 的響應格式

Request 與 Response 都是以 長度+內容 形式描述, 見於 A Guide To The Kafka Protocol

Request 除了 Size+ApiKey+ApiVersion+CorrelationId+ClientId 這些固定字段, 額外的 RequestMessage 包含了具體請求數據;

Request => Size ApiKey ApiVersion CorrelationId ClientId RequestMessage
  Size => int32
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

Response 除了 Size+CorrelationId, 額外的 ResponseMessage 包含了具體響應數據;

Response => Size CorrelationId ResponseMessage Size => int32 CorrelationId => int32 ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse

處理序列化與反序列化需求

使用 MemoryStream

序列化 Request 需要分配內存, 從緩沖區讀取 Response 同理.

MemoryStream 是一個可靠方案, 它實現了自動擴容, 但擴容過程離不開字節拷貝, 而頻繁分配不小的內存將影響性能, 近似的擴容示例代碼如下:

// init Byte[] buffer = new Byte[4096]; Int32 offset = 0; 

//write bytes
Byte[] bytePrepareCopy = // from outside
if (bytePrepareCopy > buffer.Length - offset) {
Byte[] newBuffer = new Byte[buffer.Length * 2];
Array.Copy(buffer, 0, newBuffer, 0, offset);
buffer = newBuffer;
}
Array.Copy(bytePrepareCopy, 0, buffer, offset, bytePrepareCopy.Length);

數組擴容可以參見 List 的實現, 這里只是示意, 沒有處理長度為 (buffer.Length*2 - offset) < bytePrepareCopy.Length 的情況

在數組長度超4k 時,擴容成本非常高。如果約定“請求和響應不得超過4k“, 那么使用可回收(見下文相關內容)的固定長度的數組模擬 MemoryStream 的讀取和寫入行為, 能夠達到極大的性能收益。

KafkaStreamBinary (見於 github) 內部使用 MemoryStream, KafkaFixedBinary (見於 github) 則是基於數組的實現;

使用 BufferManager

使用過 Memcached 的人很容易理解 BufferManager 的思路: 為了降低頻繁開辟內存帶來的開銷,首先“將內存塊化”, 申請者獲取到“成塊的內存”, 被分配出去的內存塊標記為“已分配”; 與 Memcached 不同的是 BufferManager 期望申請者歸還使用完后的內存塊,以重新分配給其他申請操作。

System.ServiceModel.Channels.BufferManager 提供了一個可靠實現, 大致使用方式如下:

const Int32 size = 4096; BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 32, maxBufferSize: size); Byte[] buffer = bm.TakeBuffer(1024); bm.ReturnBuffer(buffer);

與手動分配內容的性能對比

const Int32 size = 4096; BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 10, maxBufferSize: size); 

var timer = new FunctionTimer();
timer.Push("BufferManager", () => {
Byte[] buffer = bm.TakeBuffer(size);
bm.ReturnBuffer(buffer);
});

timer.Push("new Byte[]", () => {
Byte[] buffer = new Byte[size];
});

timer.Initialize();
timer.Execute(100000).Print();

測試結果:

BufferManager
    Time Elapsed : 7ms CPU Cycles : 17,055,523 Memory cost : 3,388 Gen 0 : 2 Gen 1 : 2 Gen 2 : 2 new Byte[] Time Elapsed : 42ms CPU Cycles : 113,437,539 Memory cost : 24 Gen 0 : 263 Gen 1 : 2 Gen 2 : 2 
  • 過小的內容使用沒有使用 BufferManager 的必要,但BufferManager分配超過 4k 內存時性能下降明顯;
  • 最優情況是申請人獲取的內存塊大小一致,如果設置maxBufferSize = 4k,但 TakeBuffer(Int32 bufferSize) 方法使用的參數大於 4k,測試表明性能還不如手動創建 Byte 數組;
  • mono 的實現存在線程安全的問題;

強制要求業務使用的請求不超過4k 貌似做得到,但需求更大內存的場景總是存在,比如合並消息、批量消費等,Chuye.Kafka 作為類庫需要提供支持。

KafkaScalableBinary = BufferManager + Byte[][]

KafkaScalableBinary 並沒有發明新東西, 在其內部維護了一個 Dictionary<int32, byte[]=""> 保存一系列 Byte數組;

初始化時並未真正分配內存, 除非開始寫入;

public KafkaScalableBinary() : this(4096) { } 

public KafkaScalableBinary(Int32 size) {
if (size <= 0) {
throw new ArgumentOutOfRangeException("size");
}
_lengthPerArray = size;
_buffers = new Dictionary<Int32, Byte[]>(16);
}

寫入時先根據當前位置對數組長度取模 _position / _lengthPerArray 找到待寫入數組,不存在則分配新數組;

private Byte[] GetBufferForWrite() { var index = (Int32)(_position / _lengthPerArray); Byte[] buffer; if (!_buffers.TryGetValue(index, out buffer)) { if (_lengthPerArray >= 128) { buffer = ServiceProvider.BufferManager.TakeBuffer(_lengthPerArray); } else { buffer = new Byte[_lengthPerArray]; } _buffers.Add(index, buffer); } return buffer; }

然后根據當前位置對數組長度取整 _position % _lengthPerArray 找到目標位置;由於待寫入長度可能超過可使用長度,這里使用了 while 循環,一邊獲取和分配待寫入數組, 一邊將剩余字節寫入其中,直至完成;

public override void WriteByte(Byte[] buffer, int offset, int count) { if (buffer == null) { throw new ArgumentNullException("buffer"); } if (buffer.Length == 0) { return; } if (buffer.Length < count) { throw new ArgumentOutOfRangeException(); } 
checked {
    var left = count;                                               <span class="co"><span class="hljs-comment"><span class="hljs-comment">//標記剩余量</span>
    <span class="kw"><span class="hljs-keyword"><span class="hljs-keyword">while</span> (left &gt; <span class="dv"><span class="hljs-number">0</span>) {
        var targetBuffer = GetBufferForWrite();                     <span class="co"><span class="hljs-comment"><span class="hljs-comment">//查找目標數組</span>
        var targetOffset = (Int32)(_position % _lengthPerArray);    <span class="co"><span class="hljs-comment"><span class="hljs-comment">//查找目標位置</span>
        <span class="kw"><span class="hljs-keyword"><span class="hljs-keyword">if</span> (targetOffset == _lengthPerArray - <span class="dv"><span class="hljs-number">1</span>) {                  <span class="co"><span class="hljs-comment"><span class="hljs-comment">//如果位置已經位於數組末尾, 說明位於起始位置;</span>
            targetOffset = <span class="dv"><span class="hljs-number">0</span>;
        }

        var prepareCopy = left;                                     <span class="co"><span class="hljs-comment"><span class="hljs-comment">//准備寫入剩余量</span>
        <span class="kw"><span class="hljs-keyword"><span class="hljs-keyword">if</span> (prepareCopy &gt; _lengthPerArray - targetOffset) {         <span class="co"><span class="hljs-comment"><span class="hljs-comment">//但數組的剩余長度可能不夠,寫入較小長度</span>
            prepareCopy = _lengthPerArray - targetOffset;
        }
        Array.Copy(buffer, count - left, targetBuffer, targetOffset, prepareCopy);  <span class="co"><span class="hljs-comment"><span class="hljs-comment">//拷貝字節</span>
        _position += prepareCopy;                                   <span class="co"><span class="hljs-comment"><span class="hljs-comment">//推進位置</span>
        left -= prepareCopy;                                        <span class="co"><span class="hljs-comment"><span class="hljs-comment">//減小剩余量</span>
        <span class="kw"><span class="hljs-keyword"><span class="hljs-keyword">if</span> (_position &gt; _length) {                                  <span class="co"><span class="hljs-comment"><span class="hljs-comment">//增大總長度</span>
            _length = _position;
        }
    }
}

}

讀取過程類似,循環查找待讀取數組和拷貝字節直到完成,不同的是分配內存的邏輯以一條異常替代;

public override Int32 ReadBytes(Byte[] buffer, int offset, int count) { if (buffer == null) { throw new ArgumentNullException("buffer"); } if (buffer.Length == 0) { return 0; } if (buffer.Length < count) { throw new ArgumentOutOfRangeException(); } checked { var prepareRead = (Int32)(Math.Min(count, _length - _position)); //計算待讀取長度 var left = prepareRead; //標記剩余量 while (left > 0) { var targetBuffer = GetBufferForRead(); //查找目標數組 var targetOffset = (Int32)(_position % _lengthPerArray); //查找目標位置 var prepareCopy = left; //准備讀取剩余量 if (prepareCopy > _lengthPerArray - targetOffset) { prepareCopy = _lengthPerArray - targetOffset; } Array.Copy(targetBuffer, targetOffset, buffer, prepareRead - left, prepareCopy); //但數組的剩余長度可能不夠,讀取較小長度 _position += prepareCopy; //推進位置 left -= prepareCopy; //減小剩余量 } return prepareRead; } } 

private Byte[] GetBufferForRead() {
var index = (Int32)(_position / _lengthPerArray);
Byte[] buffer;
if (!_buffers.TryGetValue(index, out buffer)) {
throw new IndexOutOfRangeException();
}
return buffer;
}

釋放時釋放內部維護的的全部字節;

public override void Dispose() { foreach (var item in _buffers) { if (_lengthPerArray >= 128) { ServiceProvider.BufferManager.ReturnBuffer(item.Value); } } _buffers.Clear(); }

寫入緩沖區是對內部維護數組列表的直接操作,高度優化

public override void CopyTo(Stream destination) { foreach (var item in GetBufferAndSize()) { destination.Write(item.Key, 0, item.Value); } }

讀取緩沖區時和寫入行為類似

public override void ReadFrom(Stream source, int count) { var left = count; var loop = 0; do { var targetBuffer = GetBufferForWrite(); var targetOffset = (Int32)(_position % _lengthPerArray); var prepareCopy = left; if (prepareCopy > _lengthPerArray - targetOffset) { prepareCopy = _lengthPerArray - targetOffset; } 
    var readed = source.Read(targetBuffer, targetOffset, prepareCopy);
    _position += readed;
    left -= readed;
    <span class="kw"><span class="hljs-keyword"><span class="hljs-keyword">if</span> (_position &gt; _length) {
        _length = _position;
    }
    loop++;
} <span class="kw"><span class="hljs-keyword"><span class="hljs-keyword">while</span> (left &gt; <span class="dv"><span class="hljs-number">0</span>);

}

實際上可以從 MemoryStream 定義出 ScalableMemoryStream 再改寫其行為,KafkaScalableBinary 依賴於 MemoryStream 而不是具體實現,整體就更加"設計模式"了 , 基本邏輯前文已陳述。

測試過程中發現,一來 **mono 的 BufferManager 實現存在線程安全問題*,故 Chuye.Kafka 提供了一個 ObjectPool 模式的 BufferManager 作為替代方案; 二是 KafkaScalableBinary 與 ScalableStreamBinary 的性能對比測試結果非常不穩定,但前者頻繁的取橫取整及字典開銷必然是拖累,我會繼續追蹤和優化。

KafkaScalableBinary (見於 github), 序列化部分設計示意:

 


Jusfr 原創,轉載請注明來自博客園


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM