Pipelines - .NET中的新IO API指引(一)
Pipelines - .NET中的新IO API指引(二)
關於System.IO.Pipelines的一篇說明
System.IO.Pipelines: .NET高性能IO
System.IO.Pipelines 是對IO的統一抽象,文件、com口、網絡等等,重點在於讓調用者注意力集中在讀、寫緩沖區上,典型的就是 IDuplexPipe中的Input Output。
可以理解為將IO類抽象為讀、寫兩個緩沖區。
目前官方實現還處於preview狀態,作者使用Socket和NetworkStream 實現了一個 Pipelines.Sockets.Unofficial
作者在前兩篇中提到使用System.IO.Pipelines 改造StackExchange.Redis,在本篇中作者采用了改造現有的SimplSockets庫來說明System.IO.Pipelines的使用。
文章中的代碼(SimplPipelines,KestrelServer )
## SimplSockets說明
+ 可以單純的發送(Send),也可以完成請求/響應處理(SendRecieve)+ 同步Api
+ 提供簡單的幀協議封裝消息數據
+ 使用byte[]
+ 服務端可以向所有客戶端廣播消息
+ 有心跳檢測等等
屬於非常典型的傳統Socket庫。
## 作者的改造說明
### 對緩沖區數據進行處理的一些方案及選型
1. 使用byte[]拷貝出來,作為獨立的數據副本使用,簡單易用但成本高(分配和復制)2. 使用 ReadOnlySequence<byte> ,零拷貝,快速但有限制。一旦在管道上執行Advance操作,數據將被回收。在有嚴格控制的服務端處理場景(數據不會逃離請求上下文)下可以使用,言下之意使用要求比較高。
3. 作為2的擴展方案,將數據載荷的解析處理代碼移至類庫中(處理ReadOnlySequence<byte>),只需將解構完成的數據發布出來,也許需要一些自定義的structs 映射(map)一下。這里說的應該是直接將內存映射為Struct?
4. 通過返回Memory<byte> 獲取一份數據拷貝,也許需要從ArrayPool<byte>.Share 池中返回一個大數組;但是這樣對調用者要求較高,需要返回池。並且從Memory<T> 獲取一個T[]屬於高級和不安全的操作。不安全,有風險。( not all Memory<T> is based on T[])
5. 一個妥協方案,返回一個提供Memory<T>(Span<T>)的東西,並且使用一些明確的顯而易見的Api給用戶,這樣用戶就知道應該如何處理返回結果。比如IDisposable/using這種,在Dispose()被調用時將資源歸還給池。
作者認為,設計一個通用的消息傳遞Api時,方案5更為合理,調用方可以保存一段時間的數據並且不會干擾到管道的正常工作,也可以很好的利用ArrayPool。如果調用者沒有使用using也不會有什么大麻煩,只是效率會降低一些,就像使用方案1一樣。
但是方案的選擇需要充分考慮你的實際場景,比如在StackExchange.Redis 客戶端中使用的是方案3;在不允許數據離開請求上下文時使用方案2.。
一旦選定方案,以后基本不可能再更改。
針對效率最高的方案2,作者提出的專業建議是 **使用ref struct** 。
此處選擇的是方案5,與方案4的區別就是對Memory<T> 的處理,作者使用 System.Buffers.IMemoryOwner<T>接口
public interface IMemoryOwner<T> : IDisposable
{
Memory<T> Memory { get; }
}
以下為實現代碼,Dispose時歸還借出的數組,並考慮線程安全,避免多次歸還(very bad)。
private sealed class ArrayPoolOwner<T>:IMemoryOwner<T>{
private readonly int _length;
private T[] _oversized;
internal ArrayPoolOwner(T[] oversized,int length){
_length=length;
_oversized=oversized;
}
public Memory<T> Memory=>new Memory<T>(GetArray(),0,_length);
private T[] GetArray()=>Interlocked.CompareExchange(ref _oversized,null,null)
?? throw new ObjectDisposedException(ToString());
public void Dispose(){
var arr=Interlocked.Exchange(ref _oversized,null);
if(arr!=null) ArrayPool<T>.Shared.Return(arr);
}
}
Dispose后如果再次調用Memory將會失敗,即 使用時 using,不要再次使用。
**對ArrayPool的一些說明**
+ 從ArrayPool借出的數組比你需要的要大,你給定的大小在ArrayPool看來屬於下限(不可小於你給定的大小),見:ArrayPool<T>.Shared.Rent(int minimumLength);
+ 歸還時數組默認不清空,因此你借出的數組內可能會有垃圾數據;如果需要清空,在歸還時使用 ArrayPool<T>.Shared.Return(arr,true) ;
+ 從ArrayPool借出的數組比你需要的要大,你給定的大小在ArrayPool看來屬於下限(不可小於你給定的大小),見:ArrayPool<T>.Shared.Rent(int minimumLength);
+ 歸還時數組默認不清空,因此你借出的數組內可能會有垃圾數據;如果需要清空,在歸還時使用 ArrayPool<T>.Shared.Return(arr,true) ;
作者對ArrayPool的一些建議:
增加 IMemoryOwner<T> RentOwned(int length),T[] Rent(int minimumLength) 及借出時清空數組,歸還時清空數組的選項。
增加 IMemoryOwner<T> RentOwned(int length),T[] Rent(int minimumLength) 及借出時清空數組,歸還時清空數組的選項。
這里的想法是通過IMemoryOwner<T>實現一種所有權的轉移,典型調用方法如下
void DoSomething(IMemoryOwner<byte> data){
using(data){
// ... other things here ...
DoTheThing(data.Memory);
}
// ... more things here ...
}
通過ArrayPool的借、還機制避免頻繁分配。
**作者的警告:**
+ 不要把data.Memory 單獨取出亂用,using完了就不要再操作它了(這種錯誤比較基礎)
+ 有人會用MemoryMarshal搞出數組使用,作者認為可以實現一個 MemoryManager<T>(ArrayPoolOwner<T> : MemoryManager<T>, since MemoryManager<T> : IMemoryOwner<T>)讓.Span如同.Memory一樣失敗。
---- 作者也挺糾結(周道)的 :)。
+ 不要把data.Memory 單獨取出亂用,using完了就不要再操作它了(這種錯誤比較基礎)
+ 有人會用MemoryMarshal搞出數組使用,作者認為可以實現一個 MemoryManager<T>(ArrayPoolOwner<T> : MemoryManager<T>, since MemoryManager<T> : IMemoryOwner<T>)讓.Span如同.Memory一樣失敗。
---- 作者也挺糾結(周道)的 :)。
使用 ReadOnlySequence<T> 填充ArrayPoolOwner(構造,實例化)
public static IMemoryOwner<T> Lease<T>(this ReadOnlySequence<T> source)
{
if (source.IsEmpty) return Empty<T>();
int len = checked((int)source.Length);
var arr = ArrayPool<T>.Shared.Rent(len);//借出
source.CopyTo(arr);
return new ArrayPoolOwner<T>(arr, len);//dispose時歸還
}
### 基本API
服務端和客戶端雖然不同但代碼有許多重疊的地方,比如都需要某種線程安全機制的寫入,需要某種讀循環來處理接收的數據,因此可以共用一個基類。基類中使用IDuplexPipe(包括input,output兩個管道)作為管道。
public abstract class SimplPipeline : IDisposable
{
private IDuplexPipe _pipe;
protected SimplPipeline(IDuplexPipe pipe)
=> _pipe = pipe;
public void Dispose() => Close();
public void Close() {/* burn the pipe*/}
}
首先,需要一種線程安全的寫入機制並且不會過度阻塞調用方。在原SimplSockets(包括StackExchange.Redis v1)中使用消息隊列來處理。調用方Send時同步的將消息入隊,在將來的某刻,消息出隊並寫入到Socket中。此方式存在的問題
+ 有許多移動的部分
+ 與“pipelines”有些重復
+ 有許多移動的部分
+ 與“pipelines”有些重復
管道本身即是隊列,本身具備輸出(寫、發送)緩沖區,沒必要再增加一個隊列,直接把數據寫入管道即可。取消原有隊列只有一些小的影響,在StackExchange.Redis v1 中使用隊列完成優先級排序處理(隊列跳轉),作者表示不擔心這一點。
**寫入Api設計**
+ 不一定時同步的
+ 調用方可以單純的傳入一段內存數據(ReadOnlyMember<byte>),或者是一個(IMemoryOwner<byte>)由Api寫入后進行清理。
+ 先假設讀、寫分開(暫不考慮響應)
返回值使用ValueTask因為寫入管道通常是同步的,只有管道執行Flush時才可能是異步的(大多數情況下也是同步的,除非在管道被備份時)。
+ 不一定時同步的
+ 調用方可以單純的傳入一段內存數據(ReadOnlyMember<byte>),或者是一個(IMemoryOwner<byte>)由Api寫入后進行清理。
+ 先假設讀、寫分開(暫不考慮響應)
protected async ValueTask WriteAsync(IMemoryOwner<byte> payload, int messageId)//調用方不再使用payload,需要我們清理
{
using (payload)
{
await WriteAsync(payload.Memory, messageId);
}
}
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId);//調用方自己清理
messageId標識一條消息,寫入消息頭部, 用於之后處理響應回復信息。
返回值使用ValueTask因為寫入管道通常是同步的,只有管道執行Flush時才可能是異步的(大多數情況下也是同步的,除非在管道被備份時)。
### 寫入與錯誤
首先需要保證單次寫操作,lock在此不合適,因為它不能與異步操作很好的協同。考慮flush有可能是異步的,導致后續(continuation )部分可能會在另外的線程上。這里使用與異步兼容的SemaphoreSlim。下面是一條指南:**一般來說, 應用程序代碼應針對可讀性進行優化;庫代碼應針對性能進行優化。**
以下為機翻原文
> 您可能同意也可能不同意這一點, 但這是我編寫代碼的一般指南。我的意思是,類庫代碼往往有一個單一的重點目的, 往往由一個人的經驗可能是 "深刻的, 但不一定是 廣泛的" 維護;你的大腦專注於那個領域, 用奇怪的長度來優化代碼是可以的。相反,應用程序代碼往往涉及更多不同概念的管道-"寬但不一定深" (深度隱藏在各種庫 中)。應用程序代碼通常具有更復雜和不可預知的交互, 因此重點應放在可維護和 "明顯正確" 上。
基本上, 我在這里的觀點是, 我傾向於把很多注意力集中在通常不會放入應用程序代碼中的優化上, 因為我從經驗和廣泛的基准測試中知道它們真的很重要。所以。。。我要做一些看起來很奇怪的事情, 我希望你和我一起踏上這段旅程。
“明顯正確”的代碼
private readonly SemaphoreSlim _singleWriter= new SemaphoreSlim(1);
protected async ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
await _singleWriter.WaitAsync();
try
{
WriteFrameHeader(writer, payload.Length, messageId);
await writer.WriteAsync(payload);
}
finally
{
_singleWriter.Release();
}
}
這段代碼沒有任何問題,但是即便所有部分都是同步完成的,就會產生多余的狀態機-------大概是 不是所有地方都需要異步處理 的意思。
通過兩個問題進行重構
- 單次寫入是否沒有競爭?(無人爭用)
- Flush是否為同步
通過兩個問題進行重構
- 單次寫入是否沒有競爭?(無人爭用)
- Flush是否為同步
重構,將原WriteAsync 更名為 WriteAsyncSlowPath,增加新的WriteAsync
作者的“一些看起來很奇怪的” 實現
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
// try to get the conch; if not, switch to async
//writer已經被占用,異步
if (!_singleWriter.Wait(0))
return WriteAsyncSlowPath(payload, messageId);
bool release = true;
try
{
WriteFrameHeader(writer, payload.Length, messageId);
var write = writer.WriteAsync(payload);
if (write.IsCompletedSuccessfully) return default;
release = false;
return AwaitFlushAndRelease(write);
}
finally
{
if (release) _singleWriter.Release();
}
}
async ValueTask AwaitFlushAndRelease(ValueTask<FlushResult> flush)
{
try { await flush; }
finally { _singleWriter.Release(); }
}
三個地方
1. _singleWriter.Wait(0) 意味着writer處於空閑狀態,沒有其他人在調用
2. write.IsCompletedSuccessfully 意味着writer同步flush
3. 輔助方法 AwaitFlushAndRelease 處理異步flush情況
-------------------------------------------------------------------------------------
### 協議頭處理
協議頭由兩個int組成,小端,第一個是長度,第二個是messageId,共8字節。void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{
var span = writer.GetSpan(8);
BinaryPrimitives.WriteInt32LittleEndian(
span, length);
BinaryPrimitives.WriteInt32LittleEndian(
span.Slice(4), messageId);
writer.Advance(8);
}
### 管道客戶端實現 發送
public class SimplPipelineClient : SimplPipeline
{
public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message)
{
var tcs = new TaskCompletionSource<IMemoryOwner<byte>>();
int messageId;
lock (_awaitingResponses)
{
messageId = ++_nextMessageId;
if (messageId == 0) messageId = 1;
_awaitingResponses.Add(messageId, tcs);
}
await WriteAsync(message, messageId);
return await tcs.Task;
}
public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message)
{
using (message)
{
return await SendReceiveAsync(message.Memory);
}
}
}
- _awaitingResponses 是個字典,保存已經發送的消息,用於將來處理對某條(messageId)消息的回復。
### 接收循環
protected async Task StartReceiveLoopAsync(CancellationToken cancellationToken = default)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
var readResult = await reader.ReadAsync(cancellationToken);
if (readResult.IsCanceled) break;
var buffer = readResult.Buffer;
var makingProgress = false;
while (TryParseFrame(ref buffer, out var payload, out var messageId))
{
makingProgress = true;
await OnReceiveAsync(payload, messageId);
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (!makingProgress && readResult.IsCompleted) break;
}
try { reader.Complete(); } catch { }
}
catch (Exception ex)
{
try { reader.Complete(ex); } catch { }
}
}
protected abstract ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId);
接收緩沖區里什么時間會有什么東西由發送方和系統環境決定,因此延遲是必然的,所以這里全部按異步處理就好。
- TryParseFrame 讀出緩沖區數據,根據幀格式解析出實際數據、id等
- OnRecieveAsync 處理數據,比如對於回復/響應的處理等
- reader中的數據讀出后盡快Advance一下,通知管道你的讀取進度;
- TryParseFrame 讀出緩沖區數據,根據幀格式解析出實際數據、id等
- OnRecieveAsync 處理數據,比如對於回復/響應的處理等
- reader中的數據讀出后盡快Advance一下,通知管道你的讀取進度;
解析幀
private bool TryParseFrame(
ref ReadOnlySequence<byte> input,
out ReadOnlySequence<byte> payload, out int messageId)
{
if (input.Length < 8)
{ // not enough data for the header
payload = default;
messageId = default;
return false;
}
int length;
if (input.First.Length >= 8)
{ // already 8 bytes in the first segment
length = ParseFrameHeader(
input.First.Span, out messageId);
}
else
{ // copy 8 bytes into a local span
Span<byte> local = stackalloc byte[8];
input.Slice(0, 8).CopyTo(local);
length = ParseFrameHeader(
local, out messageId);
}
// do we have the "length" bytes?
if (input.Length < length + 8)
{
payload = default;
return false;
}
// success!
payload = input.Slice(8, length);
input = input.Slice(payload.End);
return true;
}
緩沖區是不連續的,一段一段的,像鏈表一樣,第一段就是input.First。
代碼很簡單,主要演示一些用法;
輔助方法
OnReceiveAsync
代碼很簡單,主要演示一些用法;
輔助方法
static int ParseFrameHeader(
ReadOnlySpan<byte> input, out int messageId)
{
var length = BinaryPrimitives
.ReadInt32LittleEndian(input);
messageId = BinaryPrimitives
.ReadInt32LittleEndian(input.Slice(4));
return length;
}
OnReceiveAsync
protected override ValueTask OnReceiveAsync(
ReadOnlySequence<byte> payload, int messageId)
{
if (messageId != 0)
{ // request/response
TaskCompletionSource<IMemoryOwner<byte>> tcs;
lock (_awaitingResponses)
{
if (_awaitingResponses.TryGetValue(messageId, out tcs))
{
_awaitingResponses.Remove(messageId);
}
}
tcs?.TrySetResult(payload.Lease());
}
else
{ // unsolicited
MessageReceived?.Invoke(payload.Lease());
}
return default;
}
到此為止,其余部分主要是一些服務端和其他功能實現及benchmark。。。

