相信大家都清楚asp core有着非常出色的性能,它出色的性能也源於網絡服務模塊Kestrel;在techempower測試中Kestrel基礎核心達到了700萬級別的RPS吞吐能力,具備這樣的能力那對應的Kestrel.Transport.Sockets也應有着不錯的性能。接下來簡單地分析一下Kestrel.Transport.Sockets
的設計和使用,並進行簡單的並發處理能力測試。
async/await
async/await的使用這幾年時間里大放異彩,現有新功能的IO操作方式無一不支持它,畢竟可以同步的代碼方式來實現異步處理功能,不管是開發,調試還是維護都帶來的極大的便利性;既然這樣Kestrel.Transport.Sockets
也在基礎的socket異步基礎功能上引入了async/await設計,大大簡化了上層應用編寫的復雜度;下面看一下針對SocketAsyncEventArgs封裝的Awaitable。
public class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion { private static readonly Action _callbackCompleted = () => { }; private readonly PipeScheduler _ioScheduler; private Action _callback; public SocketAwaitableEventArgs(PipeScheduler ioScheduler) { _ioScheduler = ioScheduler; } public SocketAwaitableEventArgs GetAwaiter() => this; public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); public int GetResult() { Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); _callback = null; if (SocketError != SocketError.Success) { ThrowSocketException(SocketError); } return BytesTransferred; void ThrowSocketException(SocketError e) { throw new SocketException((int)e); } } public void OnCompleted(Action continuation) { if (ReferenceEquals(_callback, _callbackCompleted) || ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) { Task.Run(continuation); } } public void UnsafeOnCompleted(Action continuation) { OnCompleted(continuation); } public void Complete() { OnCompleted(this); } protected override void OnCompleted(SocketAsyncEventArgs _) { var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); if (continuation != null) { _ioScheduler.Schedule(state => ((Action)state)(), continuation); } } }
這個Awaitable的設計得非常好,它沒沒有引用新的對象,而是直接在SocketAsyncEventArgs
的基礎派生下來實現,這樣在高並吞吐的情況可以更好地降低新對象的開銷;這即能使用await的同時也無需增加對象的開銷,不過PipeScheduler的調用設計竟然使用了匿名函數的方式帶入,這樣會增加了對象的開銷;還有就是SocketAsyncEventArgs
完成后還投遞給一個線程調度去完成后面的工作,如果協議分析的工作量不大的情況個人感覺這個處理有些重了,不過使用都可以實現自己的PipeScheduler或直接改成執行continuation,最好是根據情況來配置最佳。
引入System.IO.Pipelines
在之前的文章已經說過Pipe,它是一個Buffer讀寫對象,其重要作用是可以把不連續的數據內存塊連接起來處理起來,這樣可以使普通開發人員避開Buffer的創建和回收的繁瑣工作(畢竟這一塊工作要做好還是有點難度的)。Pipe不緊緊提供了不連續數據Buffer的讀寫,它還具備一套await狀態機制可以讓使用人員從socket的receive和send工作分離出來。每個連接會分配兩個Pipe對象,主要負責Socket的receive和send工作;其工作原理如下:
基於Pipe使用者只需要關心應用協議處理處理即可,而這個處理會變得非常簡單;只需要關注Pipe的Writer和Reader即可。雖然這樣做帶來了便利性,但經過Pipe多了兩層狀態通訊多多少少會有性能上的影響,但這些影響相對Buffer開銷,GC和處理來說則還是有比較好的回報的。這里還是要重吐嘲一下MS,為什么Writer和Reader不按BinaryReader和BinaryWriter的基准作為設計,其實Pipe對普通使用者來說還是不怎友好的!
使用
Kestrel.Transport.Sockets
的使用還真有點讓人頭痛,首先它沒有完善的文檔,還有設計集成度也比較高。要搞清楚怎么用對於新手來說還真不怎容易,出於研究它的設計和對比查看了一段時間源碼才總結出來如何用;最終發現要用得好還需真要再做一層封裝才能更好的用於實限應用中;下面講解一下如何簡單地使用它吧,首先你要在Nuget中引用它。
構建
Kestrel.Transport.Sockets
的使用入口是SocketTransportFactory
,只要能構建這個對象那接下工作就簡單很多,首先看一下這個對象的構造函數
public SocketTransportFactory(IOptions<SocketTransportOptions> options, IApplicationLifetime applicationLifetime, ILoggerFactory loggerFactory);
三個參數都是接口……沒有文檔的情況還真有點頭痛。ILoggerFactory引用Microsoft.Extensions.Logging可以得到,剩下兩個簡單地實現一下即可。
IOptions<SocketTransportOptions>
public class SocketOpetion : IOptions<SocketTransportOptions> { public SocketTransportOptions Value => new SocketTransportOptions(); }
IApplicationLifetime
public class ApplicationLifetime : IApplicationLifetime { public ApplicationLifetime() : this(new CancellationToken(), new CancellationToken(), new CancellationToken()) { } public ApplicationLifetime(CancellationToken started, CancellationToken stopping, CancellationToken stoped) { ApplicationStarted = started; ApplicationStopping = stopping; ApplicationStopped = stoped; } public CancellationToken ApplicationStarted { get; set; } public CancellationToken ApplicationStopping { get; set; } public CancellationToken ApplicationStopped { get; set; } public virtual void StopApplication() { } }
創建服務
以上接口的實現都有了,接下來就可以創建SocketTransportFactory
對象了
private static async void ListenSocket(int prot) { var loggerFactory = new LoggerFactory(); ApplicationLifetime applicationLifetime = new ApplicationLifetime(); var server = new SocketTransportFactory(new SocketOpetion(), applicationLifetime, loggerFactory); await server.Create(new AnyEndPointInformation(prot), new Program()).BindAsync(); }
同樣SocketTransportFactory
的Create方法也需要兩個接口參數,一個是監聽類型和地址描述,一個連接調度器。這里只需要IP端口監聽所以實現起來比較簡單:
public class AnyEndPointInformation : IEndPointInformation { public AnyEndPointInformation(int port) { IPEndPoint = new IPEndPoint(IPAddress.Any, port); } public ListenType Type => ListenType.IPEndPoint; public IPEndPoint IPEndPoint { get; set; } public string SocketPath => throw new NotImplementedException(); public ulong FileHandle => throw new NotImplementedException(); public FileHandleType HandleType { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } public bool NoDelay => true; }
接下來的工作就在IConnectionDispatcher
接口的OnConnection方法下處理連接
public void OnConnection(TransportConnection connection) { Session session = new Session(connection); Task.Run(session.StartRecive); }
剛開始以為有了TransportConnection
就可以進行數據接收和發送,但事情是我想得太簡單了!其實TransportConnection並不具備數據處理能力,因為里面兩路的Pipe是空的……使用者需要自己定義對應的Pipe並設置給它,以上代碼的Session是需要自己實現的,名稱隨自己喜歡定義;實現接口IDuplexPipe
,設置兩路的Pipe對象,然后設置到TransportConnection.Application
屬性上。實現IDuplexPipe
后就可以進行數據接收和發送功能了,以下是實現了一個簡單的StartRecive
后回發數據,有收有發才便於下面測試的工作。
public async Task StartRecive() { while (true) { var data = await Receiver.ReadAsync(); if (data.IsCompleted) { this.Dispose(); break; } var buffers = data.Buffer; var end = buffers.End; if (buffers.IsSingleSegment) { ReadOnlyMemory<byte> b = buffers.First; var sbuf = Sender.GetMemory(b.Length); b.CopyTo(sbuf); Sender.Advance(b.Length); } else { foreach (var b in buffers) { var sbuf = Sender.GetMemory(b.Length); b.CopyTo(sbuf); Sender.Advance(b.Length); } } var flush = await Sender.FlushAsync(); Receiver.AdvanceTo(end); } }
測試
既然研究它自然就會關心它的性能情況,針對以上最簡單接收后返回的功能進行了一個壓力測試。測試結果總體上來說還算不錯,但算不上非常出色;最終測結果在一台E3 1230V2的PC機上測試結果是:10000連接,接近20萬rps。