KestrelServer是基於Libuv開發的高性能web服務器,那我們現在就來看一下它是如何工作的。在上一篇文章中提到了Program的Main方法,在這個方法里Build了一個WebHost,我們再來看一下代碼:
public static void Main(string[] args)
{
var host = new WebHostBuilder()
.UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory())
.UseIISIntegration()
.UseStartup<Startup>()
.Build();
host.Run();
}
里面有一個UseKestrel方法調用,這個方法的作用就是使用KestrelServer作為web server來提供web服務。在WebHost啟動的時候,調用了IServer的Start方法啟動服務,由於我們使用KestrelServer作為web server,自然這里調用的就是KestrelServer.Start方法,那我們來看下KestrelServer的Start方法里主要代碼:
首先,我們發現在Start方法里創建了一個KestrelEngine對象,具體代碼如下:
var engine = new KestrelEngine(new ServiceContext
{
FrameFactory = context =>
{
return new Frame<TContext>(application, context);
},
AppLifetime = _applicationLifetime,
Log = trace,
ThreadPool = new LoggingThreadPool(trace),
DateHeaderValueManager = dateHeaderValueManager,
ServerOptions = Options
});
KestrelEngine構造方法接受一個ServiceContext對象參數,ServiceContext里包含一個FrameFactory,從名稱上很好理解,就是Frame得工廠,Frame是什么?Frame是http請求處理對象,每個請求過來后,都會交給一個Frame對象進行受理,我們這里先記住它的作用,后面還會看到它是怎么實例化的。除了這個外,還有一個是AppLiftTime,它是一個IApplicationLifetime對象,它是整個應用生命周期的管理對象,前面沒有說到,這里補充上。
public interface IApplicationLifetime
{
/// <summary>
/// Triggered when the application host has fully started and is about to wait
/// for a graceful shutdown.
/// </summary>
CancellationToken ApplicationStarted { get; }
/// <summary>
/// Triggered when the application host is performing a graceful shutdown.
/// Requests may still be in flight. Shutdown will block until this event completes.
/// </summary>
CancellationToken ApplicationStopping { get; }
/// <summary>
/// Triggered when the application host is performing a graceful shutdown.
/// All requests should be complete at this point. Shutdown will block
/// until this event completes.
/// </summary>
CancellationToken ApplicationStopped { get; }
/// <summary>
/// Requests termination the current application.
/// </summary>
void StopApplication();
}
IApplicationLifetime中提供了三個時間點,
1,ApplicationStarted:應用程序已啟動
2,ApplicationStopping:應用程序正在停止
3,ApplicationStopped:應用程序已停止
我們可以通過CancellationToken.Register方法注冊回調方法,在上面說到的三個時間點,執行我們特定的業務邏輯。IApplicationLifetime是在WebHost的Start方法里創建的,如果想在我們自己的應用程序獲取這個對象,我們可以直接通過依賴注入的方式獲取即可。
我們繼續回到ServiceContext對象,這里面還包含了Log對象,用於跟蹤日志,一般我們是用來看程序執行的過程,並可以通過它發現程序執行出現問題的地方。還包含一個ServerOptions,它是一個KestrelServerOptions,里面包含跟服務相關的配置參數:
1,ThreadCount:服務線程數,表示服務啟動后,要開啟多少個服務線程,因為每個請求都會使用一個線程來進行處理,多線程會提高吞吐量,但是並不一定線程數越多越好,在系統里默認值是跟CPU內核數相等。
2,ShutdownTimeout:The amount of time after the server begins shutting down before connections will be forcefully closed(在應用程序開始停止到強制關閉當前請求連接所等待的時間,在這個時間段內,應用程序會等待請求處理完,如果還沒處理完,將強制關閉)
3,Limits:KestrelServerLimits對象,里面包含了服務限制參數,比如MaxRequestBufferSize,MaxResponseBufferSize
其他參數就不再一個一個說明了。
KestrelEngine對象創建好后,通過調用 engine.Start(threadCount),根據配置的threadcount進行服務線程KestrelThread實例化,代碼如下:
public void Start(int count) { for (var index = 0; index < count; index++) { Threads.Add(new KestrelThread(this)); } foreach (var thread in Threads) { thread.StartAsync().Wait(); } }
上面的代碼會創建指定數量的Thread對象,然后開始等待任務處理。KestrelThread是對libuv線程處理的封裝。
這些工作都准備好后,就開始啟動監聽服務了
foreach (var endPoint in listenOptions)
{
try
{
_disposables.Push(engine.CreateServer(endPoint));
}
catch (AggregateException ex)
{
if ((ex.InnerException as UvException)?.StatusCode == Constants.EADDRINUSE)
{
throw new IOException($"Failed to bind to address {endPoint}: address already in use.", ex);
}
throw;
}
// If requested port was "0", replace with assigned dynamic port.
_serverAddresses.Addresses.Add(endPoint.ToString());
}
上面紅色字體代碼,就是創建監聽服務的方法,我們再詳細看下里面的詳細情況:
public IDisposable CreateServer(ListenOptions listenOptions)
{
var listeners = new List<IAsyncDisposable>();
try
{
//如果前面創建的線程數量為1,直接創建listener對象,啟動監聽
if (Threads.Count == 1)
{
var listener = new Listener(ServiceContext);
listeners.Add(listener);
listener.StartAsync(listenOptions, Threads[0]).Wait();
}
else
{
//如果線程數不為1的時候
var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
var pipeMessage = Guid.NewGuid().ToByteArray();
//先創建一個主監聽對象,這個Listenerprimary就是一個Listener,監聽socket就是在這里面創建的
var listenerPrimary = new ListenerPrimary(ServiceContext);
listeners.Add(listenerPrimary);
//啟動監聽
listenerPrimary.StartAsync(pipeName, pipeMessage, listenOptions, Threads[0]).Wait();
//為剩余的每個服務線程關聯一個ListenerSecondary對象,這個對象使用命名Pipe與主監聽對象通信,在主監聽對象接收到請求后,通過pipe把接受的socket對象發送給特定的線程處理
foreach (var thread in Threads.Skip(1))
{
var listenerSecondary = new ListenerSecondary(ServiceContext);
listeners.Add(listenerSecondary);
listenerSecondary.StartAsync(pipeName, pipeMessage, listenOptions, thread).Wait();
}
}
return new Disposable(() =>
{
DisposeListeners(listeners);
});
}
catch
{
DisposeListeners(listeners);
throw;
}
}
這個時候服務就開始接受http請求了,我們前面說到了,監聽socket在listener類中創建(ListenerPrimary也是一個Listener),下面是listener的start方法
public Task StartAsync(
ListenOptions listenOptions,
KestrelThread thread)
{
ListenOptions = listenOptions;
Thread = thread;
var tcs = new TaskCompletionSource<int>(this);
Thread.Post(state =>
{
var tcs2 = (TaskCompletionSource<int>) state;
try
{
var listener = ((Listener) tcs2.Task.AsyncState);
//創建監聽socket
listener.ListenSocket = listener.CreateListenSocket();
//開始監聽,當有連接請求過來后,觸發ConnectionCallback方法
ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this);
tcs2.SetResult(0);
}
catch (Exception ex)
{
tcs2.SetException(ex);
}
}, tcs);
return tcs.Task;
}
ConnectionCallback:當連接請求過來后被觸發,在回調方法里,進行連接處理分發,連接分發代碼如下:
protected virtual void DispatchConnection(UvStreamHandle socket)
{
var connection = new Connection(this, socket);
connection.Start();
}
這個是listener類中的實現,我們前面看到,只有在線程數為1的情況下,才創建Listener對象進行監聽,否則創建ListenerPrimary監聽,ListenerPrimay里重寫了方法,它的實現如下:
protected override void DispatchConnection(UvStreamHandle socket)
{
//這里采用輪詢的方式,把連接請求依次分發給不同的線程進行處理
var index = _dispatchIndex++ % (_dispatchPipes.Count + 1);
if (index == _dispatchPipes.Count)
{
//
base.DispatchConnection(socket);
}
else
{
DetachFromIOCP(socket);
var dispatchPipe = _dispatchPipes[index];
//這里就是通過命名pipe,傳遞socket給特定的線程
var write = new UvWriteReq(Log);
write.Init(Thread.Loop);
write.Write2(
dispatchPipe,
_dummyMessage,
socket,
(write2, status, error, state) =>
{
write2.Dispose();
((UvStreamHandle)state).Dispose();
},
socket);
}
}
好了,連接請求找到處理線程后,后面就可以開始處理工作了。ListenerSecondary里的代碼比較復雜,其實最終都會調用下面的代碼完成Connection對象的創建
var connection = new Connection(this, socket); connection.Start();
Connection表示的就是當前連接,下面是它的構造方法
public Connection(ListenerContext context, UvStreamHandle socket) : base(context)
{
_socket = socket;
_connectionAdapters = context.ListenOptions.ConnectionAdapters;
socket.Connection = this;
ConnectionControl = this;
ConnectionId = GenerateConnectionId(Interlocked.Increment(ref _lastConnectionId));
if (ServerOptions.Limits.MaxRequestBufferSize.HasValue)
{
_bufferSizeControl = new BufferSizeControl(ServerOptions.Limits.MaxRequestBufferSize.Value, this);
}
//創建輸入輸出socket流
Input = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
Output = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
var tcpHandle = _socket as UvTcpHandle;
if (tcpHandle != null)
{
RemoteEndPoint = tcpHandle.GetPeerIPEndPoint();
LocalEndPoint = tcpHandle.GetSockIPEndPoint();
}
//創建處理frame,這里的framefactory就是前面創建KestrelEngine時創建的工廠
_frame = FrameFactory(this);
_lastTimestamp = Thread.Loop.Now();
}
然后調用Connection的Start方法開始進行處理,這里面直接把處理任務交給Frame處理,Start方法實現:
public void Start()
{
Reset();
//啟動了異步處理任務開始進行處理
_requestProcessingTask =
Task.Factory.StartNew(
(o) => ((Frame)o).RequestProcessingAsync(),//具體的處理方法
this,
default(CancellationToken),
TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default).Unwrap();
_frameStartedTcs.SetResult(null);
}
RequestProcessingAsync方法里不再詳細介紹了,把主要的代碼拿出來看一下:
。。。。。
//_application就是上一篇文章提到的HostApplication,首先調用CreateContext創建HttpContext對象 var context = _application.CreateContext(this); 。。。。。。
//進入處理管道 await _application.ProcessRequestAsync(context).ConfigureAwait(false); 。。。。。。
ProcessRequestAsync完成處理后,把結果輸出給客戶端,好到此介紹完畢。如果有問題,歡迎大家指點。
