分享一下,項目中寫的簡單socket程序,不同方式的版本,這是個異步基於IOCP實現高性能的處理方式。IOCP就不多說了,高性能的完成端口,可以實現套接字對象的復用,降低開銷,且基於端口共享性能據說很高,因此把之前的項目做個改動。下面來個代碼走讀:
功能介紹:
此socket程序類似一個交換機,對下統一采用報文來交換數據,什么心跳、回執等各式各樣十幾種類,他負責接收各個client發送來的消息,向各個client回發確認幀、發送對設備任務請求等工作;對上,向數據庫存入設備狀態等數據,將應用層的任務,組織合適的報文,下發給client。
他所處的角色和位置:有時候,接收到client的請求后直接轉發給BS(本質就是存入數據庫啦);有時候,接收到client的請求后 思考 一下,向應用層拉取數據,裝配為 恰當的報文 后,回送給對應的client;輪詢BS提供的業務接口,發現有對設備請求的任務后,裝配為恰當的格式后,發送給對應的client。
就像一個繁忙的機械手,有時對於收到的報文,僅僅是左手接,邏輯處理后,右手丟回去,必須滿足,從哪里接的、需送回給正確的client;有時候,只是充當一個轉發器,收到的內容不做任何邏輯處理,直接丟給第三方處理。雖然程序都已經運行多時,都沒有想好一個恰當的名字來命名程序,他更像是一個橋、一個轉換器。
下面是主進程需要聲明的符號;收件箱存入接收的消息,不做任何處理;會有相應的進程不間斷的掃描收件隊列,一旦發現內容,便開始處理,處理結果會存入對應的發件箱,同樣有相應的進程不間斷的處理,通常為發送至client端,或者發送至BS接口。
1 private readonly Config _config = InitConfig.ReadConfig();//配置信息 端口 IP 2 3 private static IDictionary<int, WcsEndpoint<Socket>> _wcsList = new Dictionary<int, WcsEndpoint<Socket>>();//記錄有多少個端點連接進來 4 private const int BufferSize = 1024; 5 private static SocketTool _socketTool = new SocketTool();//完成分包、壓縮、加密關鍵報文內容 6 private static MessageFactory _messageFactory = new MessageFactory();//用於構造不同的消息報文 7 private static TaskSendDown _taskStockIn = new TaskSendDown(_socketTool);//用於橋接瀏覽器,通知web端一些事情發生 8 private static ConcurrentQueue<ReceiveEntity> _receiveMailBox = new ConcurrentQueue<ReceiveEntity>();//收件箱 9 private static ConcurrentQueue<ReceiveEntity> _sendMailBox = new ConcurrentQueue<ReceiveEntity>();//發件箱 10 private static object _lockwcsList = new object();//同步鎖 11 private static CachePool _sPoolyncDataPool = new CachePool();//擴展數據緩存
1 var ip = IPAddress.Parse(_config.LocalIp); 2 TcpListener.InstanceSocketTool = _socketTool; 3 TcpListener.LockwcsList = _lockwcsList; 4 TcpListener.WcsList = _wcsList; 5 var core = new TcpListener 6 {//將外部變量傳遞給核心處理類 7 MaxConnect = int.Parse(_config.MaxConnect), 8 ReceiveMailBox = _receiveMailBox, 9 SPoolyncDataPool = _sPoolyncDataPool, 10 SendMailBox = _sendMailBox, 11 TaskStockIn = _taskStockIn, 12 }; 13 CallbackUpdateStrip("啟動wcs監聽;");//同步UI 14 var receiveBoxHandler = new Thread(core.HandReceiveMailbox); //收件郵箱偵聽 15 receiveBoxHandler.Start(); 16 17 var sendBoxHandler = new Thread(core.HandleSendMailbox); //發件箱偵聽 18 sendBoxHandler.Start(); 19 core.Listen(new IPEndPoint(ip, int.Parse(_config.PortForWcs)));
核心socket實現,抄襲了別人的稍微改造一下,或者高端點重構了一下:
1 using System; 2 using System.Collections.Concurrent; 3 using System.Collections.Generic; 4 using System.Diagnostics; 5 using System.Drawing; 6 using System.Linq; 7 using System.Net; 8 using System.Net.Sockets; 9 using System.Runtime.InteropServices; 10 using System.Runtime.Remoting.Contexts; 11 using System.Text; 12 using System.Threading; 13 using Newtonsoft.Json; 14 using NovaMessageSwitch; 15 using NovaMessageSwitch.Bll; 16 using NovaMessageSwitch.message; 17 using NovaMessageSwitch.Model; 18 using NovaMessageSwitch.Tool; 19 using NovaMessageSwitch.Tool.DataCache; 20 using NovaMessageSwitch.Tool.Log; 21 22 /**************************************************************** 23 * 作者:wxy 24 * CLR版本:4.0.30319.42000 25 * 創建時間:2016/3/25 8:47:51 26 * 2016 27 * 描述說明: 28 * SocketAsyncEventArgs是異步的socket類,封裝了IOCP,可以很方便實現non-blocking IO 29 非阻塞IO對server性能和吞吐量有很大好處 30 * 修改歷史: 31 * IPAddress[] addressList = Dns.GetHostEntry(Environment.MachineName).AddressList; 32 new TcpListener().Listen(/new IPEndPoint(addressList[addressList.Length - 1]/, 9900)); 33 34 Console.ReadKey(); 35 36 * 37 *****************************************************************/ 38 namespace CompletePortB 39 { 40 public class TcpListener 41 { 42 private SocketAsyncEventArgs _args; 43 private Socket _listenerSocket; 44 private StringBuilder _buffers; 45 private const int BufferSize = 1024; 46 47 public TcpListener() 48 { 49 MaxConnect = 1000; 50 51 } 52 53 public int MaxConnect { get; set; } 54 public static object LockwcsList { get; set; } 55 public static IDictionary<int, WcsEndpoint<Socket>> WcsList { get; set; } 56 public TaskSendDown TaskStockIn { get; set; } 57 public CachePool SPoolyncDataPool { get; set; } 58 public ConcurrentQueue<ReceiveEntity> ReceiveMailBox { get; set; } 59 public ConcurrentQueue<ReceiveEntity> SendMailBox { get; set; } 60 61 public static SocketTool InstanceSocketTool { get; set; } 62 63 public void Listen(EndPoint e) 64 { 65 try 66 { 67 _buffers = new StringBuilder(); 68 _listenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 69 _listenerSocket.Bind(e); 70 _listenerSocket.Listen(MaxConnect); 71 _args = new SocketAsyncEventArgs(); 72 _args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept); 73 BeginAccept(_args); 74 Console.Read(); 75 } 76 catch (Exception ex) 77 { 78 79 } 80 } 81 82 private void BeginAccept(SocketAsyncEventArgs e) 83 { 84 e.AcceptSocket = null; 85 //異步操作完成,返回false 86 if (!_listenerSocket.AcceptAsync(e)) 87 ProcessAccept(_listenerSocket, e); 88 89 } 90 91 //異步操作完成時調用此方法 92 private void ProcessAccept(object sender, SocketAsyncEventArgs e) 93 { 94 var s = e.AcceptSocket; 95 e.AcceptSocket = null; 96 var args = new SocketAsyncEventArgs(); 97 //繼續向下回掉函數 98 args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIoCompleted); 99 //設定異步套接字方法的數據緩沖區 100 args.SetBuffer(new byte[BufferSize], 0, BufferSize); 101 args.AcceptSocket = s; 102 //IO操作同步完成,返回false,且不引發e參數的socketasynceventargs事件 103 //IO操作若被掛起,將返回true,且引發e參數的... 104 if (!s.ReceiveAsync(args)) 105 ProcessReceive(args); 106 BeginAccept(e); 107 } 108 void OnIoCompleted(object sender, SocketAsyncEventArgs e) 109 { 110 //獲取最近使用此上下文對象執行的套接字操作類型 111 switch (e.LastOperation) 112 { 113 case SocketAsyncOperation.Receive: 114 this.ProcessReceive(e); 115 break; 116 case SocketAsyncOperation.Send: 117 this.ProcessSend(e); 118 break; 119 default: 120 throw new ArgumentException("The last operation completed on the socket was not a receive or send"); 121 } 122 } 123 124 private void ProcessSend(SocketAsyncEventArgs e) 125 { 126 if (e.SocketError == SocketError.Success) 127 { 128 //接收完畢開始下次接收 129 if (!e.AcceptSocket.ReceiveAsync(e)) 130 { 131 this.ProcessReceive(e); 132 } 133 } 134 else 135 { 136 137 } 138 139 } 140 141 142 /// <summary> 143 /// 維護wcs端點列表 144 /// </summary> 145 /// <param name="clientId"></param> 146 /// <param name="socket"></param> 147 private static void AddDictWcs(int clientId, Socket socket) 148 { 149 lock (LockwcsList) 150 { 151 if (WcsList.Keys.Contains(clientId)) 152 { 153 var endPoint = WcsList[clientId]; 154 endPoint.EndPoint = socket; 155 endPoint.RecentTimeOld = endPoint.RecentTime; 156 endPoint.RecentTime = DateTime.Now; 157 InstanceSocketTool.UpdateWcsDisplay(endPoint, UpdateUi.Post); 158 } 159 else 160 { 161 var newEndPoit = new WcsEndpoint<Socket> 162 { 163 RecentTime = DateTime.Now, 164 RecentTimeOld = null, 165 EndPoint = socket 166 }; 167 WcsList.Add(clientId, newEndPoit); 168 } 169 170 foreach (var wcs in WcsList.Where(wcs => (wcs.Value.RecentTime - DateTime.Now).Minutes > 2)) 171 { 172 WcsList.Remove(wcs.Key); 173 } 174 InstanceSocketTool.UpdateWcsDisplay(WcsList, UpdateUi.Post); 175 } 176 } 177 private string ReplyAckWcs(Socket socket, string oriSerial) 178 { 179 var socketTool = InstanceSocketTool; 180 var ackMessage = new MessageData<ContentReply> 181 { 182 infoType = 1, 183 content = new ContentReply(), 184 destination = DataFlowDirection.wcs.ToString(), 185 source = DataFlowDirection.wms.ToString(), 186 infoDesc = "反饋報文", 187 serial = Guid.NewGuid().ToString("N") 188 }; 189 ackMessage.content.oriSerial = oriSerial; 190 var sendStr = $"?{JsonConvert.SerializeObject(ackMessage)}$"; 191 192 var infoDisplay = socketTool.CreateInfoDisplay(socket); 193 infoDisplay.Message = sendStr; 194 infoDisplay.CustomColor = Color.DodgerBlue; 195 socketTool.PrintInfoConsole($"{sendStr}", ConsoleColor.Green, infoDisplay, UpdateUi.PostMessageInfo); 196 return sendStr; 197 } 198 private void ProcessReceive(SocketAsyncEventArgs e) 199 { 200 if (e.BytesTransferred > 0) 201 { 202 if (e.SocketError == SocketError.Success) 203 { 204 var data = Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred); 205 _buffers.Append(data); 206 207 if (e.AcceptSocket.Available == 0) 208 { 209 var receStr=_buffers.ToString(); 210 if (InstanceSocketTool.ValidateMessageJson(ref receStr) == false) 211 return; 212 var info = InstanceSocketTool.CreateInfoDisplay(e.AcceptSocket); 213 info.Message = receStr; 214 info.CustomColor = Color.Green; 215 InstanceSocketTool.PrintInfoConsole($"遠處:{e.AcceptSocket} 發來消息:{receStr}", Console.ForegroundColor, 216 info, UpdateUi.PostMessageInfo); 217 var receiveContent = InstanceSocketTool.UnPackMessage(receStr); 218 dynamic obj = JsonConvert.DeserializeObject(receiveContent); 219 InstanceSocketTool.ValidateMessageObj(obj, receStr); 220 AddDictWcs((int)obj.clientID.Value, e.AcceptSocket); 221 if (obj.infoType.Value != 0) 222 { 223 ReceiveMailBox.Enqueue(new ReceiveEntity 224 { 225 Client = e.AcceptSocket, 226 Message = obj 227 }); 228 } 229 //重置 230 _buffers = new StringBuilder(); 231 //發送反饋 232 var sendStr = ReplyAckWcs(e.AcceptSocket, Convert.ToString(obj.serial.Value)); 233 var sendBuffer = (byte[])Encoding.UTF8.GetBytes(sendStr); 234 e.SetBuffer(sendBuffer, 0, sendBuffer.Length); 235 if (!e.AcceptSocket.SendAsync(e)) 236 { 237 SendRequest(obj, e.AcceptSocket, e); 238 ProcessSend(e); 239 } 240 } 241 else if (!e.AcceptSocket.ReceiveAsync(e)) 242 { 243 ProcessReceive(e); 244 } 245 } 246 else 247 { 248 //this.ProcessError(e); 249 AppLogger.Error(e.SocketError.ToString()); 250 } 251 } 252 else 253 { 254 //this.CloseClientSocket(e); 255 } 256 257 } 258 static void Callback(IAsyncResult result) 259 { 260 261 262 } 263 264 private void SendRequest(dynamic Message,Socket Client, SocketAsyncEventArgs e) 265 { 266 var wcsReceiver = new WcsReceiver(TaskStockIn) 267 { 268 CacheSyncPool = SPoolyncDataPool, 269 ClientId = Convert.ToString(Message.serial.Value) 270 }; 271 272 if (Message.infoType == 40 || Message.infoType == 42) 273 { 274 wcsReceiver.ReplyBrowser(Message); 275 return; 276 } 277 wcsReceiver.RecieiveRequest(Message, new Action(delegate 278 { 279 280 281 })); 282 283 var packageList = new FrameHandlerTool().GetPackage(Message); 284 var sleepTime = 2000; 285 if (packageList.Count == 1) 286 sleepTime = 1; 287 foreach (var message in packageList) 288 { 289 Thread.Sleep(sleepTime); 290 var sendStr = $"?{JsonConvert.SerializeObject(message)}$"; 291 var sendBuffer = (byte[])Encoding.UTF8.GetBytes(sendStr); 292 e.SetBuffer(sendBuffer, 0, sendBuffer.Length); 293 while (e.AcceptSocket.SendAsync(e)) 294 { 295 } 296 var infoDisplay = InstanceSocketTool.CreateInfoDisplay(e.AcceptSocket); 297 infoDisplay.Message = sendStr; 298 infoDisplay.CustomColor = Color.DodgerBlue; 299 InstanceSocketTool.PrintInfoConsole($"{sendStr}", ConsoleColor.Green, infoDisplay, UpdateUi.PostMessageInfo); 300 } 301 } 302 public void HandReceiveMailbox() 303 { 304 while (true) 305 { 306 if (ReceiveMailBox.IsEmpty) 307 { 308 Thread.Sleep(100); 309 continue; 310 } 311 try 312 { 313 ReceiveEntity messageEntity; 314 ReceiveMailBox.TryDequeue(out messageEntity); 315 316 Action a = () => 317 { 318 var wcsReceiver = new WcsReceiver(TaskStockIn) 319 { 320 CacheSyncPool = SPoolyncDataPool, 321 ClientId = Convert.ToString(messageEntity.Message.serial.Value) 322 }; 323 324 if (messageEntity.Message.infoType == 40 || messageEntity.Message.infoType == 42) 325 { 326 wcsReceiver.ReplyBrowser(messageEntity.Message); 327 return; 328 } 329 wcsReceiver.RecieiveRequest(messageEntity.Message, new Action(delegate 330 { 331 })); 332 SendMailBox.Enqueue(new ReceiveEntity 333 { 334 Client = messageEntity.Client, 335 Message = wcsReceiver.Message 336 }); 337 338 }; 339 a.BeginInvoke(new AsyncCallback(Callback), null); 340 if (ReceiveMailBox.IsEmpty) 341 { 342 ReceiveMailBox = new ConcurrentQueue<ReceiveEntity>(); 343 ClearMemory(); 344 } 345 } 346 catch (Exception ex) 347 { 348 AppLogger.Error($"{ex.Message} {ex.StackTrace}", ex); 349 } 350 } 351 } 352 353 public void HandleSendMailbox() 354 { 355 while (true) 356 { 357 try 358 { 359 if (SendMailBox.IsEmpty) 360 { 361 Thread.Sleep(100); 362 continue; 363 } 364 ReceiveEntity messageEntity; 365 SendMailBox.TryDequeue(out messageEntity); 366 var wcsReceiver = new WcsReceiver(TaskStockIn) 367 { 368 Message = messageEntity.Message 369 }; 370 wcsReceiver.ReplyResponseToWcs(messageEntity.Client); 371 if (SendMailBox.IsEmpty) 372 { 373 ClearMemory(); 374 } 375 } 376 catch (Exception ex) 377 { 378 AppLogger.Error($"HandleSendMailbox:{ex.StackTrace}【-】{ex.Message}", ex); 379 } 380 } 381 } 382 #region 內存回收 383 [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")] 384 public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize); 385 /// <summary> 386 /// 釋放內存 387 /// </summary> 388 public static void ClearMemory() 389 { 390 GC.Collect(); 391 GC.WaitForPendingFinalizers(); 392 if (Environment.OSVersion.Platform == PlatformID.Win32NT) 393 { 394 SetProcessWorkingSetSize(Process.GetCurrentProcess().Handle, -1, -1); 395 } 396 } 397 #endregion 398 } 399 }
備注:把原來的收件與發件模型(采用同步的方式)給重構了,采用SendRequest一個函數搞定,程序測試下,慢在回發消息(即從第三方接口拉數據,需要一定時間),收到的消息,確認幀可以立發,性能貌似還可。
備注2:前期需求不明確,都沒想到會有那么多代碼,所以前期思考不足,程序書寫沒考慮那么多,直接采用同步方式,來一個客戶端開啟一個進程,弊端是大大的,還好業務場景相對簡單,經過一番折騰,采用2個隊列來解耦,程序變的好看起來了。軟件嘛,就是兼具美感的藝術品,如果自己看着都不舒服,那真是到重構的時間了(哈哈哪本書好像說過)。隨着抽象出報文工廠、工具處理類、消息轉發路由,程序確實清晰很多,修改業務功能、重構(之前學習過模式設計以及重構,模式容易生搬硬套,要化有形於無形,重構對我幫助最大,在寫代碼時就考慮到重構,逼迫我寫的代碼不能一團漿糊,否則將來如何重構啊,能獨立出函數就函數,能抽象出類則用獨立類)socket處理方式時,基本不費吹灰之力,有時間就好好改代碼吧。
