KestrelServer是基於Libuv開發的高性能web服務器,那我們現在就來看一下它是如何工作的。在上一篇文章中提到了Program的Main方法,在這個方法里Build了一個WebHost,我們再來看一下代碼:
public static void Main(string[] args) { var host = new WebHostBuilder() .UseKestrel() .UseContentRoot(Directory.GetCurrentDirectory()) .UseIISIntegration() .UseStartup<Startup>() .Build(); host.Run(); }
里面有一個UseKestrel方法調用,這個方法的作用就是使用KestrelServer作為web server來提供web服務。在WebHost啟動的時候,調用了IServer的Start方法啟動服務,由於我們使用KestrelServer作為web server,自然這里調用的就是KestrelServer.Start方法,那我們來看下KestrelServer的Start方法里主要代碼:
首先,我們發現在Start方法里創建了一個KestrelEngine對象,具體代碼如下:
var engine = new KestrelEngine(new ServiceContext { FrameFactory = context => { return new Frame<TContext>(application, context); }, AppLifetime = _applicationLifetime, Log = trace, ThreadPool = new LoggingThreadPool(trace), DateHeaderValueManager = dateHeaderValueManager, ServerOptions = Options });
KestrelEngine構造方法接受一個ServiceContext對象參數,ServiceContext里包含一個FrameFactory,從名稱上很好理解,就是Frame得工廠,Frame是什么?Frame是http請求處理對象,每個請求過來后,都會交給一個Frame對象進行受理,我們這里先記住它的作用,后面還會看到它是怎么實例化的。除了這個外,還有一個是AppLiftTime,它是一個IApplicationLifetime對象,它是整個應用生命周期的管理對象,前面沒有說到,這里補充上。
public interface IApplicationLifetime { /// <summary> /// Triggered when the application host has fully started and is about to wait /// for a graceful shutdown. /// </summary> CancellationToken ApplicationStarted { get; } /// <summary> /// Triggered when the application host is performing a graceful shutdown. /// Requests may still be in flight. Shutdown will block until this event completes. /// </summary> CancellationToken ApplicationStopping { get; } /// <summary> /// Triggered when the application host is performing a graceful shutdown. /// All requests should be complete at this point. Shutdown will block /// until this event completes. /// </summary> CancellationToken ApplicationStopped { get; } /// <summary> /// Requests termination the current application. /// </summary> void StopApplication(); }
IApplicationLifetime中提供了三個時間點,
1,ApplicationStarted:應用程序已啟動
2,ApplicationStopping:應用程序正在停止
3,ApplicationStopped:應用程序已停止
我們可以通過CancellationToken.Register方法注冊回調方法,在上面說到的三個時間點,執行我們特定的業務邏輯。IApplicationLifetime是在WebHost的Start方法里創建的,如果想在我們自己的應用程序獲取這個對象,我們可以直接通過依賴注入的方式獲取即可。
我們繼續回到ServiceContext對象,這里面還包含了Log對象,用於跟蹤日志,一般我們是用來看程序執行的過程,並可以通過它發現程序執行出現問題的地方。還包含一個ServerOptions,它是一個KestrelServerOptions,里面包含跟服務相關的配置參數:
1,ThreadCount:服務線程數,表示服務啟動后,要開啟多少個服務線程,因為每個請求都會使用一個線程來進行處理,多線程會提高吞吐量,但是並不一定線程數越多越好,在系統里默認值是跟CPU內核數相等。
2,ShutdownTimeout:The amount of time after the server begins shutting down before connections will be forcefully closed(在應用程序開始停止到強制關閉當前請求連接所等待的時間,在這個時間段內,應用程序會等待請求處理完,如果還沒處理完,將強制關閉)
3,Limits:KestrelServerLimits對象,里面包含了服務限制參數,比如MaxRequestBufferSize,MaxResponseBufferSize
其他參數就不再一個一個說明了。
KestrelEngine對象創建好后,通過調用 engine.Start(threadCount),根據配置的threadcount進行服務線程KestrelThread實例化,代碼如下:
public void Start(int count) { for (var index = 0; index < count; index++) { Threads.Add(new KestrelThread(this)); } foreach (var thread in Threads) { thread.StartAsync().Wait(); } }
上面的代碼會創建指定數量的Thread對象,然后開始等待任務處理。KestrelThread是對libuv線程處理的封裝。
這些工作都准備好后,就開始啟動監聽服務了
foreach (var endPoint in listenOptions)
{
try
{
_disposables.Push(engine.CreateServer(endPoint));
}
catch (AggregateException ex)
{
if ((ex.InnerException as UvException)?.StatusCode == Constants.EADDRINUSE)
{
throw new IOException($"Failed to bind to address {endPoint}: address already in use.", ex);
}
throw;
}
// If requested port was "0", replace with assigned dynamic port.
_serverAddresses.Addresses.Add(endPoint.ToString());
}
上面紅色字體代碼,就是創建監聽服務的方法,我們再詳細看下里面的詳細情況:
public IDisposable CreateServer(ListenOptions listenOptions) { var listeners = new List<IAsyncDisposable>(); try {
//如果前面創建的線程數量為1,直接創建listener對象,啟動監聽 if (Threads.Count == 1) { var listener = new Listener(ServiceContext); listeners.Add(listener); listener.StartAsync(listenOptions, Threads[0]).Wait(); } else {
//如果線程數不為1的時候 var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n"); var pipeMessage = Guid.NewGuid().ToByteArray(); //先創建一個主監聽對象,這個Listenerprimary就是一個Listener,監聽socket就是在這里面創建的 var listenerPrimary = new ListenerPrimary(ServiceContext); listeners.Add(listenerPrimary);
//啟動監聽 listenerPrimary.StartAsync(pipeName, pipeMessage, listenOptions, Threads[0]).Wait(); //為剩余的每個服務線程關聯一個ListenerSecondary對象,這個對象使用命名Pipe與主監聽對象通信,在主監聽對象接收到請求后,通過pipe把接受的socket對象發送給特定的線程處理 foreach (var thread in Threads.Skip(1)) { var listenerSecondary = new ListenerSecondary(ServiceContext); listeners.Add(listenerSecondary); listenerSecondary.StartAsync(pipeName, pipeMessage, listenOptions, thread).Wait(); } } return new Disposable(() => { DisposeListeners(listeners); }); } catch { DisposeListeners(listeners); throw; } }
這個時候服務就開始接受http請求了,我們前面說到了,監聽socket在listener類中創建(ListenerPrimary也是一個Listener),下面是listener的start方法
public Task StartAsync( ListenOptions listenOptions, KestrelThread thread) { ListenOptions = listenOptions; Thread = thread; var tcs = new TaskCompletionSource<int>(this); Thread.Post(state => { var tcs2 = (TaskCompletionSource<int>) state; try { var listener = ((Listener) tcs2.Task.AsyncState);
//創建監聽socket listener.ListenSocket = listener.CreateListenSocket();
//開始監聽,當有連接請求過來后,觸發ConnectionCallback方法 ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this); tcs2.SetResult(0); } catch (Exception ex) { tcs2.SetException(ex); } }, tcs); return tcs.Task; }
ConnectionCallback:當連接請求過來后被觸發,在回調方法里,進行連接處理分發,連接分發代碼如下:
protected virtual void DispatchConnection(UvStreamHandle socket) { var connection = new Connection(this, socket); connection.Start(); }
這個是listener類中的實現,我們前面看到,只有在線程數為1的情況下,才創建Listener對象進行監聽,否則創建ListenerPrimary監聽,ListenerPrimay里重寫了方法,它的實現如下:
protected override void DispatchConnection(UvStreamHandle socket) {
//這里采用輪詢的方式,把連接請求依次分發給不同的線程進行處理 var index = _dispatchIndex++ % (_dispatchPipes.Count + 1); if (index == _dispatchPipes.Count) {
// base.DispatchConnection(socket); } else { DetachFromIOCP(socket); var dispatchPipe = _dispatchPipes[index];
//這里就是通過命名pipe,傳遞socket給特定的線程 var write = new UvWriteReq(Log); write.Init(Thread.Loop); write.Write2( dispatchPipe, _dummyMessage, socket, (write2, status, error, state) => { write2.Dispose(); ((UvStreamHandle)state).Dispose(); }, socket); } }
好了,連接請求找到處理線程后,后面就可以開始處理工作了。ListenerSecondary里的代碼比較復雜,其實最終都會調用下面的代碼完成Connection對象的創建
var connection = new Connection(this, socket); connection.Start();
Connection表示的就是當前連接,下面是它的構造方法
public Connection(ListenerContext context, UvStreamHandle socket) : base(context)
{
_socket = socket;
_connectionAdapters = context.ListenOptions.ConnectionAdapters;
socket.Connection = this;
ConnectionControl = this;
ConnectionId = GenerateConnectionId(Interlocked.Increment(ref _lastConnectionId));
if (ServerOptions.Limits.MaxRequestBufferSize.HasValue)
{
_bufferSizeControl = new BufferSizeControl(ServerOptions.Limits.MaxRequestBufferSize.Value, this);
}
//創建輸入輸出socket流
Input = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
Output = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
var tcpHandle = _socket as UvTcpHandle;
if (tcpHandle != null)
{
RemoteEndPoint = tcpHandle.GetPeerIPEndPoint();
LocalEndPoint = tcpHandle.GetSockIPEndPoint();
}
//創建處理frame,這里的framefactory就是前面創建KestrelEngine時創建的工廠
_frame = FrameFactory(this);
_lastTimestamp = Thread.Loop.Now();
}
然后調用Connection的Start方法開始進行處理,這里面直接把處理任務交給Frame處理,Start方法實現:
public void Start() { Reset();
//啟動了異步處理任務開始進行處理 _requestProcessingTask = Task.Factory.StartNew( (o) => ((Frame)o).RequestProcessingAsync(),//具體的處理方法 this, default(CancellationToken), TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); _frameStartedTcs.SetResult(null); }
RequestProcessingAsync方法里不再詳細介紹了,把主要的代碼拿出來看一下:
。。。。。
//_application就是上一篇文章提到的HostApplication,首先調用CreateContext創建HttpContext對象 var context = _application.CreateContext(this); 。。。。。。
//進入處理管道 await _application.ProcessRequestAsync(context).ConfigureAwait(false); 。。。。。。
ProcessRequestAsync完成處理后,把結果輸出給客戶端,好到此介紹完畢。如果有問題,歡迎大家指點。