helios架構詳解(一)服務器端架構


看了“菜鳥耕地”的”.NET開源高性能Socket通信中間件Helios介紹及演示“,覺得這個東西不錯。但是由於沒有網絡編程知識,所以高性能部分我就講不出來了,主要是想根據開源代碼跟大家分享下Helios的架構。

源代碼下載地址:https://github.com/helios-io/helios

 首先我們獻上服務器端結構圖:

這樣的一個大圖片,估計很多地方都挺迷糊的,我們就詳細的講解下期中的邏輯。

ServerBootstrap類

該類是服務器端的核心類,服務器端提供服務的就是ServerBootstrap對象(實際上是它的子類,並且子類是由這個對象創建的)。

 創建代碼時我們會使用代碼

 var serverFactory =
                new ServerBootstrap()
                    .SetTransport(TransportType.Tcp)
                    .Build();
  • 該類有三個核心屬性:IExecutor 、IServerFactory(IConnectionFactory)、NetworkEventLoop。
    public class ServerBootstrap : AbstractBootstrap
    {
        protected IExecutor InternalExecutor { get; set; }

        protected NetworkEventLoop EventLoop
        {
            get
            {
                return EventLoopFactory.CreateNetworkEventLoop(Workers, InternalExecutor);
            }
        }
        protected override IConnectionFactory BuildInternal()
        {
            switch (Type)
            {
                case TransportType.Tcp:
                    return new TcpServerFactory(this);
                case TransportType.Udp:
                    return new UdpServerFactory(this);
                default:
                    throw new InvalidOperationException("This shouldn't happen");
            }
        }

        public new IServerFactory Build()
        {
            return (IServerFactory) BuildInternal();
        }

    }
核心屬性和方法
  • 另外一個有特點的地方就是鏈式編程(可能借鑒於jquery),設置對象都返回個this指針。
    public class ServerBootstrap : AbstractBootstrap
    {
       public ServerBootstrap WorkersShareFiber(bool shareFiber)
        {
            UseSharedFiber = shareFiber;
            SetOption("proxiesShareFiber", UseSharedFiber);
            return this;
        }

        public new ServerBootstrap SetTransport(TransportType type)
        {
            base.SetTransport(type);
            return this;
        }

        public ServerBootstrap WorkerThreads(int workerThreadCount)
        {
            if (workerThreadCount < 1) throw new ArgumentException("Can't be below 1", "workerThreadCount");
            Workers = workerThreadCount;
            return this;
        }

        public ServerBootstrap BufferSize(int bufferSize)
        {
            if (bufferSize < 1024) throw new ArgumentException("Can't be below 1024", "bufferSize");
            BufferBytes = bufferSize;
            return this;
        }

        public ServerBootstrap WorkersAreProxies(bool useProxies)
        {
            UseProxies = useProxies;
            return this;
        }

        public ServerBootstrap Executor(IExecutor executor)
        {
            if (executor == null) throw new ArgumentNullException("executor");
            InternalExecutor = executor;
            return this;
        }

        public new ServerBootstrap SetConfig(IConnectionConfig config)
        {
            base.SetConfig(config);
            return this;
        }

        public new ServerBootstrap SetDecoder(IMessageDecoder decoder)
        {
            base.SetDecoder(decoder);
            return this;
        }

        public new ServerBootstrap SetEncoder(IMessageEncoder encoder)
        {
            base.SetEncoder(encoder);
            return this;
        }

        public new ServerBootstrap SetAllocator(IByteBufAllocator allocator)
        {
            base.SetAllocator(allocator);
            return this;
        }

        public new ServerBootstrap OnConnect(ConnectionEstablishedCallback connectionEstablishedCallback)
        {
            base.OnConnect(connectionEstablishedCallback);
            return this;
        }

        public new ServerBootstrap OnDisconnect(ConnectionTerminatedCallback connectionTerminatedCallback)
        {
            base.OnDisconnect(connectionTerminatedCallback);
            return this;
        }

        public new ServerBootstrap OnReceive(ReceivedDataCallback receivedDataCallback)
        {
            base.OnReceive(receivedDataCallback);
            return this;
        }
        public new ServerBootstrap OnError(ExceptionCallback exceptionCallback)
        {
            base.OnError(exceptionCallback);
            return this;
        }

        public new ServerBootstrap SetOption(string optionKey, object optionValue)
        {
            base.SetOption(optionKey, optionValue);
            return this;
        }
}
鏈式編程

我們調用最后,肯定是使用build方法,而build方法實際上調用的是BuildInternal內部方法,而該這又是一個工廠模式(和后滿ServerFactory組成抽象工廠??),會返回TcpServerFactory或者UdpServerFactory。

TcpServerFactory和UdpServerFactory

這倆個類其實沒有什么核心代碼,但是你網上追溯父類的時候你會發現TcpServerFactory(UdpServerFactory)=>ServerFactoryBase => ServerBootstrap。它們依舊是ServerBootstrap對象。不過不同的地方就是,他們除了爹還有了一個媽媽ServerFactoryBase =>IServerFactory =>IConnectionFactory。

我們看下ServerFactoryBase 源碼:

    public abstract class ServerFactoryBase : ServerBootstrap, IServerFactory
    {
        protected ServerFactoryBase(ServerBootstrap other)
            : base(other)
        {
        }

        protected abstract ReactorBase NewReactorInternal(INode listenAddress);

        public IReactor NewReactor(INode listenAddress)
        {
            var reactor = NewReactorInternal(listenAddress);
            reactor.Configure(Config);

            if (ReceivedData != null)
                reactor.OnReceive += (ReceivedDataCallback)ReceivedData.Clone();
            if (ConnectionEstablishedCallback != null)
                reactor.OnConnection += (ConnectionEstablishedCallback)ConnectionEstablishedCallback.Clone();
            if (ConnectionTerminatedCallback != null)
                reactor.OnDisconnection += (ConnectionTerminatedCallback)ConnectionTerminatedCallback.Clone();
            if (ExceptionCallback != null)
                reactor.OnError += (ExceptionCallback) ExceptionCallback.Clone();

            return reactor;
        }

        public IConnection NewConnection()
        {
            return NewConnection(Node.Any());
        }

        public IConnection NewConnection(INode localEndpoint)
        {
            var reactor = (ReactorBase)NewReactor(localEndpoint);
            return reactor.ConnectionAdapter;
        }

        public IConnection NewConnection(INode localEndpoint, INode remoteEndpoint)
        {
            return NewConnection(localEndpoint);
        }
    }

 

發現它們母親(IConnectionFactory)要做的事都是通過IReactor來完成的。而它們(TcpServerFactory和UdpServerFactory)只是找到合適的IReactor對象而已,另一方面我們也可以看出真正負責網絡連接的就是IReactor對象。它就是保證底層通訊的邏輯。

    public sealed class TcpServerFactory : ServerFactoryBase
    {
        public TcpServerFactory(ServerBootstrap other)
            : base(other)
        {
        }

        protected override ReactorBase NewReactorInternal(INode listenAddress)
        {
            if (UseProxies)
                return new TcpProxyReactor(listenAddress.Host, listenAddress.Port, EventLoop, Encoder, Decoder,
                    Allocator, BufferBytes);
            else
                throw new NotImplementedException("Have not implemented non-TCP proxies");
        }
    }
TcpServerFactory
    public sealed class UdpServerFactory : ServerFactoryBase
    {
        public UdpServerFactory(ServerBootstrap other) : base(other)
        {
        }

        protected override ReactorBase NewReactorInternal(INode listenAddress)
        {
            return new UdpProxyReactor(listenAddress.Host, listenAddress.Port, EventLoop, Encoder, Decoder, Allocator, BufferBytes);
        }
    }
UdpServerFactory

 IReactor們

 這里包含TcpServerFactory內部使用的TcpProxyReactor、UdpServerFactory使用的UdpProxyReactor,以及他們的基類ProxyReactorBase、ReactorBase。他們之間的關系為:

  • TcpProxyReactor => ProxyReactorBase => ReactorBase =>IReactor
  • UdpProxyReactor => ProxyReactorBase => ReactorBase =>IReactor
  • ReactorConnectionAdapter =>IConnection(適配器模式,內部封裝IReactor)

*嚴格說ReactorConnectionAdapter 不算是IReactor,它只是適配器模式,使得IReactor對象能夠和IConnection對象模式適配 

雖然類不是很多,但估計helios的高效可能核心就和這部分有關系。但是我不太了解通訊相關內容,只能從構建的方式大致的講下,有興趣的人可以自己深入研究。

  • ReactorBase :定義了基本操作、事件。對於接收,發送提供默認操作
  • ProxyReactorBase :增加了ReactorResponseChannel對象,重載接收方法(ReceivedData,調用的是ReactorResponseChannel的OnReceive)
  • TcpProxyReactor :重載StartInternal方法,使用TcpReactorResponseChannel進行數據接收
  • UdpProxyReactor :重載StartInternal方法,使用ReactorProxyResponseChannel進行數據接收

ReactorResponseChannel們

 此處包含三個類ReactorResponseChannel、TcpReactorResponseChannel、ReactorProxyResponseChannel。

  • ReactorResponseChannel 基類,定義基礎操作。主要是Send方法
  • TcpReactorResponseChannel,TCP協議下的ReactorResponseChannel實現。
  • ReactorProxyResponseChannel,ReactorResponseChannel的代理,實際上就是把ReactorResponseChannel虛方法變成空方法而已。

ReactorResponseChannel中OnReceive方法調用的是”NetworkEventLoop.Receive(data, this);“,將數據發送到EventLoop消息隊列)中。

EventLoop(消息隊列)

消息隊列一共有三個層次繼承,分別是:NetworkEventLoop、ThreadedEventLoop、AbstractEventLoop

繼承關系為:NetworkEventLoop=> ThreadedEventLoop=> AbstractEventLoop。

  • AbstractEventLoop:內部使用IFiber對象,進行消息處理。所有的處理方法最終走的都是IFiber對象(實際上IFiber中維護一個列表,之后由IFiber對象決定如何處理)
  • ThreadedEventLoop:構造函數構建自己的IFiber對象(默認使用的是:DedicatedThreadPoolFiber)
  • NetworkEventLoop:將網絡事件、數據接收事件用IFiber對象處理

IFiber們

 IFiber的作用不是處理接收的數據,而是在乎用什么樣的方式處理數據,比如起幾個線程,同步還是異步的處理。IFiber對象有好幾個,但是實際上真正用的只有1個(DedicatedThreadPoolFiber),但是不妨礙我們去看看這些對象。

  • DedicatedThreadPoolFiber使用hebios自己的線程池技術(DedicatedThreadPool),底層通過線程池來處理數據。
  • SynchronousFiber同步處理,當一個操作進入消息隊列的時候立即處理
  • ThreadPoolFiber使用線程池技術,底層通過線程池來處理數據
  • SharedFiber共享Fiber,當NetworkEventLoop.clone()的時候,只是簡單的將NetworkEventLoop的IFiber對象傳遞過來,以達到多個NetworkEventLoop共享IFiber的目的

 最后的處理類:BasicExecutor/TryCatchExecutor

在IFiber里面,我們會默認構造BasicExecutor對象(TryCatchExecutor繼承自BasicExecutor,可以catch住異常),這個類會最終處理服務器端的數據請求。

總結:服務器端處理數據的順序為:創建ServerBootstrap對象,構建出它的子類(IConnectionFactory),之后分別進行網絡通訊(IReactor),通訊管道(ReactorResponseChannel)對數據接收發送管理,之后數據進入消息隊列(EventLoop),服務器端決定處理數據的線程技術(IFiber),最終將數據處理(BasicExecutor

*這不是真實的接送邏輯,而是我們沿着源代碼求索邏輯的順序。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM