轉自https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines
System.IO.Pipelines 是一個新庫,旨在使在 .NET 中執行高性能 I/O 更加容易。 該庫的目標為適用於所有 .NET 實現的 .NET Standard。
System.IO.Pipelines 解決什么問題
分析流數據的應用由樣板代碼組成,后者由許多專門且不尋常的代碼流組成。 樣板代碼和特殊情況代碼很復雜且難以進行維護。
System.IO.Pipelines
已構建為:
- 具有高性能的流數據分析功能。
- 減少代碼復雜性。
下面的代碼是典型的 TCP 服務器,它從客戶機接收行分隔的消息(由 '\n'
分隔):
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; await stream.ReadAsync(buffer, 0, buffer.Length); // Process a single line from the buffer ProcessLine(buffer); }
前面的代碼有幾個問題:
- 單次調用
ReadAsync
可能無法接收整條消息(行尾)。 - 忽略了
stream.ReadAsync
的結果。stream.ReadAsync
返回讀取的數據量。 - 它不能處理在單個
ReadAsync
調用中讀取多行的情況。 - 它為每次讀取分配一個
byte
數組。
要解決上述問題,需要進行以下更改:
-
緩沖傳入的數據,直到找到新行。
-
分析緩沖區中返回的所有行。
-
該行可能大於 1KB(1024 字節)。 此代碼需要調整輸入緩沖區的大小,直到找到分隔符后,才能在緩沖區內容納完整行。
- 如果調整緩沖區的大小,當輸入中出現較長的行時,將生成更多緩沖區副本。
- 壓縮用於讀取行的緩沖區,以減少空余。
-
請考慮使用緩沖池來避免重復分配內存。
-
下面的代碼解決了其中一些問題:
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // Calculate the amount of bytes remaining in the buffer. var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // Double the buffer size and copy the previously buffered data into the new buffer. var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // Return the old buffer to the pool. ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF break; } // Keep track of the amount of buffered bytes. bytesBuffered += bytesRead; var linePosition = -1; do { // Look for a EOL in the buffered data. linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // Calculate the length of the line based on the offset. var lineLength = linePosition - bytesConsumed; // Process the line. ProcessLine(buffer, bytesConsumed, lineLength); // Move the bytesConsumed to skip past the line consumed (including \n). bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
前面的代碼很復雜,不能解決所識別的所有問題。 高性能網絡通常意味着編寫非常復雜的代碼以使性能最大化。 System.IO.Pipelines
的設計目的是使編寫此類代碼更容易。
若要查看翻譯為非英語語言的代碼注釋,請在 此 GitHub 討論問題中告訴我們。
管道
Pipe 類可用於創建 PipeWriter/PipeReader
對。 寫入 PipeWriter
的所有數據都可用於 PipeReader
:
1 var pipe = new Pipe(); 2 PipeReader reader = pipe.Reader; 3 PipeWriter writer = pipe.Writer;
管道基本用法
1 async Task ProcessLinesAsync(Socket socket) 2 { 3 var pipe = new Pipe(); 4 Task writing = FillPipeAsync(socket, pipe.Writer); 5 Task reading = ReadPipeAsync(pipe.Reader); 6 7 await Task.WhenAll(reading, writing); 8 } 9 10 async Task FillPipeAsync(Socket socket, PipeWriter writer) 11 { 12 const int minimumBufferSize = 512; 13 14 while (true) 15 { 16 // Allocate at least 512 bytes from the PipeWriter. 17 Memory<byte> memory = writer.GetMemory(minimumBufferSize); 18 try 19 { 20 int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None); 21 if (bytesRead == 0) 22 { 23 break; 24 } 25 // Tell the PipeWriter how much was read from the Socket. 26 writer.Advance(bytesRead); 27 } 28 catch (Exception ex) 29 { 30 LogError(ex); 31 break; 32 } 33 34 // Make the data available to the PipeReader. 35 FlushResult result = await writer.FlushAsync(); 36 37 if (result.IsCompleted) 38 { 39 break; 40 } 41 } 42 43 // By completing PipeWriter, tell the PipeReader that there's no more data coming. 44 await writer.CompleteAsync(); 45 } 46 47 async Task ReadPipeAsync(PipeReader reader) 48 { 49 while (true) 50 { 51 ReadResult result = await reader.ReadAsync(); 52 ReadOnlySequence<byte> buffer = result.Buffer; 53 54 while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)) 55 { 56 // Process the line. 57 ProcessLine(line); 58 } 59 60 // Tell the PipeReader how much of the buffer has been consumed. 61 reader.AdvanceTo(buffer.Start, buffer.End); 62 63 // Stop reading if there's no more data coming. 64 if (result.IsCompleted) 65 { 66 break; 67 } 68 } 69 70 // Mark the PipeReader as complete. 71 await reader.CompleteAsync(); 72 } 73 74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line) 75 { 76 // Look for a EOL in the buffer. 77 SequencePosition? position = buffer.PositionOf((byte)'\n'); 78 79 if (position == null) 80 { 81 line = default; 82 return false; 83 } 84 85 // Skip the line + the \n. 86 line = buffer.Slice(0, position.Value); 87 buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); 88 return true; 89 }
有兩個循環:
FillPipeAsync
從Socket
讀取並寫入PipeWriter
。ReadPipeAsync
從PipeReader
讀取並分析傳入的行。
沒有分配顯式緩沖區。 所有緩沖區管理都委托給 PipeReader
和 PipeWriter
實現。 委派緩沖區管理使使用代碼更容易集中關注業務邏輯。
在第一個循環中:
- 調用 PipeWriter.GetMemory(Int32) 從基礎編寫器獲取內存。
- 調用 PipeWriter.Advance(Int32) 以告知
PipeWriter
有多少數據已寫入緩沖區。 - 調用 PipeWriter.FlushAsync 以使數據可用於
PipeReader
。
在第二個循環中,PipeReader
使用由 PipeWriter
寫入的緩沖區。 緩沖區來自套接字。 對 PipeReader.ReadAsync
的調用:
-
返回包含兩條重要信息的 ReadResult:
- 以
ReadOnlySequence<byte>
形式讀取的數據。 - 布爾值
IsCompleted
,指示是否已到達數據結尾 (EOF)。
- 以
找到行尾 (EOL) 分隔符並分析該行后:
- 該邏輯處理緩沖區以跳過已處理的內容。
- 調用
PipeReader.AdvanceTo
以告知PipeReader
已消耗和檢查了多少數據。
讀取器和編寫器循環通過調用 Complete
結束。 Complete
使基礎管道釋放其分配的內存。
反壓和流量控制
理想情況下,讀取和分析可協同工作:
- 寫入線程使用來自網絡的數據並將其放入緩沖區。
- 分析線程負責構造適當的數據結構。
通常,分析所花費的時間比僅從網絡復制數據塊所用時間更長:
- 讀取線程領先於分析線程。
- 讀取線程必須減緩或分配更多內存來存儲用於分析線程的數據。
為了獲得最佳性能,需要在頻繁暫停和分配更多內存之間取得平衡。
為解決上述問題,Pipe
提供了兩個設置來控制數據流:
- PauseWriterThreshold:確定在調用 FlushAsync 暫停之前應緩沖多少數據。
- ResumeWriterThreshold:確定在恢復對
PipeWriter.FlushAsync
的調用之前,讀取器必須觀察多少數據。
- 當
Pipe
中的數據量超過PauseWriterThreshold
時,返回不完整的ValueTask<FlushResult>
。 - 低於
ResumeWriterThreshold
時,返回完整的ValueTask<FlushResult>
。
使用兩個值可防止快速循環,如果只使用一個值,則可能發生這種循環。
示例
1 // The Pipe will start returning incomplete tasks from FlushAsync until 2 // the reader examines at least 5 bytes. 3 var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5); 4 var pipe = new Pipe(options);
PipeScheduler
通常在使用 async
和 await
時,異步代碼會在 TaskScheduler 或當前 SynchronizationContext 上恢復。
在執行 I/O 時,對執行 I/O 的位置進行細粒度控制非常重要。 此控件允許高效利用 CPU 緩存。 高效的緩存對於 Web 服務器等高性能應用至關重要。 PipeScheduler 提供對異步回調運行位置的控制。 默認情況下:
- 使用當前的 SynchronizationContext。
- 如果沒有
SynchronizationContext
,它將使用線程池運行回調。
1 public static void Main(string[] args) 2 { 3 var writeScheduler = new SingleThreadPipeScheduler(); 4 var readScheduler = new SingleThreadPipeScheduler(); 5 6 // Tell the Pipe what schedulers to use and disable the SynchronizationContext. 7 var options = new PipeOptions(readerScheduler: readScheduler, 8 writerScheduler: writeScheduler, 9 useSynchronizationContext: false); 10 var pipe = new Pipe(options); 11 } 12 13 // This is a sample scheduler that async callbacks on a single dedicated thread. 14 public class SingleThreadPipeScheduler : PipeScheduler 15 { 16 private readonly BlockingCollection<(Action<object> Action, object State)> _queue = 17 new BlockingCollection<(Action<object> Action, object State)>(); 18 private readonly Thread _thread; 19 20 public SingleThreadPipeScheduler() 21 { 22 _thread = new Thread(DoWork); 23 _thread.Start(); 24 } 25 26 private void DoWork() 27 { 28 foreach (var item in _queue.GetConsumingEnumerable()) 29 { 30 item.Action(item.State); 31 } 32 } 33 34 public override void Schedule(Action<object> action, object state) 35 { 36 _queue.Add((action, state)); 37 } 38 }
PipeScheduler.ThreadPool 是 PipeScheduler 實現,用於對線程池的回調進行排隊。 PipeScheduler.ThreadPool
是默認選項,通常也是最佳選項。 PipeScheduler.Inline 可能會導致意外后果,如死鎖。
管道重置
通常重用 Pipe
對象即可重置。 若要重置管道,請在 PipeReader
和 PipeWriter
完成時調用 PipeReader Reset。
PipeReader
PipeReader 代表調用方管理內存。 在調用 PipeReader.ReadAsync 之后始終調用 PipeReader.AdvanceTo 。 這使 PipeReader
知道調用方何時用完內存,以便可以對其進行跟蹤。 從 PipeReader.ReadAsync
返回的 ReadOnlySequence<byte>
僅在調用 PipeReader.AdvanceTo
之前有效。 調用 PipeReader.AdvanceTo
后,不能使用 ReadOnlySequence<byte>
。
PipeReader.AdvanceTo
采用兩個 SequencePosition 參數:
- 第一個參數確定消耗的內存量。
- 第二個參數確定觀察到的緩沖區數。
將數據標記為“已使用”意味着管道可以將內存返回到底層緩沖池。 將數據標記為“已觀察”可控制對 PipeReader.ReadAsync
的下一個調用的操作。 將所有內容都標記為“已觀察”意味着下次對 PipeReader.ReadAsync
的調用將不會返回,直到有更多數據寫入管道。 任何其他值都將使對 PipeReader.ReadAsync
的下一次調用立即返回並包含已觀察到的和未觀察到的數據,但不是已被使用的數據 。
讀取流數據方案
嘗試讀取流數據時會出現以下幾種典型模式:
- 給定數據流時,分析單條消息。
- 給定數據流時,分析所有可用消息。
以下示例使用 TryParseMessage
方法分析來自 ReadOnlySequence<byte>
的消息。 TryParseMessage
分析單條消息並更新輸入緩沖區,以從緩沖區中剪裁已分析的消息。 TryParseMessage
不是 .NET 的一部分,它是在以下部分中使用的用戶編寫的方法。
1 bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);
讀取單條消息
下面的代碼從 PipeReader
讀取一條消息並將其返回給調用方。
1 async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader, 2 CancellationToken cancellationToken = default) 3 { 4 while (true) 5 { 6 ReadResult result = await reader.ReadAsync(cancellationToken); 7 ReadOnlySequence<byte> buffer = result.Buffer; 8 9 // In the event that no message is parsed successfully, mark consumed 10 // as nothing and examined as the entire buffer. 11 SequencePosition consumed = buffer.Start; 12 SequencePosition examined = buffer.End; 13 14 try 15 { 16 if (TryParseMessage(ref buffer, out Message message)) 17 { 18 // A single message was successfully parsed so mark the start as the 19 // parsed buffer as consumed. TryParseMessage trims the buffer to 20 // point to the data after the message was parsed. 21 consumed = buffer.Start; 22 23 // Examined is marked the same as consumed here, so the next call 24 // to ReadSingleMessageAsync will process the next message if there's 25 // one. 26 examined = consumed; 27 28 return message; 29 } 30 31 // There's no more data to be processed. 32 if (result.IsCompleted) 33 { 34 if (buffer.Length > 0) 35 { 36 // The message is incomplete and there's no more data to process. 37 throw new InvalidDataException("Incomplete message."); 38 } 39 40 break; 41 } 42 } 43 finally 44 { 45 reader.AdvanceTo(consumed, examined); 46 } 47 } 48 49 return null; 50 }
前面的代碼:
- 分析單條消息。
- 更新已使用的
SequencePosition
並檢查SequencePosition
以指向已剪裁的輸入緩沖區的開始。
因為 TryParseMessage
從輸入緩沖區中刪除了已分析的消息,所以更新了兩個 SequencePosition
參數。 通常,分析來自緩沖區的單條消息時,檢查的位置應為以下位置之一:
- 消息的結尾。
- 如果未找到消息,則返回接收緩沖區的結尾。
單條消息案例最有可能出現錯誤。 將錯誤的值傳遞給“已檢查”可能會導致內存不足異常或無限循環 。 有關詳細信息,請參閱本文中的 PipeReader 常見問題部分。
讀取多條消息
以下代碼從 PipeReader
讀取所有消息,並在每條消息上調用 ProcessMessageAsync
。
1 async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default) 2 { 3 try 4 { 5 while (true) 6 { 7 ReadResult result = await reader.ReadAsync(cancellationToken); 8 ReadOnlySequence<byte> buffer = result.Buffer; 9 10 try 11 { 12 // Process all messages from the buffer, modifying the input buffer on each 13 // iteration. 14 while (TryParseMessage(ref buffer, out Message message)) 15 { 16 await ProcessMessageAsync(message); 17 } 18 19 // There's no more data to be processed. 20 if (result.IsCompleted) 21 { 22 if (buffer.Length > 0) 23 { 24 // The message is incomplete and there's no more data to process. 25 throw new InvalidDataException("Incomplete message."); 26 } 27 break; 28 } 29 } 30 finally 31 { 32 // Since all messages in the buffer are being processed, you can use the 33 // remaining buffer's Start and End position to determine consumed and examined. 34 reader.AdvanceTo(buffer.Start, buffer.End); 35 } 36 } 37 } 38 finally 39 { 40 await reader.CompleteAsync(); 41 } 42 }
取消
PipeReader.ReadAsync
:
- 支持傳遞 CancellationToken。
- 如果在讀取掛起期間取消了
CancellationToken
,則會引發 OperationCanceledException。 - 支持通過 PipeReader.CancelPendingRead 取消當前讀取操作的方法,這樣可以避免引發異常。 調用
PipeReader.CancelPendingRead
將導致對PipeReader.ReadAsync
的當前或下次調用返回 ReadResult,並將IsCanceled
設置為true
。 這對於以非破壞性和非異常的方式停止現有的讀取循環非常有用。
1 private PipeReader reader; 2 3 public MyConnection(PipeReader reader) 4 { 5 this.reader = reader; 6 } 7 8 public void Abort() 9 { 10 // Cancel the pending read so the process loop ends without an exception. 11 reader.CancelPendingRead(); 12 } 13 14 public async Task ProcessMessagesAsync() 15 { 16 try 17 { 18 while (true) 19 { 20 ReadResult result = await reader.ReadAsync(); 21 ReadOnlySequence<byte> buffer = result.Buffer; 22 23 try 24 { 25 if (result.IsCanceled) 26 { 27 // The read was canceled. You can quit without reading the existing data. 28 break; 29 } 30 31 // Process all messages from the buffer, modifying the input buffer on each 32 // iteration. 33 while (TryParseMessage(ref buffer, out Message message)) 34 { 35 await ProcessMessageAsync(message); 36 } 37 38 // There's no more data to be processed. 39 if (result.IsCompleted) 40 { 41 break; 42 } 43 } 44 finally 45 { 46 // Since all messages in the buffer are being processed, you can use the 47 // remaining buffer's Start and End position to determine consumed and examined. 48 reader.AdvanceTo(buffer.Start, buffer.End); 49 } 50 } 51 } 52 finally 53 { 54 await reader.CompleteAsync(); 55 } 56 }
PipeReader 常見問題
-
將錯誤的值傳遞給
consumed
或examined
可能會導致讀取已讀取的數據。 -
傳遞
buffer.End
作為檢查對象可能會導致以下問題:- 數據停止
- 如果數據未使用,可能最終會出現內存不足 (OOM) 異常。 例如,當一次處理來自緩沖區的單條消息時,可能會出現
PipeReader.AdvanceTo(position, buffer.End)
。
-
將錯誤的值傳遞給
consumed
或examined
可能會導致無限循環。 例如,如果buffer.Start
沒有更改,則PipeReader.AdvanceTo(buffer.Start)
將導致在下一個對PipeReader.ReadAsync
的調用在新數據到來之前立即返回。 -
將錯誤的值傳遞給
consumed
或examined
可能會導致無限緩沖(最終導致 OOM)。 -
在調用
PipeReader.AdvanceTo
之后使用ReadOnlySequence<byte>
可能會導致內存損壞(在釋放之后使用)。 -
未能調用
PipeReader.Complete/CompleteAsync
可能會導致內存泄漏。 -
在處理緩沖區之前檢查 ReadResult.IsCompleted 並退出讀取邏輯會導致數據丟失。 循環退出條件應基於
ReadResult.Buffer.IsEmpty
和ReadResult.IsCompleted
。 如果錯誤執行此操作,可能會導致無限循環。
有問題的代碼
❌ 數據丟失
當 IsCompleted
被設置為 true
時,ReadResult
可能會返回最后一段數據。 在退出讀循環之前不讀取該數據將導致數據丟失。
警告
不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> dataLossBuffer = result.Buffer; 6 7 if (result.IsCompleted) 8 { 9 break; 10 } 11 12 Process(ref dataLossBuffer, out Message message); 13 14 reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End); 15 }
警告
不要使用上述代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 前面的示例用於解釋 PipeReader 常見問題。
❌ 無限循環
如果 Result.IsCompleted
是 true
,則以下邏輯可能會導致無限循環,但緩沖區中永遠不會有完整的消息。
警告
不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer; 6 if (result.IsCompleted && infiniteLoopBuffer.IsEmpty) 7 { 8 break; 9 } 10 11 Process(ref infiniteLoopBuffer, out Message message); 12 13 reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End); 14 }
下面是另一段具有相同問題的代碼。 該代碼在檢查 ReadResult.IsCompleted
之前檢查非空緩沖區。 由於該代碼位於 else if
中,如果緩沖區中沒有完整的消息,它將永遠循環。
警告
不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer; 6 7 if (!infiniteLoopBuffer.IsEmpty) 8 { 9 Process(ref infiniteLoopBuffer, out Message message); 10 } 11 else if (result.IsCompleted) 12 { 13 break; 14 } 15 16 reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End); 17 }
❌ 意外掛起
在分析單條消息時,如果無條件調用 PipeReader.AdvanceTo
而 buffer.End
位於 examined
位置,則可能導致掛起。 對 PipeReader.AdvanceTo
的下次調用將在以下情況下返回:
- 有更多數據寫入管道。
- 以及之前未檢查過新數據。
警告
不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> hangBuffer = result.Buffer; 6 7 Process(ref hangBuffer, out Message message); 8 9 if (result.IsCompleted) 10 { 11 break; 12 } 13 14 reader.AdvanceTo(hangBuffer.Start, hangBuffer.End); 15 16 if (message != null) 17 { 18 return message; 19 } 20 }
❌ 內存不足 (OOM)
在滿足以下條件的情況下,以下代碼將保持緩沖,直到發生 OutOfMemoryException:
- 沒有最大消息大小。
- 從
PipeReader
返回的數據不會生成完整的消息。 例如,它不會生成完整的消息,因為另一端正在編寫一條大消息(例如,一條為 4GB 的消息)。
警告
不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題。
1 Environment.FailFast("This code is terrible, don't use it!"); 2 while (true) 3 { 4 ReadResult result = await reader.ReadAsync(cancellationToken); 5 ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer; 6 7 Process(ref thisCouldOutOfMemory, out Message message); 8 9 if (result.IsCompleted) 10 { 11 break; 12 } 13 14 reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End); 15 16 if (message != null) 17 { 18 return message; 19 } 20 }
❌ 內存損壞
當寫入讀取緩沖區的幫助程序時,應在調用 Advance
之前復制任何返回的有效負載。 下面的示例將返回 Pipe
已丟棄的內存,並可能將其重新用於下一個操作(讀/寫)。
警告
不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題。
1 public class Message 2 { 3 public ReadOnlySequence<byte> CorruptedPayload { get; set; } 4 }
1 Environment.FailFast("This code is terrible, don't use it!"); 2 Message message = null; 3 4 while (true) 5 { 6 ReadResult result = await reader.ReadAsync(cancellationToken); 7 ReadOnlySequence<byte> buffer = result.Buffer; 8 9 ReadHeader(ref buffer, out int length); 10 11 if (length <= buffer.Length) 12 { 13 message = new Message 14 { 15 // Slice the payload from the existing buffer 16 CorruptedPayload = buffer.Slice(0, length) 17 }; 18 19 buffer = buffer.Slice(length); 20 } 21 22 if (result.IsCompleted) 23 { 24 break; 25 } 26 27 reader.AdvanceTo(buffer.Start, buffer.End); 28 29 if (message != null) 30 { 31 // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer 32 // was captured. 33 break; 34 } 35 } 36 37 return message; 38 }
PipeWriter
PipeWriter 管理用於代表調用方寫入的緩沖區。 PipeWriter
實現IBufferWriter<byte>
。 IBufferWriter<byte>
使得無需額外的緩沖區副本就可以訪問緩沖區來執行寫入操作。
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default) 2 { 3 // Request at least 5 bytes from the PipeWriter. 4 Memory<byte> memory = writer.GetMemory(5); 5 6 // Write directly into the buffer. 7 int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span); 8 9 // Tell the writer how many bytes were written. 10 writer.Advance(written); 11 12 await writer.FlushAsync(cancellationToken); 13 }
之前的代碼:
- 使用 GetMemory 從
PipeWriter
請求至少 5 個字節的緩沖區。 - 將 ASCII 字符串
"Hello"
的字節寫入返回的Memory<byte>
。 - 調用 Advance 以指示寫入緩沖區的字節數。
- 刷新
PipeWriter
,以便將字節發送到基礎設備。
以前的寫入方法使用 PipeWriter
提供的緩沖區。 或者,PipeWriter.WriteAsync:
- 將現有緩沖區復制到
PipeWriter
。 - 根據需要調用
GetSpan``Advance
,然后調用 FlushAsync。
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default) 2 { 3 byte[] helloBytes = Encoding.ASCII.GetBytes("Hello"); 4 5 // Write helloBytes to the writer, there's no need to call Advance here 6 // (Write does that). 7 await writer.WriteAsync(helloBytes, cancellationToken); 8 }
取消
FlushAsync 支持傳遞 CancellationToken。 如果令牌在刷新掛起時被取消,則傳遞 CancellationToken
將導致 OperationCanceledException
。 PipeWriter.FlushAsync
支持通過 PipeWriter.CancelPendingFlush 取消當前刷新操作而不引發異常的方法。 調用 PipeWriter.CancelPendingFlush
將導致對 PipeWriter.FlushAsync
或 PipeWriter.WriteAsync
的當前或下次調用返回 FlushResult,並將 IsCanceled
設置為 true
。 這對於以非破壞性和非異常的方式停止暫停刷新非常有用。
PipeWriter 常見問題
- GetSpan 和 GetMemory 返回至少具有請求內存量的緩沖區。 請勿假設確切的緩沖區大小 。
- 無法保證連續的調用將返回相同的緩沖區或相同大小的緩沖區。
- 在調用 Advance 之后,必須請求一個新的緩沖區來繼續寫入更多數據。 不能寫入先前獲得的緩沖區。
- 如果未完成對
FlushAsync
的調用,則調用GetMemory
或GetSpan
將不安全。 - 如果未刷新數據,則調用
Complete
或CompleteAsync
可能導致內存損壞。
IDuplexPipe
IDuplexPipe 是支持讀寫的類型的協定。 例如,網絡連接將由 IDuplexPipe
表示。
與包含 PipeReader
和 PipeWriter
的 Pipe
不同,IDuplexPipe
代表全雙工連接的單側。 這意味着寫入 PipeWriter
的內容不會從 PipeReader
中讀取。
流
在讀取或寫入流數據時,通常使用反序列化程序讀取數據,並使用序列化程序寫入數據。 大多數讀取和寫入流 API 都有一個 Stream
參數。 為了更輕松地與這些現有 API 集成,PipeReader
和 PipeWriter
公開了一個 AsStream。 AsStream 返回圍繞 PipeReader
或 PipeWriter
的 Stream
實現。
轉載請標明本文來源:https://www.cnblogs.com/yswenli/p/11810317.html
更多內容歡迎Star、Fork我的的github:https://github.com/yswenli/
如果發現本文有什么問題和任何建議,也隨時歡迎交流~