asp.net core mvc剖析:KestrelServer


KestrelServer是基於Libuv開發的高性能web服務器,那我們現在就來看一下它是如何工作的。在上一篇文章中提到了Program的Main方法,在這個方法里Build了一個WebHost,我們再來看一下代碼:

public static void Main(string[] args)
   {
       var host = new WebHostBuilder()
           .UseKestrel()
           .UseContentRoot(Directory.GetCurrentDirectory())
           .UseIISIntegration()
           .UseStartup<Startup>()
           .Build();
 
       host.Run();
   }

  里面有一個UseKestrel方法調用,這個方法的作用就是使用KestrelServer作為web server來提供web服務。在WebHost啟動的時候,調用了IServer的Start方法啟動服務,由於我們使用KestrelServer作為web server,自然這里調用的就是KestrelServer.Start方法,那我們來看下KestrelServer的Start方法里主要代碼:

 首先,我們發現在Start方法里創建了一個KestrelEngine對象,具體代碼如下:

var engine = new KestrelEngine(new ServiceContext
{
       FrameFactory = context =>
       {
           return new Frame<TContext>(application, context);
       },
       AppLifetime = _applicationLifetime,
       Log = trace,
       ThreadPool = new LoggingThreadPool(trace),
       DateHeaderValueManager = dateHeaderValueManager,
       ServerOptions = Options
 });

  KestrelEngine構造方法接受一個ServiceContext對象參數,ServiceContext里包含一個FrameFactory,從名稱上很好理解,就是Frame得工廠,Frame是什么?Frame是http請求處理對象,每個請求過來后,都會交給一個Frame對象進行受理,我們這里先記住它的作用,后面還會看到它是怎么實例化的。除了這個外,還有一個是AppLiftTime,它是一個IApplicationLifetime對象,它是整個應用生命周期的管理對象,前面沒有說到,這里補充上。

public interface IApplicationLifetime
    {
        /// <summary>
        /// Triggered when the application host has fully started and is about to wait
        /// for a graceful shutdown.
        /// </summary>
        CancellationToken ApplicationStarted { get; }

        /// <summary>
        /// Triggered when the application host is performing a graceful shutdown.
        /// Requests may still be in flight. Shutdown will block until this event completes.
        /// </summary>
        CancellationToken ApplicationStopping { get; }

        /// <summary>
        /// Triggered when the application host is performing a graceful shutdown.
        /// All requests should be complete at this point. Shutdown will block
        /// until this event completes.
        /// </summary>
        CancellationToken ApplicationStopped { get; }

        /// <summary>
        /// Requests termination the current application.
        /// </summary>
        void StopApplication();
    }

  IApplicationLifetime中提供了三個時間點,

  1,ApplicationStarted:應用程序已啟動
  2,ApplicationStopping:應用程序正在停止
  3,ApplicationStopped:應用程序已停止

  我們可以通過CancellationToken.Register方法注冊回調方法,在上面說到的三個時間點,執行我們特定的業務邏輯。IApplicationLifetime是在WebHost的Start方法里創建的,如果想在我們自己的應用程序獲取這個對象,我們可以直接通過依賴注入的方式獲取即可。

 我們繼續回到ServiceContext對象,這里面還包含了Log對象,用於跟蹤日志,一般我們是用來看程序執行的過程,並可以通過它發現程序執行出現問題的地方。還包含一個ServerOptions,它是一個KestrelServerOptions,里面包含跟服務相關的配置參數:

1,ThreadCount:服務線程數,表示服務啟動后,要開啟多少個服務線程,因為每個請求都會使用一個線程來進行處理,多線程會提高吞吐量,但是並不一定線程數越多越好,在系統里默認值是跟CPU內核數相等。

2,ShutdownTimeout:The amount of time after the server begins shutting down before connections will be forcefully closed(在應用程序開始停止到強制關閉當前請求連接所等待的時間,在這個時間段內,應用程序會等待請求處理完,如果還沒處理完,將強制關閉)

3,Limits:KestrelServerLimits對象,里面包含了服務限制參數,比如MaxRequestBufferSize,MaxResponseBufferSize

其他參數就不再一個一個說明了。

KestrelEngine對象創建好后,通過調用 engine.Start(threadCount),根據配置的threadcount進行服務線程KestrelThread實例化,代碼如下:
     public void Start(int count)
        {
            for (var index = 0; index < count; index++)
            {
                Threads.Add(new KestrelThread(this));
            }

            foreach (var thread in Threads)
            {
                thread.StartAsync().Wait();
            }
        }

 上面的代碼會創建指定數量的Thread對象,然后開始等待任務處理。KestrelThread是對libuv線程處理的封裝。

這些工作都准備好后,就開始啟動監聽服務了

 foreach (var endPoint in listenOptions)
                {
                    try
                    {
                        _disposables.Push(engine.CreateServer(endPoint));
                    }
                    catch (AggregateException ex)
                    {
                        if ((ex.InnerException as UvException)?.StatusCode == Constants.EADDRINUSE)
                        {
                            throw new IOException($"Failed to bind to address {endPoint}: address already in use.", ex);
                        }

                        throw;
                    }

                    // If requested port was "0", replace with assigned dynamic port.
                    _serverAddresses.Addresses.Add(endPoint.ToString());
                }

  上面紅色字體代碼,就是創建監聽服務的方法,我們再詳細看下里面的詳細情況:

      public IDisposable CreateServer(ListenOptions listenOptions)
        {
            var listeners = new List<IAsyncDisposable>();

            try
            {
//如果前面創建的線程數量為1,直接創建listener對象,啟動監聽 if (Threads.Count == 1) { var listener = new Listener(ServiceContext); listeners.Add(listener); listener.StartAsync(listenOptions, Threads[0]).Wait(); } else {
            //如果線程數不為1的時候 var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n"); var pipeMessage = Guid.NewGuid().ToByteArray();            //先創建一個主監聽對象,這個Listenerprimary就是一個Listener,監聽socket就是在這里面創建的 var listenerPrimary = new ListenerPrimary(ServiceContext); listeners.Add(listenerPrimary);
            //啟動監聽 listenerPrimary.StartAsync(pipeName, pipeMessage, listenOptions, Threads[0]).Wait(); //為剩余的每個服務線程關聯一個ListenerSecondary對象,這個對象使用命名Pipe與主監聽對象通信,在主監聽對象接收到請求后,通過pipe把接受的socket對象發送給特定的線程處理 foreach (var thread in Threads.Skip(1)) { var listenerSecondary = new ListenerSecondary(ServiceContext); listeners.Add(listenerSecondary); listenerSecondary.StartAsync(pipeName, pipeMessage, listenOptions, thread).Wait(); } } return new Disposable(() => { DisposeListeners(listeners); }); } catch { DisposeListeners(listeners); throw; } }

  這個時候服務就開始接受http請求了,我們前面說到了,監聽socket在listener類中創建(ListenerPrimary也是一個Listener),下面是listener的start方法

      public Task StartAsync(
            ListenOptions listenOptions,
            KestrelThread thread)
        {
            ListenOptions = listenOptions;
            Thread = thread;

            var tcs = new TaskCompletionSource<int>(this);

            Thread.Post(state =>
            {
                var tcs2 = (TaskCompletionSource<int>) state;
                try
                {
                    var listener = ((Listener) tcs2.Task.AsyncState);
//創建監聽socket listener.ListenSocket = listener.CreateListenSocket();
//開始監聽,當有連接請求過來后,觸發ConnectionCallback方法 ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this); tcs2.SetResult(0); } catch (Exception ex) { tcs2.SetException(ex); } }, tcs); return tcs.Task; }

  ConnectionCallback:當連接請求過來后被觸發,在回調方法里,進行連接處理分發,連接分發代碼如下:

     protected virtual void DispatchConnection(UvStreamHandle socket)
        {
            var connection = new Connection(this, socket);
            connection.Start();
        }

  這個是listener類中的實現,我們前面看到,只有在線程數為1的情況下,才創建Listener對象進行監聽,否則創建ListenerPrimary監聽,ListenerPrimay里重寫了方法,它的實現如下:

     protected override void DispatchConnection(UvStreamHandle socket)
        {
//這里采用輪詢的方式,把連接請求依次分發給不同的線程進行處理 var index = _dispatchIndex++ % (_dispatchPipes.Count + 1); if (index == _dispatchPipes.Count) {
   // base.DispatchConnection(socket); } else { DetachFromIOCP(socket); var dispatchPipe = _dispatchPipes[index];
//這里就是通過命名pipe,傳遞socket給特定的線程 var write = new UvWriteReq(Log); write.Init(Thread.Loop); write.Write2( dispatchPipe, _dummyMessage, socket, (write2, status, error, state) => { write2.Dispose(); ((UvStreamHandle)state).Dispose(); }, socket); } }

  好了,連接請求找到處理線程后,后面就可以開始處理工作了。ListenerSecondary里的代碼比較復雜,其實最終都會調用下面的代碼完成Connection對象的創建

 var connection = new Connection(this, socket);
connection.Start();

  Connection表示的就是當前連接,下面是它的構造方法

public Connection(ListenerContext context, UvStreamHandle socket) : base(context)
        {
            _socket = socket;
            _connectionAdapters = context.ListenOptions.ConnectionAdapters;
            socket.Connection = this;
            ConnectionControl = this;

            ConnectionId = GenerateConnectionId(Interlocked.Increment(ref _lastConnectionId));

            if (ServerOptions.Limits.MaxRequestBufferSize.HasValue)
            {
                _bufferSizeControl = new BufferSizeControl(ServerOptions.Limits.MaxRequestBufferSize.Value, this);
            }
        //創建輸入輸出socket流
            Input = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
            Output = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);

            var tcpHandle = _socket as UvTcpHandle;
            if (tcpHandle != null)
            {
                RemoteEndPoint = tcpHandle.GetPeerIPEndPoint();
                LocalEndPoint = tcpHandle.GetSockIPEndPoint();
            }
        //創建處理frame,這里的framefactory就是前面創建KestrelEngine時創建的工廠
            _frame = FrameFactory(this);
            _lastTimestamp = Thread.Loop.Now();
        }

  然后調用Connection的Start方法開始進行處理,這里面直接把處理任務交給Frame處理,Start方法實現:

public void Start()
        {
            Reset();
       //啟動了異步處理任務開始進行處理 _requestProcessingTask = Task.Factory.StartNew( (o) => ((Frame)o).RequestProcessingAsync(),//具體的處理方法 this, default(CancellationToken), TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); _frameStartedTcs.SetResult(null); }

  

RequestProcessingAsync方法里不再詳細介紹了,把主要的代碼拿出來看一下:
。。。。。
//_application就是上一篇文章提到的HostApplication,首先調用CreateContext創建HttpContext對象 var context = _application.CreateContext(this); 。。。。。。
//進入處理管道 await _application.ProcessRequestAsync(context).ConfigureAwait(false); 。。。。。。

  

ProcessRequestAsync完成處理后,把結果輸出給客戶端,好到此介紹完畢。如果有問題,歡迎大家指點。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM