“池”這個概念好像最早是在操作系統的課上聽過的,到后來出來工作的第二天組長也跟我提起“池”這個東東。它給我的感覺是某種對象的集合,如果要用的話就取出,不用的話就放回。在學多線程的時候有接觸過線程池,在寫《Socket 一對多通信》的時候想到了Socket連接池這回事,不過在網上谷歌了一下,發現這類的文章貌似不多,看了一下園友的博文《高性能Socket設計實現》,獲益良多,下了份源碼來看,雖然有一部分看不明白,而且由於個人水平跑不了那份代碼,但是從中我學到了不少,至少我寫的“池”有一部分是用了這位田大哥的思想。
先來分析各個類之間的結構,整個連接池里面實際上是有兩個池,一個是在異步通信中可以重復利用的SocketAsyncEventArgs池(當然處於別的方面考慮,池里面並不是單純放SocketAsyncEventArgs的實例集合),另一個是接收數據時用到的byte[]緩沖池。而這兩個池是外部不能訪問的,外部通過一個控制(或者叫管理)的類進行操作。繼承了前兩篇博文中的異步通信的思想,最底的一個類存放了一次連接的信息,包括兩個SocketAsyncEventArgs實例和通信另一端的Socket實例。下面將逐部分介紹。
1 /// <summary> 2 /// 連接單元 3 /// </summary> 4 class ConnectionUnit:IDisposable 5 { 6 private string _uid;//單元的編號,默認為-1 7 private bool _state;//單元的狀態,true表示使用,false表示空閑 8 private SocketAsyncEventArgs _sendArg;//專用於發送 9 private SocketAsyncEventArgs _recArg;//專用於接收 10 internal Socket client { get; set; }//客戶端的socket實例 11 internal ArrayList tempArray { get; set; }//暫存已接收的數據,避免粘包用的 12 13 14 public ConnectionUnit(string UID) 15 { 16 _uid = UID; 17 tempArray = new ArrayList(); 18 } 19 20 public ConnectionUnit() : this("-1") { } 21 22 public void Dispose() 23 { 24 if (_sendArg != null) 25 _sendArg.Dispose(); 26 if (_recArg != null) 27 _recArg.Dispose(); 28 29 _sendArg = null; 30 _recArg = null; 31 } 32 }
這個與之前一篇講異步通信的博文一樣,一個Socket兩個SocketAsyncEventArgs。為了取客戶端的Socket方便一點,兩個SocketAsyncEventArgs一個用於收,一個用於發,田大哥說這樣可以實現雙工。的確是這樣,當初把類定成這樣也是為了在收的同時也能發。以上代碼刪掉了把字段裝成屬性的那部分。
SocketAsyncEventArgsPool這個類就是上面鏈接單元的一個池,這個池才是整個連接池里最核心的池,也是真正意義上的池。池里面用了兩個集合來存放這一堆連接單元。分別是空閑棧(freeCollecton)和在線字典集(busyCollection)。對於這樣的一個池,我就認為,只要在我需要用的時候把對象取出來,頂多在去的時候給一些參數,讓池幫忙把這個對象配置好,我就拿來使用,等到我用完了,把他放回池里,池就把對象還原,等待下次再使用。正體感覺這個池對外有點像個棧,取的時候就Pop,放的時候就Push,此外還提供了兩個與棧不相干的的方法,根據編號獲取在線的連接單元和獲取所有在線連接單元的編號。
1 class SocketAsyncEventArgsPool:IDisposable 2 { 3 private Dictionary<string, ConnectionUnit> busyCollection; 4 private Stack<ConnectionUnit> freeCollecton; 5 6 internal SocketAsyncEventArgsPool() 7 { 8 busyCollection = new Dictionary<string, ConnectionUnit>(); 9 freeCollecton = new Stack<ConnectionUnit>(); 10 } 11 12 /// <summary> 13 /// 取出 14 /// </summary> 15 internal ConnectionUnit Pop(string uid) 16 { 17 ConnectionUnit unit = freeCollecton.Pop(); 18 unit.State = true; 19 unit.Uid = uid; 20 busyCollection.Add(uid, unit); 21 return unit; 22 } 23 24 /// <summary> 25 /// 放回 26 /// </summary> 27 internal void Push(ConnectionUnit unit) 28 { 29 if (!string.IsNullOrEmpty(unit.Uid) && unit.Uid != "-1") 30 busyCollection.Remove(unit.Uid); 31 unit.Uid = "-1"; 32 unit.State = false; 33 freeCollecton.Push(unit); 34 } 35 36 /// <summary> 37 /// 獲取 38 /// </summary> 39 internal ConnectionUnit GetConnectionUnitByUID(string uid) 40 { 41 if (busyCollection.ContainsKey(uid)) 42 return busyCollection[uid]; 43 return null; 44 } 45 46 /// <summary> 47 /// 48 /// </summary> 49 internal string[] GetOnLineList() 50 { 51 return busyCollection.Keys.ToArray(); 52 } 53 54 public void Dispose() 55 { 56 foreach (KeyValuePair<string,ConnectionUnit> item in busyCollection) 57 item.Value.Dispose(); 58 59 busyCollection.Clear(); 60 61 while (freeCollecton.Count > 0) 62 freeCollecton.Pop().Dispose(); 63 }
BufferManager這個是專給接收的SocketAsyncEventArgs用的緩沖池,整一個連接池里面所有接收用的緩沖區都用這個BufferManager,參照田大哥的思想,現在內存里開辟一大片區域存放byte,然后給每一個接收用的SocketAsyncEventArgs分配一塊。
1 class BufferManager:IDisposable 2 { 3 private byte[] buffers;//緩沖池 4 private int bufferSize;//每個單元使用的大小 5 private int allSize;//池的總大小 6 private int currentIndex;//當前可用的索引 7 private Stack<int> freeIndexs;//已使用過的空閑索引 8 9 /// <summary> 10 /// 構造緩存池 11 /// </summary> 12 /// <param name="buffersSize">池總大小</param> 13 /// <param name="defaultSize">默認單元大小</param> 14 internal BufferManager(int buffersSize, int defaultSize) 15 { 16 this.bufferSize=defaultSize; 17 this.allSize=buffersSize; 18 currentIndex=0; 19 this.buffers = new byte[allSize]; 20 freeIndexs = new Stack<int>(); 21 } 22 23 /// <summary> 24 /// 給SocketAsyncEventArgs設置緩沖區 25 /// </summary> 26 internal bool SetBuffer(SocketAsyncEventArgs e) 27 { 28 //首先看看空閑棧里有沒有空閑的區域,有就使用 29 if (freeIndexs.Count > 0) 30 { 31 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize); 32 } 33 else 34 { 35 //沒有就得從buffers里取,如果buffers用光了當然取不了 36 if ((allSize - currentIndex) < bufferSize) return false; 37 e.SetBuffer(buffers, currentIndex, bufferSize); 38 currentIndex += bufferSize; 39 } 40 return true; 41 } 42 43 /// <summary> 44 /// 釋放SocketAsyncEventArgs的緩沖區 45 /// </summary> 46 /// <param name="e"></param> 47 internal void FreeBuffer(SocketAsyncEventArgs e) 48 { 49 //把索引放到空閑索引棧里面,供下次取的時候重復利用 50 freeIndexs.Push(e.Offset); 51 //同時清空這部分區域的數據免得上次使用時遺留的數據會摻 52 //和到下次讀取的數據中 53 for (int i = e.Offset; i < e.Offset + bufferSize; i++) 54 { 55 if (buffers[i] == 0) break; 56 buffers[i] = 0; 57 } 58 e.SetBuffer(null, 0, 0); 59 } 60 61 public void Dispose() 62 { 63 buffers = null; 64 freeIndexs = null; 65 } 66 }
其實上面兩個池都是很大程度參照了《高性能Socket設計實現》中的內容。下面這個SocketPoolController是對外的類,這個類的設計參照的就沒那么多了。而對於一個Socket通信(服務端的)來說,無非都是三件事,接受連接,接收數據,發送數據。這三件事我再操作類里面是這樣做的
接受連接:接受再運行池的時候就開始了,異步循環地接受,執行一次異步接受就阻塞,等到接受完成才被喚醒。
1 /// <summary> 2 /// 異步Accept客戶端的連接 3 /// </summary> 4 void MyAsyncAccept() 5 { 6 //這里使用Action的方式異步循環接受客戶端的連接 7 //模仿同事的做法沒開線程,不知這種方式是好是壞 8 Action callback = new Action(delegate() 9 { 10 while (true) 11 { 12 //每次接受都要新開一個SocketAsyncEventArgs,否則會報錯 13 //其實我也想重復利用的 14 SocketAsyncEventArgs e = new SocketAsyncEventArgs(); 15 e.Completed += new EventHandler<SocketAsyncEventArgs>(Accept_Completed); 16 17 acceptLock.Reset(); 18 server.AcceptAsync(e); 19 //在異步接受完成之前阻塞當前線程 20 acceptLock.WaitOne(); 21 } 22 }); 23 callback.BeginInvoke(null, null); 24 } 25 26 void Accept_Completed(object sender, SocketAsyncEventArgs e) 27 { 28 Socket client = e.AcceptSocket; 29 try 30 { 31 if (client.Connected) 32 { 33 IPEndPoint point = client.RemoteEndPoint as IPEndPoint; 34 string uid = point.Address + ":" + point.Port; 35 ConnectionUnit unit = pool.Pop(uid); 36 unit.client = client; 37 unit.State = true; 38 unit.Uid = uid; 39 unit.RecArg.UserToken = unit; 40 unit.SendArg.UserToken = unit; 41 buffer.SetBuffer(unit.RecArg); 42 43 //在接受成功之后就開始接收數據了 44 client.ReceiveAsync(unit.RecArg); 45 //設置並發限制信號和增加當前連接數 46 semaphoreAccept.WaitOne(); 47 Interlocked.Increment(ref currentConnect); 48 49 if (OnAccept != null) OnAccept(uid); 50 } 51 else if (client != null) 52 { 53 client.Close(); 54 client.Dispose(); 55 client = null; 56 } 57 } 58 catch (Exception ex) { Console.WriteLine(ex.ToString()); } 59 //設置Accept信號,以便下次Accept的執行 60 acceptLock.Set(); 61 e.Dispose(); 62 }
接收消息:在異步接受成功的時候開始接收,每次接收完成之后就進行下一次接收,直到客戶端斷開連接才終止。
1 void RecArg_Completed(object sender, SocketAsyncEventArgs e) 2 { 3 Socket client = sender as Socket; 4 ConnectionUnit unit = e.UserToken as ConnectionUnit; 5 //這里大致與上一篇異步通信的一樣,只是對緩沖區的處理有一點差異 6 if (e.SocketError == SocketError.Success) 7 { 8 int rec = e.BytesTransferred; 9 if (rec == 0) 10 { 11 CloseSocket(unit); 12 return; 13 } 14 if (client.Available > 0) 15 { 16 unit.tempArray.AddRange(e.Buffer); 17 buffer.FreeBuffer(unit.RecArg); 18 buffer.SetBuffer(unit.RecArg); 19 client.SendAsync(unit.RecArg); 20 return; 21 } 22 byte[] data = e.Buffer; 23 int len = rec; 24 int offset = e.Offset; 25 if (unit.tempArray.Count != 0) 26 { 27 foreach (byte item in data) 28 { 29 if (item == 0) break; 30 unit.tempArray.Add(item); 31 } 32 data = unit.tempArray.ToArray(typeof(byte)) as byte[]; 33 rec = data.Length; 34 offset = 0; 35 unit.tempArray.Clear(); 36 } 37 38 string dataStr = Encoding.ASCII.GetString(data,offset,len); 39 if (OnReceive != null) 40 OnReceive(unit.Uid, dataStr); 41 42 if (!unit.State) return; 43 buffer.FreeBuffer(e); 44 buffer.SetBuffer(e); 45 client.ReceiveAsync(e); 46 } 47 //這里還多個了一個關閉當前連接 48 else 49 { 50 CloseSocket(unit); 51 } 52 } 53 54 /// <summary> 55 /// 關閉一個連接單元 56 /// </summary> 57 private void CloseSocket( ConnectionUnit unit ) 58 { 59 //關閉並釋放客戶端socket的字眼 60 if (unit.client != null) 61 { 62 unit.client.Shutdown(SocketShutdown.Both); 63 unit.client.Dispose(); 64 unit.client = null; 65 } 66 //Console.WriteLine(unit.Uid+" disconnect "); 67 //把連接放回連接池 68 pool.Push(unit); 69 //釋放並發信號 70 semaphoreAccept.Release(); 71 //減少當前連接數 72 Interlocked.Decrement(ref currentConnect); 73 }
發送消息:外放方法,在需要的時候自行調用方法發送。
1 /// <summary> 2 /// 發送消息 3 /// </summary> 4 /// <param name="uid"></param> 5 /// <param name="message"></param> 6 public void SendMessage(string uid, string message) 7 { 8 sendLock.Reset(); 9 ConnectionUnit unit=pool.GetConnectionUnitByUID(uid); 10 //如果獲取不了連接單元就不發送了, 11 if (unit == null) 12 { 13 if(OnSend!=null) OnSend(uid,"100"); 14 sendLock.Set(); 15 return; 16 } 17 byte[] datas = Encoding.ASCII.GetBytes(message); 18 unit.SendArg.SetBuffer(datas, 0, datas.Length); 19 unit.client.SendAsync(unit.SendArg); 20 //阻塞當前線程,等到發送完成才釋放 21 sendLock.WaitOne(); 22 } 23 24 void SendArg_Completed(object sender, SocketAsyncEventArgs e) 25 { 26 Socket client = sender as Socket; 27 ConnectionUnit unit = e.UserToken as ConnectionUnit; 28 //這里的消息碼有三個,2字頭的是成功的,1字頭是不成功的 29 //101是未知錯誤,100是客戶端不在線 30 if (e.SocketError == SocketError.Success) 31 if (OnSend != null) OnSend(unit.Uid, "200"); 32 else if (OnSend != null) OnSend(unit.Uid, "101"); 33 //釋放信號,以便下次發送消息執行 34 sendLock.Set(); 35 }
下面則是類里面的一些字段信息和構造函數
1 /// <summary> 2 /// 初始化池的互斥體 3 /// </summary> 4 private Mutex mutex = new Mutex(); 5 6 /// <summary> 7 /// Accept限制信號 8 /// </summary> 9 private Semaphore semaphoreAccept; 10 11 /// <summary> 12 /// Accept信號 13 /// </summary> 14 private static ManualResetEvent acceptLock = new ManualResetEvent(false); 15 16 /// <summary> 17 /// Send信號 18 /// </summary> 19 private static ManualResetEvent sendLock = new ManualResetEvent(false); 20 21 /// <summary> 22 /// 最大並發數(連接數) 23 /// </summary> 24 private int maxConnect; 25 26 /// <summary> 27 /// 當前連接數(並發數) 28 /// </summary> 29 private int currentConnect; 30 31 /// <summary> 32 /// 緩沖池 33 /// </summary> 34 private BufferManager buffer; 35 36 /// <summary> 37 /// SocketasyncEventArgs池 38 /// </summary> 39 private SocketAsyncEventArgsPool pool; 40 41 /// <summary> 42 /// 服務端Socket 43 /// </summary> 44 private Socket server; 45 46 /// <summary> 47 /// 完成接受的委托 48 /// </summary> 49 public delegate void AcceptHandler(string uid); 50 51 /// <summary> 52 /// 完成發送的委托 53 /// </summary> 54 public delegate void SendHandler(string uid, string result); 55 56 /// <summary> 57 /// 完成接收的委托 58 /// </summary> 59 public delegate void RecevieHandler(string uid, string data); 60 61 /// <summary> 62 /// 完成接受事件 63 /// </summary> 64 public event AcceptHandler OnAccept; 65 66 /// <summary> 67 /// 完成發送事件 68 /// </summary> 69 public event SendHandler OnSend; 70 71 /// <summary> 72 /// 完成接收事件 73 /// </summary> 74 public event RecevieHandler OnReceive; 75 76 /// <summary> 77 /// 構造函數 78 /// </summary> 79 /// <param name="buffersize">單元緩沖區大小</param> 80 /// <param name="maxCount">並發總數</param> 81 public SocketPoolController(int buffersize, int maxCount) 82 { 83 buffer = new BufferManager(buffersize * maxCount,buffersize); 84 this.currentConnect = 0; 85 this.maxConnect = maxCount; 86 this.currentConnect = 0; 87 this.pool = new SocketAsyncEventArgsPool(); 88 //設置並發數信號,經試驗過是並發數-1才對 89 this.semaphoreAccept = new Semaphore(maxCount-1, maxCount-1); 90 InitPool(); 91 }
構造函數里用到的方法
1 /// <summary> 2 /// 初始化SocketAsyncEventArgs池 3 /// 這里主要是給空閑棧填充足夠的實例 4 /// </summary> 5 private void InitPool() 6 { 7 ConnectionUnit unit = null; 8 for (int i = 0; i < maxConnect; i++) 9 { 10 unit = new ConnectionUnit(); 11 unit.Uid = "-1"; 12 unit.RecArg = new SocketAsyncEventArgs(); 13 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs>(RecArg_Completed); 14 unit.SendArg = new SocketAsyncEventArgs(); 15 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs>(SendArg_Completed); 16 this.pool.Push(unit); 17 } 18 }
其他外放專門控制池的方法
1 /// <summary> 2 /// 啟動池 3 /// </summary> 4 /// <param name="ipAddress">服務端的IP</param> 5 /// <param name="port">端口</param> 6 public void RunPool(string ipAddress, int port) 7 { 8 IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); 9 server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 10 server.Bind(endpoint); 11 server.Listen(100); 12 13 //調用方法異步Accept客戶端的連接 14 MyAsyncAccept(); 15 //設置信號,防止再池在已經啟動的情況下再次啟動 16 mutex.WaitOne(); 17 } 18 19 /// <summary> 20 /// 停止池 21 /// </summary> 22 public void StopPool() 23 { 24 //把服務端的socket關了 25 if (server != null) 26 server.Close(); 27 //釋放互斥信號,等待下次啟動 28 mutex.ReleaseMutex(); 29 //釋放資源 30 Dispose(); 31 }
要把這個操作類與現實的事物類比的話,這個與我們平常跟某個或一堆人聊天時差不多,別人說的東西,不用你自己控制你都會聽得到(排除帶上耳機,塞住耳朵這種極端的情況),所以接收消息那個方法就不需要了,而你要說什么這個要靠個人控制,而事件那些也好類比,OnAccept就相當於某人加入到這次聊天中你會做些什么(say hello 或者無視),OnRecevie就聽到別人說什么自己有什么反應,OnSend就自己說完話之后有什么反應(有時候發現說錯了會糾正,有時覺得自己說得好笑的也笑一場諸如此類)。
在使用的時候可以這樣子
1 SocketPoolController pool; 2 pool = new SocketPoolController(32 * 1024, 1000); 3 pool.OnReceive += new SocketPoolController.RecevieHandler(pool_OnReceive); 4 pool.OnSend += new SocketPoolController.SendHandler(pool_OnSend); 5 pool.OnAccept += new SocketPoolController.AcceptHandler(pool_OnAccept); 6 pool.RunPool("127.0.0.1", 8081); 7 Console.WriteLine("Pool has run\r\npress any key to stop..."); 8 Console.ReadKey(); 9 pool.StopPool(); 10 Console.WriteLine("Pool has stop\r\npress any key to exit..."); 11 Console.ReadLine();
在池里面有一部分地方看似跟用同步的差不多,像接受客戶端的連接,發送消息這些地方。可是用這種異步,萬一客戶端突然斷開連接也不會有同步那樣馬上拋異常。還有一點的是在這份代碼里面缺少了對異常的捕捉,有一部分錯誤我在測試的過程中設了判斷避開了。以前跟某只貓提過Socket編程會與多線程一起使用,我也覺得是這樣,在之前一篇博文 《Socket一對多通信》里我也用到線程,后來的異步通信也是有線程的,不過不是.net framework自行創建的。看了田大哥的博文給我的另一個收獲是信號量的使用,在以前不懂得使用信號量,只會設置一大堆標識狀態的布爾值或整形的變量來計數判斷。田大哥的博文介紹的時高性能的Socket,而我的這個應該性能不會高到哪里去。上面有什么說錯的請各位指出,有什么說漏的,請各位提點,多多指導。謝謝!

1 /// <summary> 2 /// 連接單元 3 /// </summary> 4 class ConnectionUnit:IDisposable 5 { 6 private string _uid;//單元的編號,默認為-1 7 private bool _state;//單元的狀態,true表示使用,false表示空閑 8 private SocketAsyncEventArgs _sendArg;//專用於發送 9 private SocketAsyncEventArgs _recArg;//專用於接收 10 internal Socket client { get; set; }//客戶端的socket實例 11 internal ArrayList tempArray { get; set; }//暫存已接收的數據,避免粘包用的 12 13 public string Uid 14 { 15 get { return _uid; } 16 set { _uid = value; } 17 } 18 19 public ConnectionUnit(string UID) 20 { 21 _uid = UID; 22 tempArray = new ArrayList(); 23 } 24 25 public ConnectionUnit() : this("-1") { } 26 27 public bool State 28 { 29 get { return _state; } 30 set { _state = value; } 31 } 32 33 public SocketAsyncEventArgs SendArg 34 { 35 get { return _sendArg; } 36 set { _sendArg = value; } 37 } 38 39 public SocketAsyncEventArgs RecArg 40 { 41 get { return _recArg; } 42 set { _recArg = value; } 43 } 44 45 public void Dispose() 46 { 47 if (_sendArg != null) 48 _sendArg.Dispose(); 49 if (_recArg != null) 50 _recArg.Dispose(); 51 52 _sendArg = null; 53 _recArg = null; 54 } 55 } 56 57 class BufferManager:IDisposable 58 { 59 private byte[] buffers; 60 private int bufferSize; 61 private int allSize; 62 private int currentIndex; 63 private Stack<int> freeIndexs; 64 65 /// <summary> 66 /// 構造緩存池 67 /// </summary> 68 /// <param name="buffersSize">池總大小</param> 69 /// <param name="defaultSize">默認單元大小</param> 70 internal BufferManager(int buffersSize, int defaultSize) 71 { 72 this.bufferSize=defaultSize; 73 this.allSize=buffersSize; 74 currentIndex=0; 75 this.buffers = new byte[allSize]; 76 freeIndexs = new Stack<int>(); 77 } 78 79 /// <summary> 80 /// 81 /// </summary> 82 /// <param name="e"></param> 83 /// <param name="offSet"></param> 84 /// <returns></returns> 85 internal bool SetBuffer(SocketAsyncEventArgs e) 86 { 87 if (freeIndexs.Count > 0) 88 { 89 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize); 90 } 91 else 92 { 93 if ((allSize - currentIndex) < bufferSize) return false; 94 e.SetBuffer(buffers, currentIndex, bufferSize); 95 currentIndex += bufferSize; 96 } 97 return true; 98 } 99 100 /// <summary> 101 /// 102 /// </summary> 103 /// <param name="e"></param> 104 internal void FreeBuffer(SocketAsyncEventArgs e) 105 { 106 freeIndexs.Push(e.Offset); 107 for (int i = e.Offset; i < e.Offset + bufferSize; i++) 108 { 109 if (buffers[i] == 0) break; 110 buffers[i] = 0; 111 } 112 e.SetBuffer(null, 0, 0); 113 } 114 115 public void Dispose() 116 { 117 buffers = null; 118 freeIndexs = null; 119 } 120 } 121 122 class SocketAsyncEventArgsPool:IDisposable 123 { 124 private Dictionary<string, ConnectionUnit> busyCollection; 125 private Stack<ConnectionUnit> freeCollecton; 126 127 internal SocketAsyncEventArgsPool() 128 { 129 busyCollection = new Dictionary<string, ConnectionUnit>(); 130 freeCollecton = new Stack<ConnectionUnit>(); 131 } 132 133 /// <summary> 134 /// 取出 135 /// </summary> 136 internal ConnectionUnit Pop(string uid) 137 { 138 ConnectionUnit unit = freeCollecton.Pop(); 139 unit.State = true; 140 unit.Uid = uid; 141 busyCollection.Add(uid, unit); 142 return unit; 143 } 144 145 /// <summary> 146 /// 放回 147 /// </summary> 148 internal void Push(ConnectionUnit unit) 149 { 150 if (!string.IsNullOrEmpty(unit.Uid) && unit.Uid != "-1") 151 busyCollection.Remove(unit.Uid); 152 unit.Uid = "-1"; 153 unit.State = false; 154 freeCollecton.Push(unit); 155 } 156 157 /// <summary> 158 /// 獲取 159 /// </summary> 160 internal ConnectionUnit GetConnectionUnitByUID(string uid) 161 { 162 if (busyCollection.ContainsKey(uid)) 163 return busyCollection[uid]; 164 return null; 165 } 166 167 /// <summary> 168 /// 169 /// </summary> 170 internal string[] GetOnLineList() 171 { 172 return busyCollection.Keys.ToArray(); 173 } 174 175 public void Dispose() 176 { 177 foreach (KeyValuePair<string,ConnectionUnit> item in busyCollection) 178 item.Value.Dispose(); 179 180 busyCollection.Clear(); 181 182 while (freeCollecton.Count > 0) 183 freeCollecton.Pop().Dispose(); 184 } 185 } 186 187 public class SocketPoolController:IDisposable 188 { 189 /// <summary> 190 /// 初始化池的互斥體 191 /// </summary> 192 private Mutex mutex = new Mutex(); 193 194 /// <summary> 195 /// Accept限制信號 196 /// </summary> 197 private Semaphore semaphoreAccept; 198 199 /// <summary> 200 /// Accept信號 201 /// </summary> 202 private static ManualResetEvent acceptLock = new ManualResetEvent(false); 203 204 /// <summary> 205 /// Send信號 206 /// </summary> 207 private static ManualResetEvent sendLock = new ManualResetEvent(false); 208 209 /// <summary> 210 /// 最大並發數(連接數) 211 /// </summary> 212 private int maxConnect; 213 214 /// <summary> 215 /// 當前連接數(並發數) 216 /// </summary> 217 private int currentConnect; 218 219 /// <summary> 220 /// 緩沖池 221 /// </summary> 222 private BufferManager buffer; 223 224 /// <summary> 225 /// SocketasyncEventArgs池 226 /// </summary> 227 private SocketAsyncEventArgsPool pool; 228 229 /// <summary> 230 /// 服務端Socket 231 /// </summary> 232 private Socket server; 233 234 /// <summary> 235 /// 完成接受的委托 236 /// </summary> 237 public delegate void AcceptHandler(string uid); 238 239 /// <summary> 240 /// 完成發送的委托 241 /// </summary> 242 public delegate void SendHandler(string uid, string result); 243 244 /// <summary> 245 /// 完成接收的委托 246 /// </summary> 247 public delegate void RecevieHandler(string uid, string data); 248 249 /// <summary> 250 /// 完成接受事件 251 /// </summary> 252 public event AcceptHandler OnAccept; 253 254 /// <summary> 255 /// 完成發送事件 256 /// </summary> 257 public event SendHandler OnSend; 258 259 /// <summary> 260 /// 完成接收事件 261 /// </summary> 262 public event RecevieHandler OnReceive; 263 264 /// <summary> 265 /// 構造函數 266 /// </summary> 267 /// <param name="buffersize">單元緩沖區大小</param> 268 /// <param name="maxCount">並發總數</param> 269 public SocketPoolController(int buffersize, int maxCount) 270 { 271 buffer = new BufferManager(buffersize * maxCount,buffersize); 272 this.currentConnect = 0; 273 this.maxConnect = maxCount; 274 this.currentConnect = 0; 275 this.pool = new SocketAsyncEventArgsPool(); 276 //設置並發數信號,經試驗過是並發數-1才對 277 this.semaphoreAccept = new Semaphore(maxCount-1, maxCount-1); 278 InitPool(); 279 } 280 281 /// <summary> 282 /// 初始化SocketAsyncEventArgs池 283 /// 這里主要是給空閑棧填充足夠的實例 284 /// </summary> 285 private void InitPool() 286 { 287 ConnectionUnit unit = null; 288 for (int i = 0; i < maxConnect; i++) 289 { 290 unit = new ConnectionUnit(); 291 unit.Uid = "-1"; 292 unit.RecArg = new SocketAsyncEventArgs(); 293 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs>(RecArg_Completed); 294 unit.SendArg = new SocketAsyncEventArgs(); 295 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs>(SendArg_Completed); 296 this.pool.Push(unit); 297 } 298 } 299 300 /// <summary> 301 /// 啟動池 302 /// </summary> 303 /// <param name="ipAddress">服務端的IP</param> 304 /// <param name="port">端口</param> 305 public void RunPool(string ipAddress, int port) 306 { 307 IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); 308 server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 309 server.Bind(endpoint); 310 server.Listen(100); 311 312 //調用方法異步Accept客戶端的連接 313 MyAsyncAccept(); 314 //設置信號,防止再池在已經啟動的情況下再次啟動 315 mutex.WaitOne(); 316 } 317 318 /// <summary> 319 /// 異步Accept客戶端的連接 320 /// </summary> 321 void MyAsyncAccept() 322 { 323 //這里使用Action的方式異步循環接受客戶端的連接 324 //模仿同事的做法沒開線程,不知這種方式是好是壞 325 Action callback = new Action(delegate() 326 { 327 while (true) 328 { 329 //每次接受都要新開一個SocketAsyncEventArgs,否則會報錯 330 //其實我也想重復利用的 331 SocketAsyncEventArgs e = new SocketAsyncEventArgs(); 332 e.Completed += new EventHandler<SocketAsyncEventArgs>(Accept_Completed); 333 334 acceptLock.Reset(); 335 server.AcceptAsync(e); 336 //在異步接受完成之前阻塞當前線程 337 acceptLock.WaitOne(); 338 } 339 }); 340 callback.BeginInvoke(null, null); 341 } 342 343 344 /// <summary> 345 /// 停止池 346 /// </summary> 347 public void StopPool() 348 { 349 //把服務端的socket關了 350 if (server != null) 351 server.Close(); 352 //釋放互斥信號,等待下次啟動 353 mutex.ReleaseMutex(); 354 //釋放資源 355 Dispose(); 356 } 357 358 /// <summary> 359 /// 發送消息 360 /// </summary> 361 /// <param name="uid"></param> 362 /// <param name="message"></param> 363 public void SendMessage(string uid, string message) 364 { 365 sendLock.Reset(); 366 ConnectionUnit unit=pool.GetConnectionUnitByUID(uid); 367 //如果獲取不了連接單元就不發送了, 368 if (unit == null) 369 { 370 if(OnSend!=null) OnSend(uid,"100"); 371 sendLock.Set(); 372 return; 373 } 374 byte[] datas = Encoding.ASCII.GetBytes(message); 375 unit.SendArg.SetBuffer(datas, 0, datas.Length); 376 unit.client.SendAsync(unit.SendArg); 377 //阻塞當前線程,等到發送完成才釋放 378 sendLock.WaitOne(); 379 } 380 381 void SendArg_Completed(object sender, SocketAsyncEventArgs e) 382 { 383 Socket client = sender as Socket; 384 ConnectionUnit unit = e.UserToken as ConnectionUnit; 385 //這里的消息碼有三個,2字頭的是成功的,1字頭是不成功的 386 //101是未知錯誤,100是客戶端不在線 387 if (e.SocketError == SocketError.Success) 388 if (OnSend != null) OnSend(unit.Uid, "200"); 389 else if (OnSend != null) OnSend(unit.Uid, "101"); 390 //釋放信號,以便下次發送消息執行 391 sendLock.Set(); 392 } 393 394 void RecArg_Completed(object sender, SocketAsyncEventArgs e) 395 { 396 Socket client = sender as Socket; 397 ConnectionUnit unit = e.UserToken as ConnectionUnit; 398 //這里大致與上一篇異步通信的一樣,只是對緩沖區的處理有一點差異 399 if (e.SocketError == SocketError.Success) 400 { 401 int rec = e.BytesTransferred; 402 if (rec == 0) 403 { 404 CloseSocket(unit); 405 return; 406 } 407 if (client.Available > 0) 408 { 409 unit.tempArray.AddRange(e.Buffer); 410 buffer.FreeBuffer(unit.RecArg); 411 buffer.SetBuffer(unit.RecArg); 412 client.SendAsync(unit.RecArg); 413 return; 414 } 415 byte[] data = e.Buffer; 416 int len = rec; 417 if (unit.tempArray.Count != 0) 418 { 419 foreach (byte item in data) 420 { 421 if (item == 0) break; 422 unit.tempArray.Add(item); 423 } 424 data = unit.tempArray.ToArray(typeof(byte)) as byte[]; 425 rec = data.Length; 426 unit.tempArray.Clear(); 427 } 428 429 string dataStr = Encoding.ASCII.GetString(data, 0, len); 430 if (OnReceive != null) 431 OnReceive(unit.Uid, dataStr); 432 433 if (!unit.State) return; 434 buffer.FreeBuffer(e); 435 buffer.SetBuffer(e); 436 client.ReceiveAsync(e); 437 } 438 //這里還多個了一個關閉當前連接 439 else 440 { 441 CloseSocket(unit); 442 } 443 } 444 445 void Accept_Completed(object sender, SocketAsyncEventArgs e) 446 { 447 Socket client = e.AcceptSocket; 448 try 449 { 450 if (client.Connected) 451 { 452 IPEndPoint point = client.RemoteEndPoint as IPEndPoint; 453 string uid = point.Address + ":" + point.Port; 454 ConnectionUnit unit = pool.Pop(uid); 455 unit.client = client; 456 unit.State = true; 457 unit.Uid = uid; 458 unit.RecArg.UserToken = unit; 459 unit.SendArg.UserToken = unit; 460 buffer.SetBuffer(unit.RecArg); 461 462 //在接受成功之后就開始接收數據了 463 client.ReceiveAsync(unit.RecArg); 464 //設置並發限制信號和增加當前連接數 465 semaphoreAccept.WaitOne(); 466 Interlocked.Increment(ref currentConnect); 467 468 if (OnAccept != null) OnAccept(uid); 469 } 470 else if (client != null) 471 { 472 client.Close(); 473 client.Dispose(); 474 client = null; 475 } 476 } 477 catch (Exception ex) { Console.WriteLine(ex.ToString()); } 478 //設置Accept信號,以便下次Accept的執行 479 acceptLock.Set(); 480 e.Dispose(); 481 } 482 483 /// <summary> 484 /// 關閉一個連接單元 485 /// </summary> 486 private void CloseSocket( ConnectionUnit unit ) 487 { 488 //關閉並釋放客戶端socket的字眼 489 if (unit.client != null) 490 { 491 unit.client.Shutdown(SocketShutdown.Both); 492 unit.client.Dispose(); 493 unit.client = null; 494 } 495 //Console.WriteLine(unit.Uid+" disconnect "); 496 //把連接放回連接池 497 pool.Push(unit); 498 //釋放並發信號 499 semaphoreAccept.Release(); 500 //減少當前連接數 501 Interlocked.Decrement(ref currentConnect); 502 } 503 504 public void Dispose() 505 { 506 if (pool != null) 507 { 508 pool.Dispose(); 509 pool = null; 510 } 511 if (buffer != null) 512 { 513 buffer.Dispose(); 514 buffer = null; 515 } 516 if (server != null) 517 { 518 server.Dispose(); 519 server = null; 520 } 521 522 } 523 }