上篇講到.net core web app是如何啟動並接受請求的,下面接着探索kestrel server是如何完成此任務的。
1.kestrel server的入口KestrelServer.Start
(Microsoft.AspNetCore.Hosting.Server.IHttpApplication
)
FrameFactory創建的frame實例最終會交給libuv的loop回調接收請求。但是在這過程中還是有很多的初始化工作需要做的。后面我們就管中窺豹來看一看。
public void Start<TContext>(IHttpApplication<TContext> application)
{
var engine = new KestrelEngine(new ServiceContext
{
FrameFactory = context =>
{
return new Frame<TContext>(application, context);
},
AppLifetime = _applicationLifetime,
Log = trace,
ThreadPool = new LoggingThreadPool(trace),
DateHeaderValueManager = dateHeaderValueManager,
ServerOptions = Options
});
//啟動引擎。完成libuv的配置和啟動
engine.Start(threadCount);
//針對綁定的多個地址創建server來接收請求。也就是針對ip:port來啟動tcp監聽
foreach (var address in _serverAddresses.Addresses.ToArray())
{
engine.CreateServer(ipv4Address);
}
}
2.啟動kestrel engine。engine.Start(threadCount);
啟動綁定的端口*最大處理線程的thread。並初始化libuv組件。
每一個線程初始化libuv,注冊loop回調等,並啟動libuv。
public void Start(int count)
{
for (var index = 0; index < count; index++)
{
Threads.Add(new KestrelThread(this));
}
foreach (var thread in Threads)
{
thread.StartAsync().Wait();
}
}
private void ThreadStart(object parameter)
{
lock (_startSync)
{
var tcs = (TaskCompletionSource<int>) parameter;
try
{
//初始化loop
_loop.Init(_engine.Libuv);
//注冊loop回調
//EnqueueCloseHandle:持有的資源釋放后的回調方法,回調往queue內增加一個item,事件循環該queue完成資源的最終釋放
_post.Init(_loop, OnPost, EnqueueCloseHandle);
//注冊心跳定時器
_heartbeatTimer.Init(_loop, EnqueueCloseHandle);
//啟動心跳定時器
_heartbeatTimer.Start(OnHeartbeat, timeout: HeartbeatMilliseconds, repeat: HeartbeatMilliseconds);
_initCompleted = true;
tcs.SetResult(0);
}
catch (Exception ex)
{
tcs.SetException(ex);
return;
}
}
try
{
//當前線程執行到Run()這里會掛起
_loop.Run();
//應用程序stop,shutdown之類的情況,libuv喚醒當前線程,完成資源清理
if (_stopImmediate)
{
// thread-abort form of exit, resources will be leaked
//線程中止形式的退出,資源會被泄露。
return;
}
// run the loop one more time to delete the open handles
//再次運行循環以刪除打開的句柄
_post.Reference();
_post.Dispose();
_heartbeatTimer.Dispose();
// Ensure the Dispose operations complete in the event loop.
//確保事件循環中的Dispose操作完成。
_loop.Run();
_loop.Dispose();
}
catch (Exception ex)
{
_closeError = ExceptionDispatchInfo.Capture(ex);
// Request shutdown so we can rethrow this exception
// in Stop which should be observable.
//請求關閉,以便我們可以重新拋出此異常在停止應該是可觀察的。
_appLifetime.StopApplication();
}
finally
{
_threadTcs.SetResult(null);
}
}
3.libuv啟動完成之后,接着就是處理訂閱注冊tcp了。
回到1的kestrel的start中。接着執行engine.CreateServer(ipv4Address);,這里和.net 里面的tcplistener不太一樣。.net里面就是listener bind,start,accept就好了。而libuv涉及到一個多路io復用的概念,這也是為什么使用他能高並發的原因。
public IDisposable CreateServer(ServerAddress address)
{
var usingPipes = address.IsUnixPipe;
var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
var single = Threads.Count == 1;
var first = true;
foreach (var thread in Threads)
{
if(single){}//single就不考慮,這種情況真是環境是不會這樣玩的
else if (first)
{
//根據當前平台創建tcp listener
var listener = usingPipes
? (ListenerPrimary)new PipeListenerPrimary(ServiceContext)
: new TcpListenerPrimary(ServiceContext);
listener.StartAsync(pipeName, address, thread).Wait();
}
else
{
//如果是多次對同一個ip:port做監聽
var listener = usingPipes
? (ListenerSecondary)new PipeListenerSecondary(ServiceContext)
: new TcpListenerSecondary(ServiceContext);
listener.StartAsync(pipeName, address, thread).Wait();
}
first = false;
}
}
tcplistener啟動細節,這里就只看TcpListenerPrimary了。
首先說明一下TcpListenerPrimary這個類的繼承關系:TcpListenerPrimary -->ListenerPrimary -->Listener。這樣才有助於后續代碼的理解。
后續代碼到處都能看到thread.post/postaysnc的代碼。這玩意的意思是把傳入的action放到libuv loop中,並激活異步完成回調。libuv另一個重要的概念各種回調。
1.接着上面的代碼,我們進入TcpListenerPrimary.StartAsync()方法。方法在ListenerPrimary中。
public async Task StartAsync(string pipeName, ServerAddress address, KestrelThread thread)
{
_pipeName = pipeName;
await StartAsync(address, thread).ConfigureAwait(false);
await Thread.PostAsync(state => ((ListenerPrimary)state).PostCallback(), this).ConfigureAwait(false);
}
2.接着上面的代碼進入StartAsync(address, thread)。他是父類Listener的方法。
public Task StartAsync(ServerAddress address, KestrelThread thread)
{
ServerAddress = address; Thread = thread;
var tcs = new TaskCompletionSource<int>(this);
Thread.Post(state =>
{
var tcs2 = (TaskCompletionSource<int>)state;
var listener = ((Listener)tcs2.Task.AsyncState);
//創建socket
listener.ListenSocket = listener.CreateListenSocket();
////socket監聽,libu注冊監聽並設置回調函數,最大隊列。
ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this);
tcs2.SetResult(0);
}, tcs);
return tcs.Task;
}
protected override UvStreamHandle CreateListenSocket()
{
//初始化socket並bind到address
var socket = new UvTcpHandle(Log);
socket.Init(Thread.Loop, Thread.QueueCloseHandle);
//是否使用Nagle's algorithm算法。
socket.NoDelay(ServerOptions.NoDelay);
socket.Bind(ServerAddress);
// If requested port was "0", replace with assigned dynamic port.
ServerAddress.Port = socket.GetSockIPEndPoint().Port;
return socket;
}
在接着上面的代碼ListenSocket.Listen成功之后,libuv回調ConnectionCallback函數。
進入ConnectionCallback函數,完成重要的listen Accept.
step1:listen成功libuv回調ConnectionCallback方法。
step2:初始化接收請求socket,並將之關聯到監聽socket
step3:適配接收請求socket,如果是第一次適配的話則創建connection
step4:創建connection並啟動
step5:new connection 關聯 Frame
step6:啟動frame
step7:由Connection類調用一次以開始RequestProcessingAsync循環。
step8:循環接收請求,接收請求到之后交給上層程序處理
private static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state)
{
var listener = (Listener)state;
listener.OnConnection(stream, status);//step 1
}
protected override void OnConnection(UvStreamHandle listenSocket, int status)//step 2
{
var acceptSocket = new UvTcpHandle(Log);
acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
acceptSocket.NoDelay(ServerOptions.NoDelay);
listenSocket.Accept(acceptSocket);
DispatchConnection(acceptSocket);
}
protected override void DispatchConnection(UvStreamHandle socket)// step 3
{
var index = _dispatchIndex++ % (_dispatchPipes.Count + 1);
if (index == _dispatchPipes.Count)
{
base.DispatchConnection(socket);
}
else
{
DetachFromIOCP(socket);
var dispatchPipe = _dispatchPipes[index];
var write = new UvWriteReq(Log);
write.Init(Thread.Loop);
write.Write2(dispatchPipe, _dummyMessage, socket,
(write2, status, error, state) =>
{
write2.Dispose();
((UvStreamHandle)state).Dispose();
},
socket);
}
}
protected virtual void DispatchConnection(UvStreamHandle socket)//step 4
{
var connection = new Connection(this, socket);
connection.Start();
}
private Func<ConnectionContext, Frame> FrameFactory => ListenerContext.ServiceContext.FrameFactory;
public Connection(ListenerContext context, UvStreamHandle socket) : base(context)//step 5
{
SocketInput = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
SocketOutput = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
//重點代碼在這里,FrameFactory是一個委托,是KestrelServer.Start中注冊的action
_frame = FrameFactory(this);
}
public void Start()//step 6
{
Log.ConnectionStart(ConnectionId);
// Start socket prior to applying the ConnectionFilter
_socket.ReadStart(_allocCallback, _readCallback, this);
_frame.Start();
}
/// <summary>
/// Called once by Connection class to begin the RequestProcessingAsync loop.
/// </summary>
public void Start()//step 7
{
Reset();
_requestProcessingTask =
Task.Factory.StartNew(
(o) => ((Frame)o).RequestProcessingAsync(),
this,
default(CancellationToken),
TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default).Unwrap();
}
/// <summary>
/// 主循環消耗套接字輸入,將其解析為協議幀,並調用應用程序委托,只要套接字打算保持打開。
/// 從此循環得到的任務將保留在服務器需要時使用的字段中以排除和關閉所有當前活動的連接。
/// </summary>
public override async Task RequestProcessingAsync()
{
while (!_requestProcessingStopping)
{
InitializeHeaders();
var context = _application.CreateContext(this);
await _application.ProcessRequestAsync(context).ConfigureAwait(false);
}
}