寫自己的Socket框架(一)


本系列僅介紹可用於生產環境的C#異步Socket框架,如果您在其他地方看到類似的代碼,不要驚訝,那可能就是我在參考開源代碼時,直接“剽竊”過來的。

 

1、在腦海里思考一下整個socket的鏈接的處理流程,於是便有了下圖。

2、首先就開始監聽,代碼如下:

public override bool Start()
        {
            this._socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            //設置KeeyAlive,如果客戶端不主動發消息時,Tcp本身會發一個心跳包,來通知服務器,這是一個保持通訊的鏈接。
            //避免等到下一次通訊時,才知道鏈接已經斷開。
            this._socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
            this._socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);
            try
            {
                this._socket.Bind(base.SocketConfig.Point);
                this._socket.Listen(base.SocketConfig.Backlog);

                this._socket_args = new SocketAsyncEventArgs();
                this._socket_args.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptSocketCompleted);

                //在鏈接過來的時候,如果IO沒有掛起,則AcceptAsync為False,表明同步完成。
                if (!this._socket.AcceptAsync(this._socket_args))
                {
                    AcceptSocketCompleted(this._socket, this._socket_args);
                }
                return true;
            }
            catch (Exception ex)
            {
                return false;
            }
        }


void AcceptSocketCompleted(object sender, SocketAsyncEventArgs e)
        {
            System.Net.Sockets.Socket socket = null;
            if (e.SocketError != SocketError.Success)
            {
                    return;
            }
            else
            {
                socket = e.AcceptSocket;
            }
            e.AcceptSocket = null;
            bool willRaiseEvent = false;
            try
            {
                //繼續監聽該端口,在處理邏輯時,不影響其他鏈接的數據傳送。
                willRaiseEvent = this._socket.AcceptAsync(e);
            }
            catch (Exception ex)
            {
                willRaiseEvent = true;
            }

            if (socket != null)
                OnNewClientAccepted(socket, null);

            if (!willRaiseEvent)
                AcceptSocketCompleted(null, e);
        }
View Code

3、這個時候鏈接過來了,就要開始入隊列了,如果沒有這方面的需求,這一步可以忽略,代碼如下:

public class SocketProxy
    {
        public System.Net.Sockets.Socket Client;

        public DateTime Timeout = DateTime.Now;

    }


public class SocketConnectionQueue : IDisposable
    {
        private Queue<SocketProxy> _queue;

        private readonly object _syncObject = new object();

        private bool _isStop = false;

        private Thread _thread;

        public Action<SocketProxy> Connected;

        public SocketConnectionQueue()
        {
            if (_queue == null)
            {
                _queue = new Queue<SocketProxy>();
            }

            if (_thread == null)
            {
                _thread = new Thread(Thread_Work)
                {
                    IsBackground = true,
                    Priority = ThreadPriority.Highest

                };
                _thread.Start();
            }
        }

        public void Push(SocketProxy connect)
        {
            lock (_syncObject)
            {
                if (_queue != null)
                {
                    _queue.Enqueue(connect);
                }
            }
        }

        public void Thread_Work()
        {
            while (!_isStop)
            {
                SocketProxy[] socketConnect = null;
                lock (_syncObject)
                {
                    if (_queue.Count > 0)
                    {
                        socketConnect = new SocketProxy[_queue.Count];
                        _queue.CopyTo(socketConnect, 0);
                        _queue.Clear();
                    }
                }

                if (socketConnect != null && socketConnect.Length > 0)
                {
                    foreach (var client in socketConnect)
                    {
                        if (Connected != null)
                        {
                            Connected.Invoke(client);
                        }
                    }
                }
                Thread.Sleep(10);
            }
        }

        public void Dispose()
        {
            _isStop = true;
            if (_thread != null)
            {
                _thread.Join();
            }
        }
    }
View Code

4、入完隊列,就要開始從鏈接池子里面分配資源了,你也可以不做鏈接池,在每次請求過來的時候去實例化一個鏈接,然后將這個鏈接入池,我的做法是在程序初始化的時候就分配好一定的資源,代碼如下:

public class SocketConnectionPool : IDisposable
    {
        private ServerConfig _serverConfig;

        public IAppServer AppServer;

        private ConcurrentStack<SocketConnection> _connectPool;

        private long connect_id = 0;
        private byte[] _buffer;
        private readonly object _syncObject = new object();

        private SocketConnectionQueue _queue;

        public Action<System.Net.Sockets.Socket, SocketConnection> Connected;

        public long GenerateId()
        {
            if (connect_id == long.MaxValue)
            {
                connect_id = 0;
            }
            connect_id++;
            return connect_id;
        }

        public SocketConnectionPool(IAppServer server)
        {
            this.AppServer = server;
            this._serverConfig = server.AppConfig;

        }

        public void Init()
        {
            var connects = new List<SocketConnection>(this._serverConfig.MaxConnectionNumber);
            _buffer = new byte[this._serverConfig.BufferSize];
            SocketAsyncEventArgs arg;
            for (var i = 0; i < this._serverConfig.MaxConnectionNumber; i++)
            {
                arg = new SocketAsyncEventArgs();
                arg.SetBuffer(_buffer, 0, _buffer.Length);
                connects.Add(new SocketConnection(arg, this));
            }
            _connectPool = new ConcurrentStack<SocketConnection>(connects);
            if (_queue == null)
            {
                _queue = new SocketConnectionQueue();
            }
            
            _queue.Connected = OnConnected;
        }

        public void Push(System.Net.Sockets.Socket socket)
        {
            SocketProxy proxy = new SocketProxy()
            {
                Client = socket
            };
            _queue.Push(proxy);
        }

        public void OnConnected(SocketProxy proxy)
        {
            //如果發現隊列里面的鏈接,在Timeout時間內,都沒有分配到資源,則關掉鏈接並丟棄。
            int timeout = (int)(DateTime.Now - proxy.Timeout).TotalSeconds;
            if (timeout >= this._serverConfig.Timeout)
            {
                proxy.Client.Close();
                return;
            }
            else
            {
                //沒有分配到資源重新入列。
                SocketConnection connect = this.GetConnectionFromPool();
                if (connect == null)
                {
                    _queue.Push(proxy);
                }
                else
                {
                    if (this.Connected != null)
                    {
                        this.Connected(proxy.Client, connect);
                    }
                }
            }
        }

        /// <summary>
        /// 從鏈接池去取鏈接(LIFO)
        /// </summary>
        /// <returns></returns>
        public SocketConnection GetConnectionFromPool()
        {
            //_queue.Push();
            SocketConnection connect;
            if (!_connectPool.TryPop(out connect))
            {
                return null;
            }
            lock (_syncObject)
            {
                long connect_id = this.GenerateId();
                connect.ConnectId = connect_id;
            }
            return connect;
        }
        /// <summary>
        /// 釋放鏈接,並放回鏈接池
        /// </summary>
        /// <param name="connect"></param>
        public void ReleaseConnection(SocketConnection connect)
        {
            _connectPool.Push(connect);
            LogHelper.Debug(connect.ConnectId + "放回ConnectPool");
        }

        public void Dispose()
        {
            _queue.Dispose();
        }
    }
View Code

在Init()里面初始化了很多個SocketConnection,這個就是我們用來管理具體的單個鏈接的class,代碼如下:

public class SocketConnection
    {
        public SocketFlag Flag { get; private set; }
        
        public SocketConnectionPool Pool { get { return _pool; } private set { } }
        private SocketConnectionPool _pool;

        public SocketAsyncEventArgs RecevieEventArgs { get; set; }

        public long ConnectId { get; set; }

        public SocketConnection()
        {
            this.Flag = SocketFlag.Error;
        }

        public SocketConnection(SocketAsyncEventArgs args, SocketConnectionPool pool)
        {
            RecevieEventArgs = args;
            RecevieEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketEventArgs_Completed);
            
            this.Flag = SocketFlag.Busy;
            this._pool = pool;
        }

        void SocketEventArgs_Completed(object sender, SocketAsyncEventArgs e)
        {
            var socketSession = e.UserToken as SocketSession;
            if (socketSession == null)
            {
                this.Flag = SocketFlag.Error;
                this.Close();
                return;
            }

            switch (e.LastOperation)
            { 
                case SocketAsyncOperation.Receive:
                    socketSession.ReceiveData(e);
                    break;
                default:
                    break;
            }
        }

        public void Initialise(SocketSession session)
        {
            this.RecevieEventArgs.UserToken = session;
            this.Flag = SocketFlag.Busy;

            session.Closed += () =>
            {
                this.Close();
            };
        }

        public void Reset()
        {
            //ConnectId = 0;
            this.RecevieEventArgs.UserToken = null;
            this.Flag = SocketFlag.Idle;
        }

        private void Close()
        {
            this.Reset();
            LogHelper.Debug(ConnectId + " reset");
            this._pool.ReleaseConnection(this);
        }
    }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM