高性能 socket 框架


      最近用C# 寫了一個易用,高性能的socket,抗住了ServerTestTool.exe 壓力測試工具 

下載地址  GitHub       QQ群: 664740973

https://gitee.com/fengma_312/socket.core

https://www.nuget.org/packages/socket.core/


socket.core

這是一個基於C# .net standard2.0 寫的socket框架,可使用於.net Framework/dotnet core程序集,能在window(IOCP)/linux(epoll)運行.使用異步連接,異步發送,異步接收,性能爆表,並且通過壓力測試。

安裝NuGet:
Package Manager: Install-Package socket.core
.Net CLI :dotnet add package socket.core
Paket CLI:paket add socket.core

服務端所在socket.core.Server命名空間下,分別為三種模式 push/pull/pack
客戶端所在socket.core.Client命名空間下,分別為三種模式 push/pull/pack

主要流程與對應的方法和事件介紹.
注:connectId(int)代表着一個連接對象,data(byte[]),success(bool)

  • 1.初始化socket(對應的三種模式)

    實例化服務端類 TcpPushServer/TcpPullServer/TcpPackServer
    實例化客戶端類 TcpPushClient/TcpPullClient/TcpPackClient
    參數介紹int numConnections同時處理的最大連接數,int receiveBufferSize用於每個套接字I/O操作的緩沖區大小(接收端), int overtime超時時長,單位秒.(每10秒檢查一次),當值為0時,不設置超時,uint headerFlag包頭標記范圍0~1023(0x3FF),當包頭標識等於0時,不校驗包頭

  • 2.啟動監聽/連接服務器

    服務端 server.Start(port);
    客戶端 client.Connect(ip,port);

  • 3.觸發連接事件

    服務端 server.OnAccept(connectId); 接收到一個連接id,可用他來發送,接收,關閉的標記
    客戶端 client.OnConnect(success); 接收是否成功連接到服務器

  • 4.發送消息

    服務端 server.Send(connectId,data,offset,length);
    客戶端 client.Send(data,offset,length);

  • 5.觸發已發送事件

    服務端 server.OnSend(connectId,length);
    客戶端 client.OnSend(length);

  • 6.觸發接收事件

    服務端 server.OnReceive(connectId, data);
    客戶端 client.OnReceive(data);

  • 7.關閉連接

    服務端 server.Close(connectId);
    客戶端 client.Close();

  • 8.觸發關閉連接事件

    服務端 server.OnClose(connectId);
    客戶端 client.OnClose();

三種模型簡介

  • 一:push

    當接收到數據時會觸發監聽事件OnReceive(connectId,data);把數據立馬“推”給應用程序

  • 二:pull

    當接收到數據時會觸發監聽事件OnReceive(connectId,length),告訴應用程序當前已經接收到了多少數據長度,應用程序可使用GetLength(connectId)方法檢查已接收的數據的長度,如果滿足則調用組件的Fetch(connectId,length)方法,把需要的數據“拉”出來

  • 三:pack

    pack模型組件是push和pull模型的結合體,應用程序不必要處理分包/合包,組件保證每個server.OnReceive(connectId,data)/client.OnReceive(data)事件都向應用程序提供一個完整的數據包
    注:pack模型組件會對應用程序發送的每個數據包自動加上4個字節(32bit)的包頭,組件接收到數據時,根據包頭信息自動分包,每個完整的數據包通過OnReceive(connectId, data)事件發送給應用程序
    PACK包頭格式(4字節)4*8=32
    XXXXXXXXXXYYYYYYYYYYYYYYYYYYYYYY
    前10位X為包頭標識位,用於數據包校驗,有效包頭標識取值范圍0~1023(0x3FF),當包頭標識等於0時,不校驗包頭,后22位Y為長度位,記錄包體長度。有效數據包最大長度不能超過4194303(0x3FFFFF)字節(byte),應用程序可以通過TcpPackServer/TcpPackClient構造函數參數headerFlag設置

服務端其它方法介紹

      1. bool SetAttached(int connectId, object data)

      服務端為每個客戶端設置附加數據,避免用戶自己再建立用戶映射表

      1. T GetAttached(int connectId)

      獲取指定客戶端的附加數據

      2017/12/27

      技術在於分享,大家共同進步

 核心類 

  1 using System;
  2 using System.Collections.Generic;
  3 using System.Net;
  4 using System.Net.Sockets;
  5 using System.Text;
  6 using System.Threading;
  7 using System.Linq;
  8 using System.Collections.Concurrent;
  9 using socket.core.Common;
 10 
 11 namespace socket.core.Server
 12 {
 13     /// <summary>
 14     /// tcp Socket監聽基庫
 15     /// </summary>
 16     internal class TcpServer
 17     {
 18         /// <summary>
 19         /// 連接標示 自增長
 20         /// </summary>
 21         private int connectId;
 22         /// <summary>
 23         /// 同時處理的最大連接數
 24         /// </summary>
 25         private int m_numConnections;
 26         /// <summary>
 27         /// 用於每個套接字I/O操作的緩沖區大小
 28         /// </summary>
 29         private int m_receiveBufferSize;
 30         /// <summary>
 31         /// 所有套接字接收操作的一個可重用的大型緩沖區集合。
 32         /// </summary>
 33         private BufferManager m_bufferManager;
 34         /// <summary>
 35         /// 用於監聽傳入連接請求的套接字
 36         /// </summary>
 37         private Socket listenSocket;
 38         /// <summary>
 39         /// 接受端SocketAsyncEventArgs對象重用池,接受套接字操作
 40         /// </summary>
 41         private SocketAsyncEventArgsPool m_receivePool;
 42         /// <summary>
 43         /// 發送端SocketAsyncEventArgs對象重用池,發送套接字操作
 44         /// </summary>
 45         private SocketAsyncEventArgsPool m_sendPool;
 46         /// <summary>
 47         /// 超時,如果超時,服務端斷開連接,客戶端需要重連操作
 48         /// </summary>
 49         private int overtime;
 50         /// <summary>
 51         /// 超時檢查間隔時間(秒)
 52         /// </summary>
 53         private int overtimecheck = 1;
 54         /// <summary>
 55         /// 能接到最多客戶端個數的原子操作
 56         /// </summary>
 57         private Semaphore m_maxNumberAcceptedClients;
 58         /// <summary>
 59         /// 已經連接的對象池
 60         /// </summary>
 61         internal ConcurrentDictionary<int, ConnectClient> connectClient;
 62         /// <summary>
 63         /// 發送線程數
 64         /// </summary>
 65         private int sendthread = 10;
 66         /// <summary>
 67         /// 需要發送的數據
 68         /// </summary>
 69         private ConcurrentQueue<SendingQueue>[] sendQueues;      
 70         /// <summary>
 71         /// 72         /// </summary>
 73         private Mutex mutex = new Mutex();
 74         /// <summary>
 75         /// 連接成功事件
 76         /// </summary>
 77         internal event Action<int> OnAccept;
 78         /// <summary>
 79         /// 接收通知事件
 80         /// </summary>
 81         internal event Action<int, byte[], int, int> OnReceive;
 82         /// <summary>
 83         /// 已送通知事件
 84         /// </summary>
 85         internal event Action<int, int> OnSend;
 86         /// <summary>
 87         /// 斷開連接通知事件
 88         /// </summary>
 89         internal event Action<int> OnClose;
 90 
 91         /// <summary>
 92         /// 設置基本配置
 93         /// </summary>   
 94         /// <param name="numConnections">同時處理的最大連接數</param>
 95         /// <param name="receiveBufferSize">用於每個套接字I/O操作的緩沖區大小(接收端)</param>
 96         /// <param name="overTime">超時時長,單位秒.(每10秒檢查一次),當值為0時,不設置超時</param>
 97         internal TcpServer(int numConnections, int receiveBufferSize, int overTime)
 98         {
 99             overtime = overTime;
100             m_numConnections = numConnections;
101             m_receiveBufferSize = receiveBufferSize;
102             m_bufferManager = new BufferManager(receiveBufferSize * m_numConnections, receiveBufferSize);
103             m_receivePool = new SocketAsyncEventArgsPool(m_numConnections);
104             m_sendPool = new SocketAsyncEventArgsPool(m_numConnections);
105             m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections);
106             Init();
107         }
108 
109         /// <summary>
110         /// 初始化服務器通過預先分配的可重復使用的緩沖區和上下文對象。這些對象不需要預先分配或重用,但這樣做是為了說明API如何可以易於用於創建可重用對象以提高服務器性能。
111         /// </summary>
112         private void Init()
113         {
114             connectClient = new ConcurrentDictionary<int, ConnectClient>();
115             sendQueues = new ConcurrentQueue<SendingQueue>[sendthread];
116             for (int i = 0; i < sendthread; i++)
117             {
118                 sendQueues[i] = new ConcurrentQueue<SendingQueue>();
119             }
120             //分配一個大字節緩沖區,所有I/O操作都使用一個。這個侍衛對內存碎片
121             m_bufferManager.InitBuffer();
122             //預分配的接受對象池socketasynceventargs,並分配緩存
123             SocketAsyncEventArgs saea_receive;
124             //分配的發送對象池socketasynceventargs,但是不分配緩存
125             SocketAsyncEventArgs saea_send;
126             for (int i = 0; i < m_numConnections; i++)
127             {
128                 //預先接受端分配一組可重用的消息
129                 saea_receive = new SocketAsyncEventArgs();
130                 saea_receive.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
131                 //分配緩沖池中的字節緩沖區的socketasynceventarg對象
132                 m_bufferManager.SetBuffer(saea_receive);
133                 m_receivePool.Push(saea_receive);
134                 //預先發送端分配一組可重用的消息
135                 saea_send = new SocketAsyncEventArgs();
136                 saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
137                 m_sendPool.Push(saea_send);
138             }
139         }
140 
141         /// <summary>
142         /// 啟動tcp服務偵聽
143         /// </summary>       
144         /// <param name="port">監聽端口</param>
145         internal void Start(int port)
146         {
147             IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port);
148             //創建listens是傳入的套接字。
149             listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
150             listenSocket.NoDelay = true;
151             //綁定端口
152             listenSocket.Bind(localEndPoint);
153             //掛起的連接隊列的最大長度。
154             listenSocket.Listen(1000);
155             //在監聽套接字上接受
156             StartAccept(null);
157             //發送線程
158             for (int i = 0; i < sendthread; i++)
159             {
160                 Thread thread = new Thread(StartSend);
161                 thread.IsBackground = true;
162                 thread.Priority = ThreadPriority.AboveNormal;
163                 thread.Start(i);
164             }
165             //超時機制
166             if (overtime > 0)
167             {
168                 Thread heartbeat = new Thread(new ThreadStart(() =>
169                 {
170                     Heartbeat();
171                 }));
172                 heartbeat.IsBackground = true;
173                 heartbeat.Priority = ThreadPriority.Lowest;
174                 heartbeat.Start();
175             }
176         }
177 
178         /// <summary>
179         /// 超時機制
180         /// </summary>
181         private void Heartbeat()
182         {
183             //計算超時次數 ,超過count就當客戶端斷開連接。服務端清除該連接資源
184             int count = overtime / overtimecheck;
185             while (true)
186             {
187                 foreach (var item in connectClient.Values)
188                 {
189                     if (item.keep_alive >= count)
190                     {
191                         item.keep_alive = 0;
192                         CloseClientSocket(item.saea_receive);
193                     }
194                 }
195                 foreach (var item in connectClient.Values)
196                 {
197                     item.keep_alive++;
198                 }
199                 Thread.Sleep(overtimecheck * 1000);
200             }
201         }
202 
203         #region Accept
204 
205         /// <summary>
206         /// 開始接受客戶端的連接請求的操作。
207         /// </summary>
208         /// <param name="acceptEventArg">發布時要使用的上下文對象服務器偵聽套接字上的接受操作</param>
209         private void StartAccept(SocketAsyncEventArgs acceptEventArg)
210         {
211             if (acceptEventArg == null)
212             {
213                 acceptEventArg = new SocketAsyncEventArgs();
214                 acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
215             }
216             else
217             {
218                 // 套接字必須被清除,因為上下文對象正在被重用。
219                 acceptEventArg.AcceptSocket = null;
220             }
221             m_maxNumberAcceptedClients.WaitOne();
222             //准備一個客戶端接入
223             if (!listenSocket.AcceptAsync(acceptEventArg))
224             {
225                 ProcessAccept(acceptEventArg);
226             }
227         }
228 
229         /// <summary>
230         /// 當異步連接完成時調用此方法
231         /// </summary>
232         /// <param name="e">操作對象</param>
233         private void ProcessAccept(SocketAsyncEventArgs e)
234         {
235             connectId++;
236             //把連接到的客戶端信息添加到集合中
237             ConnectClient connecttoken = new ConnectClient();
238             connecttoken.socket = e.AcceptSocket;
239             //從接受端重用池獲取一個新的SocketAsyncEventArgs對象
240             connecttoken.saea_receive = m_receivePool.Pop();
241             connecttoken.saea_receive.UserToken = connectId;
242             connecttoken.saea_receive.AcceptSocket = e.AcceptSocket;
243             connectClient.TryAdd(connectId, connecttoken);
244             //一旦客戶機連接,就准備接收。
245             if (!e.AcceptSocket.ReceiveAsync(connecttoken.saea_receive))
246             {
247                 ProcessReceive(connecttoken.saea_receive);
248             }
249             //事件回調
250             if (OnAccept != null)
251             {
252                 OnAccept(connectId);
253             }
254             //接受第二連接的請求
255             StartAccept(e);
256         }
257 
258         #endregion
259 
260         #region 接受處理 receive
261 
262         /// <summary>
263         /// 接受處理回調
264         /// </summary>
265         /// <param name="e">操作對象</param>
266         private void ProcessReceive(SocketAsyncEventArgs e)
267         {
268             //檢查遠程主機是否關閉連接
269             if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
270             {
271                 int connectId = (int)e.UserToken;
272                 ConnectClient client;
273                 if (!connectClient.TryGetValue(connectId, out client))
274                 {
275                     return;
276                 }
277                 //如果接收到數據,超時記錄設置為0
278                 if (overtime > 0)
279                 {
280                     if (client != null)
281                     {
282                         client.keep_alive = 0;
283                     }
284                 }
285                 //回調               
286                 if (OnReceive != null)
287                 {
288                     if (client != null)
289                     {
290                         OnReceive(connectId, e.Buffer, e.Offset, e.BytesTransferred);
291                     }
292                 }
293                 //准備下次接收數據      
294                 try
295                 {
296                     if (!e.AcceptSocket.ReceiveAsync(e))
297                     {
298                         ProcessReceive(e);
299                     }
300                 }
301                 catch (ObjectDisposedException ex)
302                 {
303                     if (OnClose != null)
304                     {
305                         OnClose(connectId);
306                     }
307                 }
308             }
309             else
310             {
311                 CloseClientSocket(e);
312             }
313         }
314 
315         #endregion
316 
317         #region 發送處理 send
318 
319         /// <summary>
320         /// 開始啟用發送
321         /// </summary>
322         private void StartSend(object thread)
323         {
324             while (true)
325             {
326                 SendingQueue sending;
327                 if (sendQueues[(int)thread].TryDequeue(out sending))
328                 {
329                     Send(sending);
330                 }
331                 else
332                 {
333                     Thread.Sleep(100);
334                 }
335             }
336         }
337 
338         /// <summary>
339         /// 異步發送消息 
340         /// </summary>
341         /// <param name="connectId">連接ID</param>
342         /// <param name="data">數據</param>
343         /// <param name="offset">偏移位</param>
344         /// <param name="length">長度</param>
345         internal void Send(int connectId, byte[] data, int offset, int length)
346         {
347             sendQueues[connectId % sendthread].Enqueue(new SendingQueue() { connectId = connectId, data = data, offset = offset, length = length });
348         }
349 
350         /// <summary>
351         /// 異步發送消息 
352         /// </summary>
353         /// <param name="sendQuere">發送消息體</param>
354         private void Send(SendingQueue sendQuere)
355         {
356             ConnectClient client;
357             if (!connectClient.TryGetValue(sendQuere.connectId, out client))
358             {
359                 return;
360             }
361             //如果發送池為空時,臨時新建一個放入池中
362             mutex.WaitOne();
363             if (m_sendPool.Count == 0)
364             {
365                 SocketAsyncEventArgs saea_send = new SocketAsyncEventArgs();
366                 saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
367                 m_sendPool.Push(saea_send);
368             }
369             SocketAsyncEventArgs sendEventArgs = m_sendPool.Pop();
370             mutex.ReleaseMutex();
371             sendEventArgs.UserToken = sendQuere.connectId;
372             sendEventArgs.SetBuffer(sendQuere.data, sendQuere.offset, sendQuere.length);
373             try
374             {
375                 if (!client.socket.SendAsync(sendEventArgs))
376                 {
377                     ProcessSend(sendEventArgs);
378                 }
379             }
380             catch (ObjectDisposedException ex)
381             {
382                 if (OnClose != null)
383                 {
384                     OnClose(sendQuere.connectId);
385                 }
386             }
387             sendQuere = null;
388         }
389 
390         /// <summary>
391         /// 發送回調
392         /// </summary>
393         /// <param name="e">操作對象</param>
394         private void ProcessSend(SocketAsyncEventArgs e)
395         {
396             if (e.SocketError == SocketError.Success)
397             {
398                 m_sendPool.Push(e);
399                 if (OnSend != null)
400                 {
401                     OnSend((int)e.UserToken, e.BytesTransferred);
402                 }
403             }
404             else
405             {
406                 CloseClientSocket(e);
407             }
408         }
409 
410         #endregion
411 
412         /// <summary>
413         /// 每當套接字上完成接收或發送操作時,都會調用此方法。
414         /// </summary>
415         /// <param name="sender"></param>
416         /// <param name="e">與完成的接收操作關聯的SocketAsyncEventArg</param>
417         private void IO_Completed(object sender, SocketAsyncEventArgs e)
418         {
419             //確定剛剛完成哪種類型的操作並調用相關的處理程序
420             switch (e.LastOperation)
421             {
422                 case SocketAsyncOperation.Receive:
423                     ProcessReceive(e);
424                     break;
425                 case SocketAsyncOperation.Send:
426                     ProcessSend(e);
427                     break;
428                 case SocketAsyncOperation.Accept:
429                     ProcessAccept(e);
430                     break;
431                 default:
432                     break;
433             }
434         }
435 
436         #region 斷開連接處理
437 
438 
439         /// <summary>
440         /// 客戶端斷開一個連接
441         /// </summary>
442         /// <param name="connectId">連接標記</param>
443         internal void Close(int connectId)
444         {
445             ConnectClient client;
446             if (!connectClient.TryGetValue(connectId, out client))
447             {
448                 return;
449             }
450             CloseClientSocket(client.saea_receive);
451         }
452 
453         /// <summary>
454         /// 斷開一個連接
455         /// </summary>
456         /// <param name="e">操作對象</param>
457         private void CloseClientSocket(SocketAsyncEventArgs e)
458         {
459             if (e.LastOperation == SocketAsyncOperation.Receive)
460             {
461                 int connectId = (int)e.UserToken;
462                 ConnectClient client;
463                 if (!connectClient.TryGetValue(connectId, out client))
464                 {
465                     return;
466                 }
467                 if (client.socket.Connected == false)
468                 {
469                     return;
470                 }
471                 try
472                 {
473                     client.socket.Shutdown(SocketShutdown.Both);
474                 }
475                 // 拋出客戶端進程已經關閉
476                 catch (Exception) { }
477                 client.socket.Close();
478                 m_receivePool.Push(e);
479                 m_maxNumberAcceptedClients.Release();
480                 if (OnClose != null)
481                 {
482                     OnClose(connectId);
483                 }
484                 connectClient.TryRemove((int)e.UserToken, out client);
485                 client = null;
486             }
487         }
488 
489         #endregion
490 
491         #region 附加數據
492 
493         /// <summary>
494         /// 給連接對象設置附加數據
495         /// </summary>
496         /// <param name="connectId">連接標識</param>
497         /// <param name="data">附加數據</param>
498         /// <returns>true:設置成功,false:設置失敗</returns>
499         internal bool SetAttached(int connectId, object data)
500         {
501             ConnectClient client;
502             if (!connectClient.TryGetValue(connectId, out client))
503             {
504                 return false;
505             }
506             client.attached = data;
507             return true;
508         }
509 
510         /// <summary>
511         /// 獲取連接對象的附加數據
512         /// </summary>
513         /// <param name="connectId">連接標識</param>
514         /// <returns>附加數據,如果沒有找到則返回null</returns>
515         internal T GetAttached<T>(int connectId)
516         {
517             ConnectClient client;
518             if (!connectClient.TryGetValue(connectId, out client))
519             {
520                 return default(T);
521             }
522             else
523             {
524                 return (T)client.attached;
525             }
526         }
527         #endregion
528     }
529 
530 }
socket核心類

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM