.NET Standard支持一組新的API,System.Span
Pipelines旨在解決.NET編寫Socket通信程序時的很多困難,相信讀者也對此不勝其煩,使用stream模型進行編程,就算能夠解決,也是實在麻煩。
System.IO.Pipelines使用簡單的內存片段來管理數據,可以極大的簡化編寫程序的過程。關於Pipelines的詳細介紹,可以看看這里。現在ASP.NET Core中使用的Kestrel已經在使用這個API。(話說這個東西貌似就是Kestrel團隊搞出來的。)
可能是直接需要用Socket場景有限(物聯網用的還挺多的),Pipelines相關的資料感覺不是很多。官方給出的示例是基於ASCII協議的,有固定結尾的協議,這里我以物聯網設備常用的BINARY二進制自定義協議為例,講解基於Pipelines的程序套路。
System.IO.Pipelines
與基於Stream的方式不同,pipelines提供一個pipe,用於存儲數據,pipe中間存儲的數據有點鏈表的感覺,可以基於SequencePosition
進行slice操作,這樣就能得到一個ReadOnlySequence<T>
對象。reader可以進行自定義操作,並在操作完成之后告訴pipe已經處理了多少數據,整個過程是不需要進行內存復制操作的,因此性能得到了提升,還少了很多麻煩。可以簡單理解作為服務器端,流程:
接受數據循環:接到數據->放pipe里面->告訴pipe放了多少數據
處理數據循環:在pipe里面找一條完整數據->交給處理流程->告訴pipe處理了多少數據
協議
有一款設備,binary協議,數據包開頭0x75, 0xbd, 0x7e, 0x97一共4個字節,隨后跟數據包長度2個字節(固定2400字節,不固定長度也可以參照),隨后是數據區。在設備連接成功之后,數據主動從設備發送到PC。
關鍵代碼
雖然是.NET Core平台的,但是.NET FRAMEWORK 4.6.1上面也可以nuget安裝,直接
install-package system.io.pipelines
進行安裝就可以了。Socket相關處理的代碼不再寫了,只列關鍵的。
代碼第一步是聲明pipe。
private async void InitPipe(Socket socket)
{
Pipe pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(socket, pipe.Reader);
await Task.WhenAll(reading, writing);
}
pipe有reader還有一個writer,reader負責讀取pipe數據,主要用在數據處理循環,writer負責將數據寫入pipe,主要用在數據接受循環。
//寫入循環
private async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
//數據流量比較大,用1M字節作為buffer
const int minimumBufferSize = 1024 * 1024;
while (running)
{
try
{
//從writer中,獲得一段不少於指定大小的內存空間
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
//將內存空間變成ArraySegment,提供給socket使用
if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}
//接受數據
int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
//一次接受完畢,數據已經在pipe中,告訴pipe已經給它寫了多少數據。
writer.Advance(bytesRead);
}
catch
{
break;
}
// 提示reader可以進行讀取數據,reader可以繼續執行readAsync()方法
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// 告訴pipe完事了
writer.Complete();
}
//讀取循環
private async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
while (running)
{
//等待writer寫數據
ReadResult result = await reader.ReadAsync();
//獲得內存區域
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;
do
{
//尋找head的第一個字節所在的位置
position = buffer.PositionOf((byte)0x75);
if (position != null)
{
//由於是連續四個字節作為head,需要進行比對,我這里直接使用了ToArray方法,還是有了內存拷貝動作,不是很理想,但是寫起來很方便。
//對性能有更高要求的場景,可以進行slice操作后的單獨比對,這樣不需要內存拷貝動作
var headtoCheck = buffer.Slice(position.Value, 4).ToArray();
//SequenceEqual需要引用System.Linq
if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))
{
//到這里,認為找到包開頭了(從position.value開始),接下來需要從開頭處截取整包的長度,需要先判斷長度是否足夠
if (buffer.Slice(position.Value).Length >= 2400)
{
//長度足夠,那么取出ReadOnlySequence,進行操作
var mes = buffer.Slice(position.Value, 2400);
//這里是數據處理的函數,可以參考官方文檔對ReadOnlySequence進行操作,文檔里面使用了span,那樣性能會好一些。我這里簡單實用ToArray()操作,這樣也有了內存拷貝的問題,但是處理的直接是byte數組了。
await ProcessMessage(mes.ToArray());
//這一段就算是完成了,從開頭位置,一整個包的長度就算完成了
var next = buffer.GetPosition(2400, position.Value);
//將buffer處理過的舍棄,替換為剩余的buffer引用
buffer = buffer.Slice(next);
}
else
{
//長度不夠,說明數據包不完整,等下一波數據進來再拼接,跳出循環。
break;
}
}
else
{
//第一個是0x75但是后面不匹配,可能有數據傳輸問題,那么需要舍棄第一個,0x75后面的字節開始再重新找0x75
var next = buffer.GetPosition(1, position.Value);
buffer = buffer.Slice(next);
}
}
}
while (position != null);
//數據處理完畢,告訴pipe還剩下多少數據沒有處理(數據包不完整的數據,找不到head)
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
reader.Complete();
}
以上代碼基本解決了以下問題:
- 數據接收不完整,找不到開頭結尾,導致數據大量丟棄,或者自己維護一個queue的代碼復雜性
- 數據接收與處理的同步問題
- 一次性收到多條數據的情況
后記
本文只是解釋了pipeline處理的模式,對於茫茫多的ToArray方法,可以使用基於Span的操作進行優化(有時間就來填坑)。另外,如果在await ProcessMessage(mes.ToArray());
這里,直接使用Task.Run(()=>ProcessMessage(mes);
代替的話,實測會出現莫名其妙的問題,很有可能是pipe運行快,在系統調度Task之前,已經將內存釋放導致的,如果需要優化這一塊的話,需要格外注意。