基於SocketAsyncEventArgs的版本


分享一下,項目中寫的簡單socket程序,不同方式的版本,這是個異步基於IOCP實現高性能的處理方式。IOCP就不多說了,高性能的完成端口,可以實現套接字對象的復用,降低開銷,且基於端口共享性能據說很高,因此把之前的項目做個改動。下面來個代碼走讀:

功能介紹:

    此socket程序類似一個交換機,對下統一采用報文來交換數據,什么心跳、回執等各式各樣十幾種類,他負責接收各個client發送來的消息,向各個client回發確認幀、發送對設備任務請求等工作;對上,向數據庫存入設備狀態等數據,將應用層的任務,組織合適的報文,下發給client。

    他所處的角色和位置:有時候,接收到client的請求后直接轉發給BS(本質就是存入數據庫啦);有時候,接收到client的請求后 思考 一下,向應用層拉取數據,裝配為 恰當的報文 后,回送給對應的client;輪詢BS提供的業務接口,發現有對設備請求的任務后,裝配為恰當的格式后,發送給對應的client。

    就像一個繁忙的機械手,有時對於收到的報文,僅僅是左手接,邏輯處理后,右手丟回去,必須滿足,從哪里接的、需送回給正確的client;有時候,只是充當一個轉發器,收到的內容不做任何邏輯處理,直接丟給第三方處理。雖然程序都已經運行多時,都沒有想好一個恰當的名字來命名程序,他更像是一個橋、一個轉換器。

    下面是主進程需要聲明的符號;收件箱存入接收的消息,不做任何處理;會有相應的進程不間斷的掃描收件隊列,一旦發現內容,便開始處理,處理結果會存入對應的發件箱,同樣有相應的進程不間斷的處理,通常為發送至client端,或者發送至BS接口。

 1  private readonly Config _config = InitConfig.ReadConfig();//配置信息 端口 IP
 2 
 3         private static IDictionary<int, WcsEndpoint<Socket>> _wcsList = new Dictionary<int, WcsEndpoint<Socket>>();//記錄有多少個端點連接進來
 4         private const int BufferSize = 1024;
 5         private static SocketTool _socketTool = new SocketTool();//完成分包、壓縮、加密關鍵報文內容
 6         private static MessageFactory _messageFactory = new MessageFactory();//用於構造不同的消息報文
 7         private static TaskSendDown _taskStockIn = new TaskSendDown(_socketTool);//用於橋接瀏覽器,通知web端一些事情發生
 8         private static ConcurrentQueue<ReceiveEntity> _receiveMailBox = new ConcurrentQueue<ReceiveEntity>();//收件箱
 9         private static ConcurrentQueue<ReceiveEntity> _sendMailBox = new ConcurrentQueue<ReceiveEntity>();//發件箱
10         private static object _lockwcsList = new object();//同步鎖
11         private static CachePool _sPoolyncDataPool = new CachePool();//擴展數據緩存
View Code
 1  var ip = IPAddress.Parse(_config.LocalIp);
 2             TcpListener.InstanceSocketTool = _socketTool;
 3             TcpListener.LockwcsList = _lockwcsList;
 4             TcpListener.WcsList = _wcsList;
 5             var core = new TcpListener
 6             {//將外部變量傳遞給核心處理類
 7                 MaxConnect = int.Parse(_config.MaxConnect),
 8                 ReceiveMailBox = _receiveMailBox,
 9                 SPoolyncDataPool = _sPoolyncDataPool,
10                 SendMailBox = _sendMailBox,
11                 TaskStockIn = _taskStockIn,
12             };
13             CallbackUpdateStrip("啟動wcs監聽;");//同步UI
14             var receiveBoxHandler = new Thread(core.HandReceiveMailbox); //收件郵箱偵聽
15             receiveBoxHandler.Start();
16 
17             var sendBoxHandler = new Thread(core.HandleSendMailbox); //發件箱偵聽
18             sendBoxHandler.Start();
19             core.Listen(new IPEndPoint(ip, int.Parse(_config.PortForWcs)));
View Code

核心socket實現,抄襲了別人的稍微改造一下,或者高端點重構了一下:

  1 using System;
  2 using System.Collections.Concurrent;
  3 using System.Collections.Generic;
  4 using System.Diagnostics;
  5 using System.Drawing;
  6 using System.Linq;
  7 using System.Net;
  8 using System.Net.Sockets;
  9 using System.Runtime.InteropServices;
 10 using System.Runtime.Remoting.Contexts;
 11 using System.Text;
 12 using System.Threading;
 13 using Newtonsoft.Json;
 14 using NovaMessageSwitch;
 15 using NovaMessageSwitch.Bll;
 16 using NovaMessageSwitch.message;
 17 using NovaMessageSwitch.Model;
 18 using NovaMessageSwitch.Tool;
 19 using NovaMessageSwitch.Tool.DataCache;
 20 using NovaMessageSwitch.Tool.Log;
 21 
 22 /****************************************************************
 23 *   作者:wxy
 24 *   CLR版本:4.0.30319.42000
 25 *   創建時間:2016/3/25 8:47:51
 26 *   2016
 27 *   描述說明:
 28 *   SocketAsyncEventArgs是異步的socket類,封裝了IOCP,可以很方便實現non-blocking IO
 29 非阻塞IO對server性能和吞吐量有很大好處
 30 *   修改歷史:
 31 *  IPAddress[] addressList = Dns.GetHostEntry(Environment.MachineName).AddressList;
 32                 new TcpListener().Listen(/new IPEndPoint(addressList[addressList.Length - 1]/, 9900));
 33      
 34                 Console.ReadKey();
 35 
 36 *
 37 *****************************************************************/
 38 namespace CompletePortB
 39 {
 40     public class TcpListener
 41     {
 42         private SocketAsyncEventArgs _args;
 43         private Socket _listenerSocket;
 44         private StringBuilder _buffers;
 45         private const int BufferSize = 1024;
 46 
 47         public TcpListener()
 48         {
 49             MaxConnect = 1000;
 50 
 51         }
 52 
 53         public int MaxConnect { get; set; }
 54         public static object LockwcsList { get; set; }
 55         public static IDictionary<int, WcsEndpoint<Socket>> WcsList { get; set; }
 56         public TaskSendDown TaskStockIn { get; set; }
 57         public CachePool SPoolyncDataPool { get; set; }
 58         public ConcurrentQueue<ReceiveEntity> ReceiveMailBox { get; set; }
 59         public ConcurrentQueue<ReceiveEntity> SendMailBox { get; set; }
 60 
 61         public static SocketTool InstanceSocketTool { get; set; }
 62 
 63         public void Listen(EndPoint e)
 64         {
 65             try
 66             {
 67                 _buffers = new StringBuilder();
 68                 _listenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 69                 _listenerSocket.Bind(e);
 70                 _listenerSocket.Listen(MaxConnect);
 71                 _args = new SocketAsyncEventArgs();
 72                 _args.Completed += new EventHandler<SocketAsyncEventArgs>(ProcessAccept);
 73                 BeginAccept(_args);
 74                 Console.Read();
 75             }
 76             catch (Exception ex)
 77             {
 78                 
 79             }
 80         }
 81 
 82         private void BeginAccept(SocketAsyncEventArgs e)
 83         {
 84             e.AcceptSocket = null;
 85             //異步操作完成,返回false
 86             if (!_listenerSocket.AcceptAsync(e))
 87                 ProcessAccept(_listenerSocket, e);
 88 
 89         }
 90         
 91         //異步操作完成時調用此方法
 92         private void ProcessAccept(object sender, SocketAsyncEventArgs e)
 93         {
 94             var s = e.AcceptSocket;
 95             e.AcceptSocket = null;
 96             var args = new SocketAsyncEventArgs();
 97             //繼續向下回掉函數
 98             args.Completed += new EventHandler<SocketAsyncEventArgs>(OnIoCompleted);
 99             //設定異步套接字方法的數據緩沖區
100             args.SetBuffer(new byte[BufferSize], 0, BufferSize);
101             args.AcceptSocket = s;
102             //IO操作同步完成,返回false,且不引發e參數的socketasynceventargs事件
103             //IO操作若被掛起,將返回true,且引發e參數的...
104             if (!s.ReceiveAsync(args))
105                 ProcessReceive(args);
106             BeginAccept(e);
107         }
108         void OnIoCompleted(object sender, SocketAsyncEventArgs e)
109         {
110             //獲取最近使用此上下文對象執行的套接字操作類型
111             switch (e.LastOperation)
112             {
113                 case SocketAsyncOperation.Receive:
114                     this.ProcessReceive(e);
115                     break;
116                 case SocketAsyncOperation.Send:
117                     this.ProcessSend(e);
118                     break;
119                 default:
120                     throw new ArgumentException("The last operation completed on the socket was not a receive or send");
121             }
122         }
123 
124         private void ProcessSend(SocketAsyncEventArgs e)
125         {
126             if (e.SocketError == SocketError.Success)
127             {
128                 //接收完畢開始下次接收
129                 if (!e.AcceptSocket.ReceiveAsync(e))
130                 {
131                     this.ProcessReceive(e);
132                 }
133             }
134             else
135             {
136 
137             }
138 
139         }
140      
141 
142         /// <summary>
143         /// 維護wcs端點列表
144         /// </summary>
145         /// <param name="clientId"></param>
146         /// <param name="socket"></param>
147         private static void AddDictWcs(int clientId, Socket socket)
148         {
149             lock (LockwcsList)
150             {
151                 if (WcsList.Keys.Contains(clientId))
152                 {
153                     var endPoint = WcsList[clientId];
154                     endPoint.EndPoint = socket;
155                     endPoint.RecentTimeOld = endPoint.RecentTime;
156                     endPoint.RecentTime = DateTime.Now;
157                     InstanceSocketTool.UpdateWcsDisplay(endPoint, UpdateUi.Post);
158                 }
159                 else
160                 {
161                     var newEndPoit = new WcsEndpoint<Socket>
162                     {
163                         RecentTime = DateTime.Now,
164                         RecentTimeOld = null,
165                         EndPoint = socket
166                     };
167                     WcsList.Add(clientId, newEndPoit);
168                 }
169 
170                 foreach (var wcs in WcsList.Where(wcs => (wcs.Value.RecentTime - DateTime.Now).Minutes > 2))
171                 {
172                     WcsList.Remove(wcs.Key);
173                 }
174                 InstanceSocketTool.UpdateWcsDisplay(WcsList, UpdateUi.Post);
175             }
176         }
177         private string ReplyAckWcs(Socket socket, string oriSerial)
178         {
179             var socketTool = InstanceSocketTool;
180             var ackMessage = new MessageData<ContentReply>
181             {
182                 infoType = 1,
183                 content = new ContentReply(),
184                 destination = DataFlowDirection.wcs.ToString(),
185                 source = DataFlowDirection.wms.ToString(),
186                 infoDesc = "反饋報文",
187                 serial = Guid.NewGuid().ToString("N")
188             };
189             ackMessage.content.oriSerial = oriSerial;
190             var sendStr = $"?{JsonConvert.SerializeObject(ackMessage)}$";
191            
192             var infoDisplay = socketTool.CreateInfoDisplay(socket);
193             infoDisplay.Message = sendStr;
194             infoDisplay.CustomColor = Color.DodgerBlue;
195             socketTool.PrintInfoConsole($"{sendStr}", ConsoleColor.Green, infoDisplay, UpdateUi.PostMessageInfo);
196             return sendStr;
197         }
198         private void ProcessReceive(SocketAsyncEventArgs e)
199         {
200             if (e.BytesTransferred > 0)
201             {
202                 if (e.SocketError == SocketError.Success)
203                 {
204                     var data = Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred);
205                     _buffers.Append(data);
206 
207                     if (e.AcceptSocket.Available == 0)
208                     {
209                         var receStr=_buffers.ToString();
210                         if (InstanceSocketTool.ValidateMessageJson(ref receStr) == false)
211                             return;
212                         var info = InstanceSocketTool.CreateInfoDisplay(e.AcceptSocket);
213                         info.Message = receStr;
214                         info.CustomColor = Color.Green;
215                         InstanceSocketTool.PrintInfoConsole($"遠處:{e.AcceptSocket} 發來消息:{receStr}", Console.ForegroundColor,
216                             info, UpdateUi.PostMessageInfo);
217                         var receiveContent = InstanceSocketTool.UnPackMessage(receStr);
218                         dynamic obj = JsonConvert.DeserializeObject(receiveContent);
219                         InstanceSocketTool.ValidateMessageObj(obj, receStr);
220                         AddDictWcs((int)obj.clientID.Value, e.AcceptSocket);
221                         if (obj.infoType.Value != 0)
222                         {
223                             ReceiveMailBox.Enqueue(new ReceiveEntity
224                             {
225                                 Client = e.AcceptSocket,
226                                 Message = obj
227                             });
228                         }
229                         //重置
230                         _buffers = new StringBuilder();
231                         //發送反饋
232                         var sendStr = ReplyAckWcs(e.AcceptSocket, Convert.ToString(obj.serial.Value));
233                         var sendBuffer = (byte[])Encoding.UTF8.GetBytes(sendStr);
234                         e.SetBuffer(sendBuffer, 0, sendBuffer.Length);
235                         if (!e.AcceptSocket.SendAsync(e))
236                         {
237                             SendRequest(obj, e.AcceptSocket, e);
238                             ProcessSend(e);
239                         }
240                     }
241                     else if (!e.AcceptSocket.ReceiveAsync(e))
242                     {
243                         ProcessReceive(e);
244                     }
245                 }
246                 else
247                 {
248                     //this.ProcessError(e);
249                     AppLogger.Error(e.SocketError.ToString());
250                 }
251             }
252             else
253             {
254                 //this.CloseClientSocket(e);
255             }
256 
257         }
258         static void Callback(IAsyncResult result)
259         {
260 
261 
262         }
263       
264         private void SendRequest(dynamic Message,Socket Client, SocketAsyncEventArgs e)
265         {
266             var wcsReceiver = new WcsReceiver(TaskStockIn)
267             {
268                 CacheSyncPool = SPoolyncDataPool,
269                 ClientId = Convert.ToString(Message.serial.Value)
270             };
271 
272             if (Message.infoType == 40 || Message.infoType == 42)
273             {
274                 wcsReceiver.ReplyBrowser(Message);
275                 return;
276             }
277             wcsReceiver.RecieiveRequest(Message, new Action(delegate
278             {
279 
280 
281             }));
282             
283             var packageList = new FrameHandlerTool().GetPackage(Message);
284             var sleepTime = 2000;
285             if (packageList.Count == 1)
286                 sleepTime = 1;
287             foreach (var message in packageList)
288             {
289                 Thread.Sleep(sleepTime);
290                 var sendStr = $"?{JsonConvert.SerializeObject(message)}$";
291                 var sendBuffer = (byte[])Encoding.UTF8.GetBytes(sendStr);
292                 e.SetBuffer(sendBuffer, 0, sendBuffer.Length);
293                 while (e.AcceptSocket.SendAsync(e))
294                 {
295                 }
296                 var infoDisplay = InstanceSocketTool.CreateInfoDisplay(e.AcceptSocket);
297                 infoDisplay.Message = sendStr;
298                 infoDisplay.CustomColor = Color.DodgerBlue;
299                 InstanceSocketTool.PrintInfoConsole($"{sendStr}", ConsoleColor.Green, infoDisplay, UpdateUi.PostMessageInfo);
300             }
301         }
302         public void HandReceiveMailbox()
303         {
304             while (true)
305             {
306                 if (ReceiveMailBox.IsEmpty)
307                 {
308                     Thread.Sleep(100);
309                     continue;
310                 }
311                 try
312                 {
313                     ReceiveEntity messageEntity;
314                     ReceiveMailBox.TryDequeue(out messageEntity);
315 
316                     Action a = () =>
317                     {
318                         var wcsReceiver = new WcsReceiver(TaskStockIn)
319                         {
320                             CacheSyncPool = SPoolyncDataPool,
321                             ClientId = Convert.ToString(messageEntity.Message.serial.Value)
322                         };
323 
324                         if (messageEntity.Message.infoType == 40 || messageEntity.Message.infoType == 42)
325                         {
326                             wcsReceiver.ReplyBrowser(messageEntity.Message);
327                             return;
328                         }
329                         wcsReceiver.RecieiveRequest(messageEntity.Message, new Action(delegate
330                         {
331                             }));
332                         SendMailBox.Enqueue(new ReceiveEntity
333                         {
334                             Client = messageEntity.Client,
335                             Message = wcsReceiver.Message
336                         });
337 
338                     };
339                     a.BeginInvoke(new AsyncCallback(Callback), null);
340                     if (ReceiveMailBox.IsEmpty)
341                     {
342                         ReceiveMailBox = new ConcurrentQueue<ReceiveEntity>();
343                         ClearMemory();
344                     }
345                 }
346                 catch (Exception ex)
347                 {
348                     AppLogger.Error($"{ex.Message} {ex.StackTrace}", ex);
349                 }
350             }
351         }
352 
353         public void HandleSendMailbox()
354         {
355             while (true)
356             {
357                 try
358                 {
359                     if (SendMailBox.IsEmpty)
360                     {
361                         Thread.Sleep(100);
362                         continue;
363                     }
364                     ReceiveEntity messageEntity;
365                     SendMailBox.TryDequeue(out messageEntity);
366                     var wcsReceiver = new WcsReceiver(TaskStockIn)
367                     {
368                         Message = messageEntity.Message
369                     };
370                     wcsReceiver.ReplyResponseToWcs(messageEntity.Client);
371                     if (SendMailBox.IsEmpty)
372                     {
373                         ClearMemory();
374                     }
375                 }
376                 catch (Exception ex)
377                 {
378                     AppLogger.Error($"HandleSendMailbox:{ex.StackTrace}【-】{ex.Message}", ex);
379                 }
380             }
381         }
382         #region 內存回收
383         [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
384         public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);
385         /// <summary>
386         /// 釋放內存
387         /// </summary>
388         public static void ClearMemory()
389         {
390             GC.Collect();
391             GC.WaitForPendingFinalizers();
392             if (Environment.OSVersion.Platform == PlatformID.Win32NT)
393             {
394                 SetProcessWorkingSetSize(Process.GetCurrentProcess().Handle, -1, -1);
395             }
396         }
397         #endregion
398     }
399 }
View Code

    備注:把原來的收件與發件模型(采用同步的方式)給重構了,采用SendRequest一個函數搞定,程序測試下,慢在回發消息(即從第三方接口拉數據,需要一定時間),收到的消息,確認幀可以立發,性能貌似還可。

    備注2:前期需求不明確,都沒想到會有那么多代碼,所以前期思考不足,程序書寫沒考慮那么多,直接采用同步方式,來一個客戶端開啟一個進程,弊端是大大的,還好業務場景相對簡單,經過一番折騰,采用2個隊列來解耦,程序變的好看起來了。軟件嘛,就是兼具美感的藝術品,如果自己看着都不舒服,那真是到重構的時間了(哈哈哪本書好像說過)。隨着抽象出報文工廠、工具處理類、消息轉發路由,程序確實清晰很多,修改業務功能、重構(之前學習過模式設計以及重構,模式容易生搬硬套,要化有形於無形,重構對我幫助最大,在寫代碼時就考慮到重構,逼迫我寫的代碼不能一團漿糊,否則將來如何重構啊,能獨立出函數就函數,能抽象出類則用獨立類)socket處理方式時,基本不費吹灰之力,有時間就好好改代碼吧。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM