System.IO.Pipelines——高性能IO(一)


轉自https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines

System.IO.Pipelines 是一個新庫,旨在使在 .NET 中執行高性能 I/O 更加容易。 該庫的目標為適用於所有 .NET 實現的 .NET Standard。

System.IO.Pipelines 解決什么問題

分析流數據的應用由樣板代碼組成,后者由許多專門且不尋常的代碼流組成。 樣板代碼和特殊情況代碼很復雜且難以進行維護。

System.IO.Pipelines 已構建為:

  • 具有高性能的流數據分析功能。
  • 減少代碼復雜性。

下面的代碼是典型的 TCP 服務器,它從客戶機接收行分隔的消息(由 '\n' 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代碼有幾個問題:

  • 單次調用 ReadAsync 可能無法接收整條消息(行尾)。
  • 忽略了 stream.ReadAsync 的結果。 stream.ReadAsync 返回讀取的數據量。
  • 它不能處理在單個 ReadAsync 調用中讀取多行的情況。
  • 它為每次讀取分配一個 byte 數組。

要解決上述問題,需要進行以下更改:

  • 緩沖傳入的數據,直到找到新行。

  • 分析緩沖區中返回的所有行。

  • 該行可能大於 1KB(1024 字節)。 此代碼需要調整輸入緩沖區的大小,直到找到分隔符后,才能在緩沖區內容納完整行。

    • 如果調整緩沖區的大小,當輸入中出現較長的行時,將生成更多緩沖區副本。
    • 壓縮用於讀取行的緩沖區,以減少空余。
  • 請考慮使用緩沖池來避免重復分配內存。

  • 下面的代碼解決了其中一些問題:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

前面的代碼很復雜,不能解決所識別的所有問題。 高性能網絡通常意味着編寫非常復雜的代碼以使性能最大化。 System.IO.Pipelines 的設計目的是使編寫此類代碼更容易。

若要查看翻譯為非英語語言的代碼注釋,請在 此 GitHub 討論問題中告訴我們。

管道

Pipe 類可用於創建 PipeWriter/PipeReader 對。 寫入 PipeWriter 的所有數據都可用於 PipeReader

1 var pipe = new Pipe();
2 PipeReader reader = pipe.Reader;
3 PipeWriter writer = pipe.Writer;

 

管道基本用法

 1 async Task ProcessLinesAsync(Socket socket)
 2 {
 3     var pipe = new Pipe();
 4     Task writing = FillPipeAsync(socket, pipe.Writer);
 5     Task reading = ReadPipeAsync(pipe.Reader);
 6 
 7     await Task.WhenAll(reading, writing);
 8 }
 9 
10 async Task FillPipeAsync(Socket socket, PipeWriter writer)
11 {
12     const int minimumBufferSize = 512;
13 
14     while (true)
15     {
16         // Allocate at least 512 bytes from the PipeWriter.
17         Memory<byte> memory = writer.GetMemory(minimumBufferSize);
18         try
19         {
20             int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
21             if (bytesRead == 0)
22             {
23                 break;
24             }
25             // Tell the PipeWriter how much was read from the Socket.
26             writer.Advance(bytesRead);
27         }
28         catch (Exception ex)
29         {
30             LogError(ex);
31             break;
32         }
33 
34         // Make the data available to the PipeReader.
35         FlushResult result = await writer.FlushAsync();
36 
37         if (result.IsCompleted)
38         {
39             break;
40         }
41     }
42 
43      // By completing PipeWriter, tell the PipeReader that there's no more data coming.
44     await writer.CompleteAsync();
45 }
46 
47 async Task ReadPipeAsync(PipeReader reader)
48 {
49     while (true)
50     {
51         ReadResult result = await reader.ReadAsync();
52         ReadOnlySequence<byte> buffer = result.Buffer;
53 
54         while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
55         {
56             // Process the line.
57             ProcessLine(line);
58         }
59 
60         // Tell the PipeReader how much of the buffer has been consumed.
61         reader.AdvanceTo(buffer.Start, buffer.End);
62 
63         // Stop reading if there's no more data coming.
64         if (result.IsCompleted)
65         {
66             break;
67         }
68     }
69 
70     // Mark the PipeReader as complete.
71     await reader.CompleteAsync();
72 }
73 
74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
75 {
76     // Look for a EOL in the buffer.
77     SequencePosition? position = buffer.PositionOf((byte)'\n');
78 
79     if (position == null)
80     {
81         line = default;
82         return false;
83     }
84 
85     // Skip the line + the \n.
86     line = buffer.Slice(0, position.Value);
87     buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
88     return true;
89 }

有兩個循環:

  • FillPipeAsync 從 Socket 讀取並寫入 PipeWriter
  • ReadPipeAsync 從 PipeReader 讀取並分析傳入的行。

沒有分配顯式緩沖區。 所有緩沖區管理都委托給 PipeReader 和 PipeWriter 實現。 委派緩沖區管理使使用代碼更容易集中關注業務邏輯。

在第一個循環中:

在第二個循環中,PipeReader 使用由 PipeWriter 寫入的緩沖區。 緩沖區來自套接字。 對 PipeReader.ReadAsync 的調用:

  • 返回包含兩條重要信息的 ReadResult

    • 以 ReadOnlySequence<byte> 形式讀取的數據。
    • 布爾值 IsCompleted,指示是否已到達數據結尾 (EOF)。

找到行尾 (EOL) 分隔符並分析該行后:

  • 該邏輯處理緩沖區以跳過已處理的內容。
  • 調用 PipeReader.AdvanceTo 以告知 PipeReader 已消耗和檢查了多少數據。

讀取器和編寫器循環通過調用 Complete 結束。 Complete 使基礎管道釋放其分配的內存。

反壓和流量控制

理想情況下,讀取和分析可協同工作:

  • 寫入線程使用來自網絡的數據並將其放入緩沖區。
  • 分析線程負責構造適當的數據結構。

通常,分析所花費的時間比僅從網絡復制數據塊所用時間更長:

  • 讀取線程領先於分析線程。
  • 讀取線程必須減緩或分配更多內存來存儲用於分析線程的數據。

為了獲得最佳性能,需要在頻繁暫停和分配更多內存之間取得平衡。

為解決上述問題,Pipe 提供了兩個設置來控制數據流:

具有 ResumeWriterThreshold 和 PauseWriterThreshold 的圖

PipeWriter.FlushAsync

  • 當 Pipe 中的數據量超過 PauseWriterThreshold 時,返回不完整的 ValueTask<FlushResult>
  • 低於 ResumeWriterThreshold 時,返回完整的 ValueTask<FlushResult>

使用兩個值可防止快速循環,如果只使用一個值,則可能發生這種循環。

示例

1 // The Pipe will start returning incomplete tasks from FlushAsync until
2 // the reader examines at least 5 bytes.
3 var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
4 var pipe = new Pipe(options);

PipeScheduler

通常在使用 async 和 await 時,異步代碼會在 TaskScheduler 或當前 SynchronizationContext 上恢復。

在執行 I/O 時,對執行 I/O 的位置進行細粒度控制非常重要。 此控件允許高效利用 CPU 緩存。 高效的緩存對於 Web 服務器等高性能應用至關重要。 PipeScheduler 提供對異步回調運行位置的控制。 默認情況下:

  • 使用當前的 SynchronizationContext
  • 如果沒有 SynchronizationContext,它將使用線程池運行回調。
 1 public static void Main(string[] args)
 2 {
 3     var writeScheduler = new SingleThreadPipeScheduler();
 4     var readScheduler = new SingleThreadPipeScheduler();
 5 
 6     // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
 7     var options = new PipeOptions(readerScheduler: readScheduler,
 8                                   writerScheduler: writeScheduler,
 9                                   useSynchronizationContext: false);
10     var pipe = new Pipe(options);
11 }
12 
13 // This is a sample scheduler that async callbacks on a single dedicated thread.
14 public class SingleThreadPipeScheduler : PipeScheduler
15 {
16     private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
17      new BlockingCollection<(Action<object> Action, object State)>();
18     private readonly Thread _thread;
19 
20     public SingleThreadPipeScheduler()
21     {
22         _thread = new Thread(DoWork);
23         _thread.Start();
24     }
25 
26     private void DoWork()
27     {
28         foreach (var item in _queue.GetConsumingEnumerable())
29         {
30             item.Action(item.State);
31         }
32     }
33 
34     public override void Schedule(Action<object> action, object state)
35     {
36         _queue.Add((action, state));
37     }
38 }

PipeScheduler.ThreadPool 是 PipeScheduler 實現,用於對線程池的回調進行排隊。 PipeScheduler.ThreadPool 是默認選項,通常也是最佳選項。 PipeScheduler.Inline 可能會導致意外后果,如死鎖。

管道重置

通常重用 Pipe 對象即可重置。 若要重置管道,請在 PipeReader 和 PipeWriter 完成時調用 PipeReader Reset

PipeReader

PipeReader 代表調用方管理內存。 在調用 PipeReader.ReadAsync 之后始終調用 PipeReader.AdvanceTo 。 這使 PipeReader 知道調用方何時用完內存,以便可以對其進行跟蹤。 從 PipeReader.ReadAsync 返回的 ReadOnlySequence<byte> 僅在調用 PipeReader.AdvanceTo 之前有效。 調用 PipeReader.AdvanceTo 后,不能使用 ReadOnlySequence<byte>

PipeReader.AdvanceTo 采用兩個 SequencePosition 參數:

  • 第一個參數確定消耗的內存量。
  • 第二個參數確定觀察到的緩沖區數。

將數據標記為“已使用”意味着管道可以將內存返回到底層緩沖池。 將數據標記為“已觀察”可控制對 PipeReader.ReadAsync 的下一個調用的操作。 將所有內容都標記為“已觀察”意味着下次對 PipeReader.ReadAsync 的調用將不會返回,直到有更多數據寫入管道。 任何其他值都將使對 PipeReader.ReadAsync 的下一次調用立即返回並包含已觀察到的和未觀察到的數據,但不是已被使用的數據 。

讀取流數據方案

嘗試讀取流數據時會出現以下幾種典型模式:

  • 給定數據流時,分析單條消息。
  • 給定數據流時,分析所有可用消息。

以下示例使用 TryParseMessage 方法分析來自 ReadOnlySequence<byte> 的消息。 TryParseMessage 分析單條消息並更新輸入緩沖區,以從緩沖區中剪裁已分析的消息。 TryParseMessage 不是 .NET 的一部分,它是在以下部分中使用的用戶編寫的方法。

1 bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);

讀取單條消息

下面的代碼從 PipeReader 讀取一條消息並將其返回給調用方。

 1 async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
 2  CancellationToken cancellationToken = default)
 3 {
 4     while (true)
 5     {
 6         ReadResult result = await reader.ReadAsync(cancellationToken);
 7         ReadOnlySequence<byte> buffer = result.Buffer;
 8 
 9         // In the event that no message is parsed successfully, mark consumed
10         // as nothing and examined as the entire buffer.
11         SequencePosition consumed = buffer.Start;
12         SequencePosition examined = buffer.End;
13 
14         try
15         {
16             if (TryParseMessage(ref buffer, out Message message))
17             {
18                 // A single message was successfully parsed so mark the start as the
19                 // parsed buffer as consumed. TryParseMessage trims the buffer to
20                 // point to the data after the message was parsed.
21                 consumed = buffer.Start;
22 
23                 // Examined is marked the same as consumed here, so the next call
24                 // to ReadSingleMessageAsync will process the next message if there's
25                 // one.
26                 examined = consumed;
27 
28                 return message;
29             }
30 
31             // There's no more data to be processed.
32             if (result.IsCompleted)
33             {
34                 if (buffer.Length > 0)
35                 {
36                     // The message is incomplete and there's no more data to process.
37                     throw new InvalidDataException("Incomplete message.");
38                 }
39 
40                 break;
41             }
42         }
43         finally
44         {
45             reader.AdvanceTo(consumed, examined);
46         }
47     }
48 
49     return null;
50 }

前面的代碼:

  • 分析單條消息。
  • 更新已使用的 SequencePosition 並檢查 SequencePosition 以指向已剪裁的輸入緩沖區的開始。

因為 TryParseMessage 從輸入緩沖區中刪除了已分析的消息,所以更新了兩個 SequencePosition 參數。 通常,分析來自緩沖區的單條消息時,檢查的位置應為以下位置之一:

  • 消息的結尾。
  • 如果未找到消息,則返回接收緩沖區的結尾。

單條消息案例最有可能出現錯誤。 將錯誤的值傳遞給“已檢查”可能會導致內存不足異常或無限循環 。 有關詳細信息,請參閱本文中的 PipeReader 常見問題部分。

讀取多條消息

以下代碼從 PipeReader 讀取所有消息,並在每條消息上調用 ProcessMessageAsync

 1 async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
 2 {
 3     try
 4     {
 5         while (true)
 6         {
 7             ReadResult result = await reader.ReadAsync(cancellationToken);
 8             ReadOnlySequence<byte> buffer = result.Buffer;
 9 
10             try
11             {
12                 // Process all messages from the buffer, modifying the input buffer on each
13                 // iteration.
14                 while (TryParseMessage(ref buffer, out Message message))
15                 {
16                     await ProcessMessageAsync(message);
17                 }
18 
19                 // There's no more data to be processed.
20                 if (result.IsCompleted)
21                 {
22                     if (buffer.Length > 0)
23                     {
24                         // The message is incomplete and there's no more data to process.
25                         throw new InvalidDataException("Incomplete message.");
26                     }
27                     break;
28                 }
29             }
30             finally
31             {
32                 // Since all messages in the buffer are being processed, you can use the
33                 // remaining buffer's Start and End position to determine consumed and examined.
34                 reader.AdvanceTo(buffer.Start, buffer.End);
35             }
36         }
37     }
38     finally
39     {
40         await reader.CompleteAsync();
41     }
42 }

取消

PipeReader.ReadAsync

  • 支持傳遞 CancellationToken
  • 如果在讀取掛起期間取消了 CancellationToken,則會引發 OperationCanceledException
  • 支持通過 PipeReader.CancelPendingRead 取消當前讀取操作的方法,這樣可以避免引發異常。 調用 PipeReader.CancelPendingRead 將導致對 PipeReader.ReadAsync 的當前或下次調用返回 ReadResult,並將 IsCanceled 設置為 true。 這對於以非破壞性和非異常的方式停止現有的讀取循環非常有用。
 1 private PipeReader reader;
 2 
 3 public MyConnection(PipeReader reader)
 4 {
 5     this.reader = reader;
 6 }
 7 
 8 public void Abort()
 9 {
10     // Cancel the pending read so the process loop ends without an exception.
11     reader.CancelPendingRead();
12 }
13 
14 public async Task ProcessMessagesAsync()
15 {
16     try
17     {
18         while (true)
19         {
20             ReadResult result = await reader.ReadAsync();
21             ReadOnlySequence<byte> buffer = result.Buffer;
22 
23             try
24             {
25                 if (result.IsCanceled)
26                 {
27                     // The read was canceled. You can quit without reading the existing data.
28                     break;
29                 }
30 
31                 // Process all messages from the buffer, modifying the input buffer on each
32                 // iteration.
33                 while (TryParseMessage(ref buffer, out Message message))
34                 {
35                     await ProcessMessageAsync(message);
36                 }
37 
38                 // There's no more data to be processed.
39                 if (result.IsCompleted)
40                 {
41                     break;
42                 }
43             }
44             finally
45             {
46                 // Since all messages in the buffer are being processed, you can use the
47                 // remaining buffer's Start and End position to determine consumed and examined.
48                 reader.AdvanceTo(buffer.Start, buffer.End);
49             }
50         }
51     }
52     finally
53     {
54         await reader.CompleteAsync();
55     }
56 }

PipeReader 常見問題

  • 將錯誤的值傳遞給 consumed 或 examined 可能會導致讀取已讀取的數據。

  • 傳遞 buffer.End 作為檢查對象可能會導致以下問題:

    • 數據停止
    • 如果數據未使用,可能最終會出現內存不足 (OOM) 異常。 例如,當一次處理來自緩沖區的單條消息時,可能會出現 PipeReader.AdvanceTo(position, buffer.End)
  • 將錯誤的值傳遞給 consumed 或 examined 可能會導致無限循環。 例如,如果 buffer.Start 沒有更改,則 PipeReader.AdvanceTo(buffer.Start) 將導致在下一個對 PipeReader.ReadAsync 的調用在新數據到來之前立即返回。

  • 將錯誤的值傳遞給 consumed 或 examined 可能會導致無限緩沖(最終導致 OOM)。

  • 在調用 PipeReader.AdvanceTo 之后使用 ReadOnlySequence<byte> 可能會導致內存損壞(在釋放之后使用)。

  • 未能調用 PipeReader.Complete/CompleteAsync 可能會導致內存泄漏。

  • 在處理緩沖區之前檢查 ReadResult.IsCompleted 並退出讀取邏輯會導致數據丟失。 循環退出條件應基於 ReadResult.Buffer.IsEmpty 和 ReadResult.IsCompleted。 如果錯誤執行此操作,可能會導致無限循環。

有問題的代碼

❌ 數據丟失

當 IsCompleted 被設置為 true 時,ReadResult 可能會返回最后一段數據。 在退出讀循環之前不讀取該數據將導致數據丟失。

 警告

不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
 6 
 7     if (result.IsCompleted)
 8     {
 9         break;
10     }
11 
12     Process(ref dataLossBuffer, out Message message);
13 
14     reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
15 }

 警告

不要使用上述代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 前面的示例用於解釋 PipeReader 常見問題

❌ 無限循環

如果 Result.IsCompleted 是 true,則以下邏輯可能會導致無限循環,但緩沖區中永遠不會有完整的消息。

 警告

不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
 6     if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
 7     {
 8         break;
 9     }
10 
11     Process(ref infiniteLoopBuffer, out Message message);
12 
13     reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
14 }

下面是另一段具有相同問題的代碼。 該代碼在檢查 ReadResult.IsCompleted 之前檢查非空緩沖區。 由於該代碼位於 else if 中,如果緩沖區中沒有完整的消息,它將永遠循環。

 警告

不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
 6 
 7     if (!infiniteLoopBuffer.IsEmpty)
 8     {
 9         Process(ref infiniteLoopBuffer, out Message message);
10     }
11     else if (result.IsCompleted)
12     {
13         break;
14     }
15 
16     reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
17 }

❌ 意外掛起

在分析單條消息時,如果無條件調用 PipeReader.AdvanceTo 而 buffer.End 位於 examined 位置,則可能導致掛起。 對 PipeReader.AdvanceTo 的下次調用將在以下情況下返回:

  • 有更多數據寫入管道。
  • 以及之前未檢查過新數據。

 警告

不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> hangBuffer = result.Buffer;
 6 
 7     Process(ref hangBuffer, out Message message);
 8 
 9     if (result.IsCompleted)
10     {
11         break;
12     }
13 
14     reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
15 
16     if (message != null)
17     {
18         return message;
19     }
20 }

❌ 內存不足 (OOM)

在滿足以下條件的情況下,以下代碼將保持緩沖,直到發生 OutOfMemoryException

  • 沒有最大消息大小。
  • 從 PipeReader 返回的數據不會生成完整的消息。 例如,它不會生成完整的消息,因為另一端正在編寫一條大消息(例如,一條為 4GB 的消息)。

 警告

不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題

 1 Environment.FailFast("This code is terrible, don't use it!");
 2 while (true)
 3 {
 4     ReadResult result = await reader.ReadAsync(cancellationToken);
 5     ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
 6 
 7     Process(ref thisCouldOutOfMemory, out Message message);
 8 
 9     if (result.IsCompleted)
10     {
11         break;
12     }
13 
14     reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
15 
16     if (message != null)
17     {
18         return message;
19     }
20 }

❌ 內存損壞

當寫入讀取緩沖區的幫助程序時,應在調用 Advance 之前復制任何返回的有效負載。 下面的示例將返回 Pipe 已丟棄的內存,並可能將其重新用於下一個操作(讀/寫)。

 警告

不要使用以下代碼 。 使用此示例將導致數據丟失、掛起和安全問題,並且不應復制 。 以下示例用於解釋 PipeReader 常見問題

1 public class Message
2 {
3     public ReadOnlySequence<byte> CorruptedPayload { get; set; }
4 }
 1 Environment.FailFast("This code is terrible, don't use it!");
 2     Message message = null;
 3 
 4     while (true)
 5     {
 6         ReadResult result = await reader.ReadAsync(cancellationToken);
 7         ReadOnlySequence<byte> buffer = result.Buffer;
 8 
 9         ReadHeader(ref buffer, out int length);
10 
11         if (length <= buffer.Length)
12         {
13             message = new Message
14             {
15                 // Slice the payload from the existing buffer
16                 CorruptedPayload = buffer.Slice(0, length)
17             };
18 
19             buffer = buffer.Slice(length);
20         }
21 
22         if (result.IsCompleted)
23         {
24             break;
25         }
26 
27         reader.AdvanceTo(buffer.Start, buffer.End);
28 
29         if (message != null)
30         {
31             // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
32             // was captured.
33             break;
34         }
35     }
36 
37     return message;
38 }

PipeWriter

PipeWriter 管理用於代表調用方寫入的緩沖區。 PipeWriter 實現IBufferWriter<byte>。 IBufferWriter<byte> 使得無需額外的緩沖區副本就可以訪問緩沖區來執行寫入操作。

 1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
 2 {
 3     // Request at least 5 bytes from the PipeWriter.
 4     Memory<byte> memory = writer.GetMemory(5);
 5 
 6     // Write directly into the buffer.
 7     int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
 8 
 9     // Tell the writer how many bytes were written.
10     writer.Advance(written);
11 
12     await writer.FlushAsync(cancellationToken);
13 }

之前的代碼:

  • 使用 GetMemory 從 PipeWriter 請求至少 5 個字節的緩沖區。
  • 將 ASCII 字符串 "Hello" 的字節寫入返回的 Memory<byte>
  • 調用 Advance 以指示寫入緩沖區的字節數。
  • 刷新 PipeWriter,以便將字節發送到基礎設備。

以前的寫入方法使用 PipeWriter 提供的緩沖區。 或者,PipeWriter.WriteAsync

  • 將現有緩沖區復制到 PipeWriter
  • 根據需要調用 GetSpan``Advance,然后調用 FlushAsync
1 async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
2 {
3     byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
4 
5     // Write helloBytes to the writer, there's no need to call Advance here
6     // (Write does that).
7     await writer.WriteAsync(helloBytes, cancellationToken);
8 }

取消

FlushAsync 支持傳遞 CancellationToken。 如果令牌在刷新掛起時被取消,則傳遞 CancellationToken 將導致 OperationCanceledException。 PipeWriter.FlushAsync 支持通過 PipeWriter.CancelPendingFlush 取消當前刷新操作而不引發異常的方法。 調用 PipeWriter.CancelPendingFlush 將導致對 PipeWriter.FlushAsync 或 PipeWriter.WriteAsync 的當前或下次調用返回 FlushResult,並將 IsCanceled 設置為 true。 這對於以非破壞性和非異常的方式停止暫停刷新非常有用。

PipeWriter 常見問題

  • GetSpan 和 GetMemory 返回至少具有請求內存量的緩沖區。 請勿假設確切的緩沖區大小 。
  • 無法保證連續的調用將返回相同的緩沖區或相同大小的緩沖區。
  • 在調用 Advance 之后,必須請求一個新的緩沖區來繼續寫入更多數據。 不能寫入先前獲得的緩沖區。
  • 如果未完成對 FlushAsync 的調用,則調用 GetMemory 或 GetSpan 將不安全。
  • 如果未刷新數據,則調用 Complete 或 CompleteAsync 可能導致內存損壞。

IDuplexPipe

IDuplexPipe 是支持讀寫的類型的協定。 例如,網絡連接將由 IDuplexPipe 表示。

與包含 PipeReader 和 PipeWriter 的 Pipe 不同,IDuplexPipe 代表全雙工連接的單側。 這意味着寫入 PipeWriter 的內容不會從 PipeReader 中讀取。

在讀取或寫入流數據時,通常使用反序列化程序讀取數據,並使用序列化程序寫入數據。 大多數讀取和寫入流 API 都有一個 Stream 參數。 為了更輕松地與這些現有 API 集成,PipeReader 和 PipeWriter 公開了一個 AsStream。 AsStream 返回圍繞 PipeReader 或 PipeWriter 的 Stream 實現。

 

System.IO.Pipelines——高性能IO(一) 

System.IO.Pipelines——高性能IO(二)   

System.IO.Pipelines——高性能IO(三)  

 


轉載請標明本文來源:https://www.cnblogs.com/yswenli/p/11810317.html
更多內容歡迎Star、Fork我的的github:https://github.com/yswenli/
如果發現本文有什么問題和任何建議,也隨時歡迎交流~

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM