最近用C# 寫了一個易用,高性能的socket,抗住了ServerTestTool.exe 壓力測試工具
下載地址 GitHub QQ群: 664740973
https://gitee.com/fengma_312/socket.core
https://www.nuget.org/packages/socket.core/
socket.core
這是一個基於C# .net standard2.0 寫的socket框架,可使用於.net Framework/dotnet core程序集,能在window(IOCP)/linux(epoll)運行.使用異步連接,異步發送,異步接收,性能爆表,並且通過壓力測試。
安裝NuGet:
Package Manager: Install-Package socket.core
.Net CLI :dotnet add package socket.core
Paket CLI:paket add socket.core
服務端所在socket.core.Server命名空間下,分別為三種模式 push/pull/pack
客戶端所在socket.core.Client命名空間下,分別為三種模式 push/pull/pack
主要流程與對應的方法和事件介紹.
注:connectId(int)代表着一個連接對象,data(byte[]),success(bool)
- 1.初始化socket(對應的三種模式)
實例化服務端類 TcpPushServer/TcpPullServer/TcpPackServer
實例化客戶端類 TcpPushClient/TcpPullClient/TcpPackClient
參數介紹int numConnections同時處理的最大連接數,int receiveBufferSize用於每個套接字I/O操作的緩沖區大小(接收端), int overtime超時時長,單位秒.(每10秒檢查一次),當值為0時,不設置超時,uint headerFlag包頭標記范圍0~1023(0x3FF),當包頭標識等於0時,不校驗包頭 - 2.啟動監聽/連接服務器
服務端 server.Start(port);
客戶端 client.Connect(ip,port); - 3.觸發連接事件
服務端 server.OnAccept(connectId); 接收到一個連接id,可用他來發送,接收,關閉的標記
客戶端 client.OnConnect(success); 接收是否成功連接到服務器 - 4.發送消息
服務端 server.Send(connectId,data,offset,length);
客戶端 client.Send(data,offset,length); - 5.觸發已發送事件
服務端 server.OnSend(connectId,length);
客戶端 client.OnSend(length); - 6.觸發接收事件
服務端 server.OnReceive(connectId, data);
客戶端 client.OnReceive(data); - 7.關閉連接
服務端 server.Close(connectId);
客戶端 client.Close(); - 8.觸發關閉連接事件
服務端 server.OnClose(connectId);
客戶端 client.OnClose();
三種模型簡介
- 一:push
當接收到數據時會觸發監聽事件OnReceive(connectId,data);把數據立馬“推”給應用程序
- 二:pull
當接收到數據時會觸發監聽事件OnReceive(connectId,length),告訴應用程序當前已經接收到了多少數據長度,應用程序可使用GetLength(connectId)方法檢查已接收的數據的長度,如果滿足則調用組件的Fetch(connectId,length)方法,把需要的數據“拉”出來
- 三:pack
pack模型組件是push和pull模型的結合體,應用程序不必要處理分包/合包,組件保證每個server.OnReceive(connectId,data)/client.OnReceive(data)事件都向應用程序提供一個完整的數據包
注:pack模型組件會對應用程序發送的每個數據包自動加上4個字節(32bit)的包頭,組件接收到數據時,根據包頭信息自動分包,每個完整的數據包通過OnReceive(connectId, data)事件發送給應用程序
PACK包頭格式(4字節)4*8=32
XXXXXXXXXXYYYYYYYYYYYYYYYYYYYYYY
前10位X為包頭標識位,用於數據包校驗,有效包頭標識取值范圍0~1023(0x3FF),當包頭標識等於0時,不校驗包頭,后22位Y為長度位,記錄包體長度。有效數據包最大長度不能超過4194303(0x3FFFFF)字節(byte),應用程序可以通過TcpPackServer/TcpPackClient構造函數參數headerFlag設置
服務端其它方法介紹
-
- bool SetAttached(int connectId, object data)
服務端為每個客戶端設置附加數據,避免用戶自己再建立用戶映射表
-
- T GetAttached(int connectId)
獲取指定客戶端的附加數據
2017/12/27
技術在於分享,大家共同進步
核心類
1 using System; 2 using System.Collections.Generic; 3 using System.Net; 4 using System.Net.Sockets; 5 using System.Text; 6 using System.Threading; 7 using System.Linq; 8 using System.Collections.Concurrent; 9 using socket.core.Common; 10 11 namespace socket.core.Server 12 { 13 /// <summary> 14 /// tcp Socket監聽基庫 15 /// </summary> 16 internal class TcpServer 17 { 18 /// <summary> 19 /// 連接標示 自增長 20 /// </summary> 21 private int connectId; 22 /// <summary> 23 /// 同時處理的最大連接數 24 /// </summary> 25 private int m_numConnections; 26 /// <summary> 27 /// 用於每個套接字I/O操作的緩沖區大小 28 /// </summary> 29 private int m_receiveBufferSize; 30 /// <summary> 31 /// 所有套接字接收操作的一個可重用的大型緩沖區集合。 32 /// </summary> 33 private BufferManager m_bufferManager; 34 /// <summary> 35 /// 用於監聽傳入連接請求的套接字 36 /// </summary> 37 private Socket listenSocket; 38 /// <summary> 39 /// 接受端SocketAsyncEventArgs對象重用池,接受套接字操作 40 /// </summary> 41 private SocketAsyncEventArgsPool m_receivePool; 42 /// <summary> 43 /// 發送端SocketAsyncEventArgs對象重用池,發送套接字操作 44 /// </summary> 45 private SocketAsyncEventArgsPool m_sendPool; 46 /// <summary> 47 /// 超時,如果超時,服務端斷開連接,客戶端需要重連操作 48 /// </summary> 49 private int overtime; 50 /// <summary> 51 /// 超時檢查間隔時間(秒) 52 /// </summary> 53 private int overtimecheck = 1; 54 /// <summary> 55 /// 能接到最多客戶端個數的原子操作 56 /// </summary> 57 private Semaphore m_maxNumberAcceptedClients; 58 /// <summary> 59 /// 已經連接的對象池 60 /// </summary> 61 internal ConcurrentDictionary<int, ConnectClient> connectClient; 62 /// <summary> 63 /// 發送線程數 64 /// </summary> 65 private int sendthread = 10; 66 /// <summary> 67 /// 需要發送的數據 68 /// </summary> 69 private ConcurrentQueue<SendingQueue>[] sendQueues; 70 /// <summary> 71 /// 鎖 72 /// </summary> 73 private Mutex mutex = new Mutex(); 74 /// <summary> 75 /// 連接成功事件 76 /// </summary> 77 internal event Action<int> OnAccept; 78 /// <summary> 79 /// 接收通知事件 80 /// </summary> 81 internal event Action<int, byte[], int, int> OnReceive; 82 /// <summary> 83 /// 已送通知事件 84 /// </summary> 85 internal event Action<int, int> OnSend; 86 /// <summary> 87 /// 斷開連接通知事件 88 /// </summary> 89 internal event Action<int> OnClose; 90 91 /// <summary> 92 /// 設置基本配置 93 /// </summary> 94 /// <param name="numConnections">同時處理的最大連接數</param> 95 /// <param name="receiveBufferSize">用於每個套接字I/O操作的緩沖區大小(接收端)</param> 96 /// <param name="overTime">超時時長,單位秒.(每10秒檢查一次),當值為0時,不設置超時</param> 97 internal TcpServer(int numConnections, int receiveBufferSize, int overTime) 98 { 99 overtime = overTime; 100 m_numConnections = numConnections; 101 m_receiveBufferSize = receiveBufferSize; 102 m_bufferManager = new BufferManager(receiveBufferSize * m_numConnections, receiveBufferSize); 103 m_receivePool = new SocketAsyncEventArgsPool(m_numConnections); 104 m_sendPool = new SocketAsyncEventArgsPool(m_numConnections); 105 m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections); 106 Init(); 107 } 108 109 /// <summary> 110 /// 初始化服務器通過預先分配的可重復使用的緩沖區和上下文對象。這些對象不需要預先分配或重用,但這樣做是為了說明API如何可以易於用於創建可重用對象以提高服務器性能。 111 /// </summary> 112 private void Init() 113 { 114 connectClient = new ConcurrentDictionary<int, ConnectClient>(); 115 sendQueues = new ConcurrentQueue<SendingQueue>[sendthread]; 116 for (int i = 0; i < sendthread; i++) 117 { 118 sendQueues[i] = new ConcurrentQueue<SendingQueue>(); 119 } 120 //分配一個大字節緩沖區,所有I/O操作都使用一個。這個侍衛對內存碎片 121 m_bufferManager.InitBuffer(); 122 //預分配的接受對象池socketasynceventargs,並分配緩存 123 SocketAsyncEventArgs saea_receive; 124 //分配的發送對象池socketasynceventargs,但是不分配緩存 125 SocketAsyncEventArgs saea_send; 126 for (int i = 0; i < m_numConnections; i++) 127 { 128 //預先接受端分配一組可重用的消息 129 saea_receive = new SocketAsyncEventArgs(); 130 saea_receive.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); 131 //分配緩沖池中的字節緩沖區的socketasynceventarg對象 132 m_bufferManager.SetBuffer(saea_receive); 133 m_receivePool.Push(saea_receive); 134 //預先發送端分配一組可重用的消息 135 saea_send = new SocketAsyncEventArgs(); 136 saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); 137 m_sendPool.Push(saea_send); 138 } 139 } 140 141 /// <summary> 142 /// 啟動tcp服務偵聽 143 /// </summary> 144 /// <param name="port">監聽端口</param> 145 internal void Start(int port) 146 { 147 IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port); 148 //創建listens是傳入的套接字。 149 listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); 150 listenSocket.NoDelay = true; 151 //綁定端口 152 listenSocket.Bind(localEndPoint); 153 //掛起的連接隊列的最大長度。 154 listenSocket.Listen(1000); 155 //在監聽套接字上接受 156 StartAccept(null); 157 //發送線程 158 for (int i = 0; i < sendthread; i++) 159 { 160 Thread thread = new Thread(StartSend); 161 thread.IsBackground = true; 162 thread.Priority = ThreadPriority.AboveNormal; 163 thread.Start(i); 164 } 165 //超時機制 166 if (overtime > 0) 167 { 168 Thread heartbeat = new Thread(new ThreadStart(() => 169 { 170 Heartbeat(); 171 })); 172 heartbeat.IsBackground = true; 173 heartbeat.Priority = ThreadPriority.Lowest; 174 heartbeat.Start(); 175 } 176 } 177 178 /// <summary> 179 /// 超時機制 180 /// </summary> 181 private void Heartbeat() 182 { 183 //計算超時次數 ,超過count就當客戶端斷開連接。服務端清除該連接資源 184 int count = overtime / overtimecheck; 185 while (true) 186 { 187 foreach (var item in connectClient.Values) 188 { 189 if (item.keep_alive >= count) 190 { 191 item.keep_alive = 0; 192 CloseClientSocket(item.saea_receive); 193 } 194 } 195 foreach (var item in connectClient.Values) 196 { 197 item.keep_alive++; 198 } 199 Thread.Sleep(overtimecheck * 1000); 200 } 201 } 202 203 #region Accept 204 205 /// <summary> 206 /// 開始接受客戶端的連接請求的操作。 207 /// </summary> 208 /// <param name="acceptEventArg">發布時要使用的上下文對象服務器偵聽套接字上的接受操作</param> 209 private void StartAccept(SocketAsyncEventArgs acceptEventArg) 210 { 211 if (acceptEventArg == null) 212 { 213 acceptEventArg = new SocketAsyncEventArgs(); 214 acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); 215 } 216 else 217 { 218 // 套接字必須被清除,因為上下文對象正在被重用。 219 acceptEventArg.AcceptSocket = null; 220 } 221 m_maxNumberAcceptedClients.WaitOne(); 222 //准備一個客戶端接入 223 if (!listenSocket.AcceptAsync(acceptEventArg)) 224 { 225 ProcessAccept(acceptEventArg); 226 } 227 } 228 229 /// <summary> 230 /// 當異步連接完成時調用此方法 231 /// </summary> 232 /// <param name="e">操作對象</param> 233 private void ProcessAccept(SocketAsyncEventArgs e) 234 { 235 connectId++; 236 //把連接到的客戶端信息添加到集合中 237 ConnectClient connecttoken = new ConnectClient(); 238 connecttoken.socket = e.AcceptSocket; 239 //從接受端重用池獲取一個新的SocketAsyncEventArgs對象 240 connecttoken.saea_receive = m_receivePool.Pop(); 241 connecttoken.saea_receive.UserToken = connectId; 242 connecttoken.saea_receive.AcceptSocket = e.AcceptSocket; 243 connectClient.TryAdd(connectId, connecttoken); 244 //一旦客戶機連接,就准備接收。 245 if (!e.AcceptSocket.ReceiveAsync(connecttoken.saea_receive)) 246 { 247 ProcessReceive(connecttoken.saea_receive); 248 } 249 //事件回調 250 if (OnAccept != null) 251 { 252 OnAccept(connectId); 253 } 254 //接受第二連接的請求 255 StartAccept(e); 256 } 257 258 #endregion 259 260 #region 接受處理 receive 261 262 /// <summary> 263 /// 接受處理回調 264 /// </summary> 265 /// <param name="e">操作對象</param> 266 private void ProcessReceive(SocketAsyncEventArgs e) 267 { 268 //檢查遠程主機是否關閉連接 269 if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) 270 { 271 int connectId = (int)e.UserToken; 272 ConnectClient client; 273 if (!connectClient.TryGetValue(connectId, out client)) 274 { 275 return; 276 } 277 //如果接收到數據,超時記錄設置為0 278 if (overtime > 0) 279 { 280 if (client != null) 281 { 282 client.keep_alive = 0; 283 } 284 } 285 //回調 286 if (OnReceive != null) 287 { 288 if (client != null) 289 { 290 OnReceive(connectId, e.Buffer, e.Offset, e.BytesTransferred); 291 } 292 } 293 //准備下次接收數據 294 try 295 { 296 if (!e.AcceptSocket.ReceiveAsync(e)) 297 { 298 ProcessReceive(e); 299 } 300 } 301 catch (ObjectDisposedException ex) 302 { 303 if (OnClose != null) 304 { 305 OnClose(connectId); 306 } 307 } 308 } 309 else 310 { 311 CloseClientSocket(e); 312 } 313 } 314 315 #endregion 316 317 #region 發送處理 send 318 319 /// <summary> 320 /// 開始啟用發送 321 /// </summary> 322 private void StartSend(object thread) 323 { 324 while (true) 325 { 326 SendingQueue sending; 327 if (sendQueues[(int)thread].TryDequeue(out sending)) 328 { 329 Send(sending); 330 } 331 else 332 { 333 Thread.Sleep(100); 334 } 335 } 336 } 337 338 /// <summary> 339 /// 異步發送消息 340 /// </summary> 341 /// <param name="connectId">連接ID</param> 342 /// <param name="data">數據</param> 343 /// <param name="offset">偏移位</param> 344 /// <param name="length">長度</param> 345 internal void Send(int connectId, byte[] data, int offset, int length) 346 { 347 sendQueues[connectId % sendthread].Enqueue(new SendingQueue() { connectId = connectId, data = data, offset = offset, length = length }); 348 } 349 350 /// <summary> 351 /// 異步發送消息 352 /// </summary> 353 /// <param name="sendQuere">發送消息體</param> 354 private void Send(SendingQueue sendQuere) 355 { 356 ConnectClient client; 357 if (!connectClient.TryGetValue(sendQuere.connectId, out client)) 358 { 359 return; 360 } 361 //如果發送池為空時,臨時新建一個放入池中 362 mutex.WaitOne(); 363 if (m_sendPool.Count == 0) 364 { 365 SocketAsyncEventArgs saea_send = new SocketAsyncEventArgs(); 366 saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); 367 m_sendPool.Push(saea_send); 368 } 369 SocketAsyncEventArgs sendEventArgs = m_sendPool.Pop(); 370 mutex.ReleaseMutex(); 371 sendEventArgs.UserToken = sendQuere.connectId; 372 sendEventArgs.SetBuffer(sendQuere.data, sendQuere.offset, sendQuere.length); 373 try 374 { 375 if (!client.socket.SendAsync(sendEventArgs)) 376 { 377 ProcessSend(sendEventArgs); 378 } 379 } 380 catch (ObjectDisposedException ex) 381 { 382 if (OnClose != null) 383 { 384 OnClose(sendQuere.connectId); 385 } 386 } 387 sendQuere = null; 388 } 389 390 /// <summary> 391 /// 發送回調 392 /// </summary> 393 /// <param name="e">操作對象</param> 394 private void ProcessSend(SocketAsyncEventArgs e) 395 { 396 if (e.SocketError == SocketError.Success) 397 { 398 m_sendPool.Push(e); 399 if (OnSend != null) 400 { 401 OnSend((int)e.UserToken, e.BytesTransferred); 402 } 403 } 404 else 405 { 406 CloseClientSocket(e); 407 } 408 } 409 410 #endregion 411 412 /// <summary> 413 /// 每當套接字上完成接收或發送操作時,都會調用此方法。 414 /// </summary> 415 /// <param name="sender"></param> 416 /// <param name="e">與完成的接收操作關聯的SocketAsyncEventArg</param> 417 private void IO_Completed(object sender, SocketAsyncEventArgs e) 418 { 419 //確定剛剛完成哪種類型的操作並調用相關的處理程序 420 switch (e.LastOperation) 421 { 422 case SocketAsyncOperation.Receive: 423 ProcessReceive(e); 424 break; 425 case SocketAsyncOperation.Send: 426 ProcessSend(e); 427 break; 428 case SocketAsyncOperation.Accept: 429 ProcessAccept(e); 430 break; 431 default: 432 break; 433 } 434 } 435 436 #region 斷開連接處理 437 438 439 /// <summary> 440 /// 客戶端斷開一個連接 441 /// </summary> 442 /// <param name="connectId">連接標記</param> 443 internal void Close(int connectId) 444 { 445 ConnectClient client; 446 if (!connectClient.TryGetValue(connectId, out client)) 447 { 448 return; 449 } 450 CloseClientSocket(client.saea_receive); 451 } 452 453 /// <summary> 454 /// 斷開一個連接 455 /// </summary> 456 /// <param name="e">操作對象</param> 457 private void CloseClientSocket(SocketAsyncEventArgs e) 458 { 459 if (e.LastOperation == SocketAsyncOperation.Receive) 460 { 461 int connectId = (int)e.UserToken; 462 ConnectClient client; 463 if (!connectClient.TryGetValue(connectId, out client)) 464 { 465 return; 466 } 467 if (client.socket.Connected == false) 468 { 469 return; 470 } 471 try 472 { 473 client.socket.Shutdown(SocketShutdown.Both); 474 } 475 // 拋出客戶端進程已經關閉 476 catch (Exception) { } 477 client.socket.Close(); 478 m_receivePool.Push(e); 479 m_maxNumberAcceptedClients.Release(); 480 if (OnClose != null) 481 { 482 OnClose(connectId); 483 } 484 connectClient.TryRemove((int)e.UserToken, out client); 485 client = null; 486 } 487 } 488 489 #endregion 490 491 #region 附加數據 492 493 /// <summary> 494 /// 給連接對象設置附加數據 495 /// </summary> 496 /// <param name="connectId">連接標識</param> 497 /// <param name="data">附加數據</param> 498 /// <returns>true:設置成功,false:設置失敗</returns> 499 internal bool SetAttached(int connectId, object data) 500 { 501 ConnectClient client; 502 if (!connectClient.TryGetValue(connectId, out client)) 503 { 504 return false; 505 } 506 client.attached = data; 507 return true; 508 } 509 510 /// <summary> 511 /// 獲取連接對象的附加數據 512 /// </summary> 513 /// <param name="connectId">連接標識</param> 514 /// <returns>附加數據,如果沒有找到則返回null</returns> 515 internal T GetAttached<T>(int connectId) 516 { 517 ConnectClient client; 518 if (!connectClient.TryGetValue(connectId, out client)) 519 { 520 return default(T); 521 } 522 else 523 { 524 return (T)client.attached; 525 } 526 } 527 #endregion 528 } 529 530 }
