Socket連接池


  “池”這個概念好像最早是在操作系統的課上聽過的,到后來出來工作的第二天組長也跟我提起“池”這個東東。它給我的感覺是某種對象的集合,如果要用的話就取出,不用的話就放回。在學多線程的時候有接觸過線程池,在寫《Socket 一對多通信》的時候想到了Socket連接池這回事,不過在網上谷歌了一下,發現這類的文章貌似不多,看了一下園友的博文《高性能Socket設計實現》,獲益良多,下了份源碼來看,雖然有一部分看不明白,而且由於個人水平跑不了那份代碼,但是從中我學到了不少,至少我寫的“池”有一部分是用了這位田大哥的思想。

  先來分析各個類之間的結構,整個連接池里面實際上是有兩個池,一個是在異步通信中可以重復利用的SocketAsyncEventArgs池(當然處於別的方面考慮,池里面並不是單純放SocketAsyncEventArgs的實例集合),另一個是接收數據時用到的byte[]緩沖池。而這兩個池是外部不能訪問的,外部通過一個控制(或者叫管理)的類進行操作。繼承了前兩篇博文中的異步通信的思想,最底的一個類存放了一次連接的信息,包括兩個SocketAsyncEventArgs實例和通信另一端的Socket實例。下面將逐部分介紹。

 

 1     /// <summary>
 2     /// 連接單元
 3     /// </summary>
 4      class ConnectionUnit:IDisposable
 5     {
 6         private string _uid;//單元的編號,默認為-1
 7         private bool _state;//單元的狀態,true表示使用,false表示空閑
 8         private SocketAsyncEventArgs _sendArg;//專用於發送
 9         private SocketAsyncEventArgs _recArg;//專用於接收
10         internal Socket client { get; set; }//客戶端的socket實例
11         internal ArrayList tempArray { get; set; }//暫存已接收的數據,避免粘包用的
12 
13 
14         public ConnectionUnit(string UID)
15         {
16             _uid = UID;
17             tempArray = new ArrayList();
18         }
19 
20         public ConnectionUnit() : this("-1") { }
21 
22         public void Dispose()
23         {
24             if (_sendArg != null)
25                 _sendArg.Dispose();
26             if (_recArg != null)
27                 _recArg.Dispose();
28 
29             _sendArg = null;
30             _recArg = null;
31         }
32     }

 

  這個與之前一篇講異步通信的博文一樣,一個Socket兩個SocketAsyncEventArgs。為了取客戶端的Socket方便一點,兩個SocketAsyncEventArgs一個用於收,一個用於發,田大哥說這樣可以實現雙工。的確是這樣,當初把類定成這樣也是為了在收的同時也能發。以上代碼刪掉了把字段裝成屬性的那部分。

  SocketAsyncEventArgsPool這個類就是上面鏈接單元的一個池,這個池才是整個連接池里最核心的池,也是真正意義上的池。池里面用了兩個集合來存放這一堆連接單元。分別是空閑棧(freeCollecton)和在線字典集(busyCollection)。對於這樣的一個池,我就認為,只要在我需要用的時候把對象取出來,頂多在去的時候給一些參數,讓池幫忙把這個對象配置好,我就拿來使用,等到我用完了,把他放回池里,池就把對象還原,等待下次再使用。正體感覺這個池對外有點像個棧,取的時候就Pop,放的時候就Push,此外還提供了兩個與棧不相干的的方法,根據編號獲取在線的連接單元和獲取所有在線連接單元的編號。

 1     class SocketAsyncEventArgsPool:IDisposable
 2     {
 3         private Dictionary<string, ConnectionUnit> busyCollection;
 4         private Stack<ConnectionUnit> freeCollecton;
 5 
 6         internal SocketAsyncEventArgsPool()
 7         {
 8             busyCollection = new Dictionary<string, ConnectionUnit>();
 9             freeCollecton = new Stack<ConnectionUnit>();
10         }
11 
12         /// <summary>
13         /// 取出
14         /// </summary>
15         internal ConnectionUnit Pop(string uid)
16         {
17             ConnectionUnit unit = freeCollecton.Pop();
18             unit.State = true;
19             unit.Uid = uid;
20             busyCollection.Add(uid, unit);
21             return unit;
22         }
23 
24         /// <summary>
25         /// 放回
26         /// </summary>
27         internal void Push(ConnectionUnit unit)
28         {
29             if (!string.IsNullOrEmpty(unit.Uid) && unit.Uid != "-1")
30                 busyCollection.Remove(unit.Uid);
31             unit.Uid = "-1";
32             unit.State = false;
33             freeCollecton.Push(unit);
34         }
35 
36         /// <summary>
37         /// 獲取
38         /// </summary>
39         internal ConnectionUnit GetConnectionUnitByUID(string uid)
40         {
41             if (busyCollection.ContainsKey(uid))
42                 return busyCollection[uid];
43             return null;
44         }
45 
46         /// <summary>
47         /// 
48         /// </summary>
49         internal string[] GetOnLineList()
50         {
51             return busyCollection.Keys.ToArray();
52         }
53 
54         public void Dispose()
55         {
56             foreach (KeyValuePair<string,ConnectionUnit> item in busyCollection)
57                 item.Value.Dispose();
58 
59             busyCollection.Clear();
60 
61             while (freeCollecton.Count > 0)
62                 freeCollecton.Pop().Dispose();
63         }

 

  BufferManager這個是專給接收的SocketAsyncEventArgs用的緩沖池,整一個連接池里面所有接收用的緩沖區都用這個BufferManager,參照田大哥的思想,現在內存里開辟一大片區域存放byte,然后給每一個接收用的SocketAsyncEventArgs分配一塊。

 1     class BufferManager:IDisposable
 2     {
 3         private byte[] buffers;//緩沖池
 4         private int bufferSize;//每個單元使用的大小
 5         private int allSize;//池的總大小
 6         private int currentIndex;//當前可用的索引
 7         private Stack<int> freeIndexs;//已使用過的空閑索引
 8 
 9         /// <summary>
10         /// 構造緩存池
11         /// </summary>
12         /// <param name="buffersSize">池總大小</param>
13         /// <param name="defaultSize">默認單元大小</param>
14         internal BufferManager(int buffersSize, int defaultSize) 
15         {
16             this.bufferSize=defaultSize;
17             this.allSize=buffersSize;
18             currentIndex=0;
19             this.buffers = new byte[allSize];
20             freeIndexs = new Stack<int>();
21         }
22 
23         /// <summary>
24         /// 給SocketAsyncEventArgs設置緩沖區
25         /// </summary>
26         internal bool SetBuffer(SocketAsyncEventArgs e)
27         {
28             //首先看看空閑棧里有沒有空閑的區域,有就使用
29             if (freeIndexs.Count > 0)
30             {
31                 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize);
32             }
33             else
34             {
35                 //沒有就得從buffers里取,如果buffers用光了當然取不了
36                 if ((allSize - currentIndex) < bufferSize) return false;
37                 e.SetBuffer(buffers, currentIndex, bufferSize);
38                 currentIndex += bufferSize;
39             }
40             return true;
41         }
42 
43         /// <summary>
44         /// 釋放SocketAsyncEventArgs的緩沖區
45         /// </summary>
46         /// <param name="e"></param>
47         internal void FreeBuffer(SocketAsyncEventArgs e)
48         {
49             //把索引放到空閑索引棧里面,供下次取的時候重復利用
50             freeIndexs.Push(e.Offset);
51             //同時清空這部分區域的數據免得上次使用時遺留的數據會摻
52             //和到下次讀取的數據中
53             for (int i = e.Offset; i < e.Offset + bufferSize; i++)
54             {
55                 if (buffers[i] == 0) break;
56                 buffers[i] = 0;
57             }
58             e.SetBuffer(null, 0, 0);
59         }
60 
61         public void Dispose()
62         {
63             buffers = null;
64             freeIndexs = null;
65         }
66     }

 

  其實上面兩個池都是很大程度參照了《高性能Socket設計實現》中的內容。下面這個SocketPoolController是對外的類,這個類的設計參照的就沒那么多了。而對於一個Socket通信(服務端的)來說,無非都是三件事,接受連接,接收數據,發送數據。這三件事我再操作類里面是這樣做的

  接受連接:接受再運行池的時候就開始了,異步循環地接受,執行一次異步接受就阻塞,等到接受完成才被喚醒。

 1         /// <summary>
 2         /// 異步Accept客戶端的連接
 3         /// </summary>
 4         void MyAsyncAccept()
 5         {
 6             //這里使用Action的方式異步循環接受客戶端的連接
 7             //模仿同事的做法沒開線程,不知這種方式是好是壞
 8             Action callback = new Action(delegate()
 9             {
10                 while (true)
11                 {
12                     //每次接受都要新開一個SocketAsyncEventArgs,否則會報錯
13                     //其實我也想重復利用的
14                     SocketAsyncEventArgs e = new SocketAsyncEventArgs();
15                     e.Completed += new EventHandler<SocketAsyncEventArgs>(Accept_Completed);
16                     
17                     acceptLock.Reset();
18                     server.AcceptAsync(e);
19                     //在異步接受完成之前阻塞當前線程
20                     acceptLock.WaitOne();
21                 }
22             });
23             callback.BeginInvoke(null, null);
24         }
25 
26         void Accept_Completed(object sender, SocketAsyncEventArgs e)
27         {
28             Socket client = e.AcceptSocket;
29             try
30             {
31                 if (client.Connected)
32                 {
33                     IPEndPoint point = client.RemoteEndPoint as IPEndPoint;
34                     string uid = point.Address + ":" + point.Port;
35                     ConnectionUnit unit = pool.Pop(uid);
36                     unit.client = client;
37                     unit.State = true;
38                     unit.Uid = uid;
39                     unit.RecArg.UserToken = unit;
40                     unit.SendArg.UserToken = unit;
41                     buffer.SetBuffer(unit.RecArg);
42 
43                     //在接受成功之后就開始接收數據了
44                     client.ReceiveAsync(unit.RecArg);
45                     //設置並發限制信號和增加當前連接數
46                     semaphoreAccept.WaitOne();
47                     Interlocked.Increment(ref currentConnect);
48 
49                     if (OnAccept != null) OnAccept(uid);
50                 }
51                 else if (client != null)
52                 {
53                     client.Close();
54                     client.Dispose();
55                     client = null;
56                 }
57             }
58             catch (Exception ex) { Console.WriteLine(ex.ToString()); }
59             //設置Accept信號,以便下次Accept的執行
60             acceptLock.Set();
61             e.Dispose();
62         }

 

  接收消息:在異步接受成功的時候開始接收,每次接收完成之后就進行下一次接收,直到客戶端斷開連接才終止。

 1         void RecArg_Completed(object sender, SocketAsyncEventArgs e)
 2         {
 3             Socket client = sender as Socket;
 4             ConnectionUnit unit = e.UserToken as ConnectionUnit;
 5             //這里大致與上一篇異步通信的一樣,只是對緩沖區的處理有一點差異
 6             if (e.SocketError == SocketError.Success)
 7             {
 8                 int rec = e.BytesTransferred;
 9                 if (rec == 0)
10                 {
11                     CloseSocket(unit);
12                     return;
13                 }
14                 if (client.Available > 0)
15                 {
16                     unit.tempArray.AddRange(e.Buffer);
17                     buffer.FreeBuffer(unit.RecArg);
18                     buffer.SetBuffer(unit.RecArg);
19                     client.SendAsync(unit.RecArg);
20                     return;
21                 }
22                 byte[] data = e.Buffer;
23                 int len = rec;
24                 int offset = e.Offset;
25                 if (unit.tempArray.Count != 0)
26                 {
27                     foreach (byte item in data)
28                     {
29                         if (item == 0) break;
30                         unit.tempArray.Add(item);
31                     }
32                     data = unit.tempArray.ToArray(typeof(byte)) as byte[];
33                     rec = data.Length;
34                     offset = 0;
35                     unit.tempArray.Clear();
36                 }
37 
38                 string dataStr = Encoding.ASCII.GetString(data,offset,len);
39                 if (OnReceive != null)
40                     OnReceive(unit.Uid, dataStr);
41 
42                 if (!unit.State) return;
43                 buffer.FreeBuffer(e);
44                 buffer.SetBuffer(e);
45                 client.ReceiveAsync(e);
46             }
47             //這里還多個了一個關閉當前連接
48             else
49             {
50                 CloseSocket(unit);
51             }
52         }
53 
54         /// <summary>
55         /// 關閉一個連接單元
56         /// </summary>
57         private void CloseSocket( ConnectionUnit unit )
58         {
59             //關閉並釋放客戶端socket的字眼
60             if (unit.client != null)
61             {
62                 unit.client.Shutdown(SocketShutdown.Both);
63                 unit.client.Dispose();
64                 unit.client = null;
65             }
66             //Console.WriteLine(unit.Uid+" disconnect ");
67             //把連接放回連接池
68             pool.Push(unit);
69             //釋放並發信號
70             semaphoreAccept.Release();
71             //減少當前連接數
72             Interlocked.Decrement(ref currentConnect);
73         }

 

  發送消息:外放方法,在需要的時候自行調用方法發送。

 1         /// <summary>
 2         /// 發送消息
 3         /// </summary>
 4         /// <param name="uid"></param>
 5         /// <param name="message"></param>
 6         public void SendMessage(string uid, string message)
 7         {
 8             sendLock.Reset();
 9             ConnectionUnit unit=pool.GetConnectionUnitByUID(uid);
10             //如果獲取不了連接單元就不發送了,
11             if (unit == null)
12             { 
13                 if(OnSend!=null) OnSend(uid,"100");
14                 sendLock.Set();
15                 return;
16             }
17             byte[] datas = Encoding.ASCII.GetBytes(message);
18             unit.SendArg.SetBuffer(datas, 0, datas.Length);
19             unit.client.SendAsync(unit.SendArg);
20             //阻塞當前線程,等到發送完成才釋放
21             sendLock.WaitOne();
22         }
23 
24         void SendArg_Completed(object sender, SocketAsyncEventArgs e)
25         {
26             Socket client = sender as Socket;
27             ConnectionUnit unit = e.UserToken as ConnectionUnit;
28             //這里的消息碼有三個,2字頭的是成功的,1字頭是不成功的
29             //101是未知錯誤,100是客戶端不在線
30             if (e.SocketError == SocketError.Success)
31                 if (OnSend != null) OnSend(unit.Uid, "200");
32             else if (OnSend != null) OnSend(unit.Uid, "101");
33             //釋放信號,以便下次發送消息執行
34             sendLock.Set();
35         }

 

  下面則是類里面的一些字段信息和構造函數

 1         /// <summary>
 2         /// 初始化池的互斥體
 3         /// </summary>
 4         private Mutex mutex = new Mutex();
 5 
 6         /// <summary>
 7         /// Accept限制信號
 8         /// </summary>
 9         private Semaphore semaphoreAccept;
10 
11         /// <summary>
12         /// Accept信號
13         /// </summary>
14         private static ManualResetEvent acceptLock = new ManualResetEvent(false);
15 
16         /// <summary>
17         /// Send信號
18         /// </summary>
19         private static ManualResetEvent sendLock = new ManualResetEvent(false);
20 
21         /// <summary>
22         /// 最大並發數(連接數)
23         /// </summary>
24         private int maxConnect;
25 
26         /// <summary>
27         /// 當前連接數(並發數)
28         /// </summary>
29         private int currentConnect;
30 
31         /// <summary>
32         /// 緩沖池
33         /// </summary>
34         private BufferManager buffer;
35 
36         /// <summary>
37         /// SocketasyncEventArgs池
38         /// </summary>
39         private SocketAsyncEventArgsPool pool;
40 
41         /// <summary>
42         /// 服務端Socket
43         /// </summary>
44         private Socket server;
45 
46         /// <summary>
47         /// 完成接受的委托
48         /// </summary>
49         public delegate void AcceptHandler(string uid);
50 
51         /// <summary>
52         /// 完成發送的委托
53         /// </summary>
54         public delegate void SendHandler(string uid, string result);
55 
56         /// <summary>
57         /// 完成接收的委托
58         /// </summary>
59         public delegate void RecevieHandler(string uid, string data);
60 
61         /// <summary>
62         /// 完成接受事件
63         /// </summary>
64         public event AcceptHandler OnAccept;
65 
66         /// <summary>
67         /// 完成發送事件
68         /// </summary>
69         public event SendHandler OnSend;
70 
71         /// <summary>
72         /// 完成接收事件
73         /// </summary>
74         public event RecevieHandler OnReceive;
75 
76         /// <summary>
77         /// 構造函數
78         /// </summary>
79         /// <param name="buffersize">單元緩沖區大小</param>
80         /// <param name="maxCount">並發總數</param>
81         public SocketPoolController(int buffersize, int maxCount)
82         {
83             buffer = new BufferManager(buffersize * maxCount,buffersize);
84             this.currentConnect = 0;
85             this.maxConnect = maxCount;
86             this.currentConnect = 0;
87             this.pool = new SocketAsyncEventArgsPool();
88             //設置並發數信號,經試驗過是並發數-1才對
89             this.semaphoreAccept = new Semaphore(maxCount-1, maxCount-1);
90             InitPool();
91         }

 

構造函數里用到的方法

 1         /// <summary>
 2         /// 初始化SocketAsyncEventArgs池
 3         /// 這里主要是給空閑棧填充足夠的實例
 4         /// </summary>
 5         private void InitPool()
 6         {
 7             ConnectionUnit unit = null;
 8             for (int i = 0; i < maxConnect; i++)
 9             {
10                 unit = new ConnectionUnit();
11                 unit.Uid = "-1";
12                 unit.RecArg = new SocketAsyncEventArgs();
13                 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs>(RecArg_Completed);
14                 unit.SendArg = new SocketAsyncEventArgs();
15                 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs>(SendArg_Completed);
16                 this.pool.Push(unit);
17             }
18         }

 

  其他外放專門控制池的方法

 1         /// <summary>
 2         /// 啟動池
 3         /// </summary>
 4         /// <param name="ipAddress">服務端的IP</param>
 5         /// <param name="port">端口</param>
 6         public void RunPool(string ipAddress, int port)
 7         {
 8             IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
 9             server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
10             server.Bind(endpoint);
11             server.Listen(100);
12 
13             //調用方法異步Accept客戶端的連接
14             MyAsyncAccept();
15             //設置信號,防止再池在已經啟動的情況下再次啟動
16             mutex.WaitOne();
17         }
18 
19         /// <summary>
20         /// 停止池
21         /// </summary>
22         public void StopPool()
23         {
24             //把服務端的socket關了
25             if (server != null)
26                 server.Close();
27             //釋放互斥信號,等待下次啟動
28             mutex.ReleaseMutex();
29             //釋放資源
30             Dispose();
31         }

 

  要把這個操作類與現實的事物類比的話,這個與我們平常跟某個或一堆人聊天時差不多,別人說的東西,不用你自己控制你都會聽得到(排除帶上耳機,塞住耳朵這種極端的情況),所以接收消息那個方法就不需要了,而你要說什么這個要靠個人控制,而事件那些也好類比,OnAccept就相當於某人加入到這次聊天中你會做些什么(say hello 或者無視),OnRecevie就聽到別人說什么自己有什么反應,OnSend就自己說完話之后有什么反應(有時候發現說錯了會糾正,有時覺得自己說得好笑的也笑一場諸如此類)。

  在使用的時候可以這樣子

 1             SocketPoolController pool;
 2             pool = new SocketPoolController(32 * 1024, 1000);
 3             pool.OnReceive += new SocketPoolController.RecevieHandler(pool_OnReceive);
 4             pool.OnSend += new SocketPoolController.SendHandler(pool_OnSend);
 5             pool.OnAccept += new SocketPoolController.AcceptHandler(pool_OnAccept);
 6             pool.RunPool("127.0.0.1", 8081);
 7             Console.WriteLine("Pool has run\r\npress any key to stop...");
 8             Console.ReadKey();
 9             pool.StopPool();
10             Console.WriteLine("Pool has stop\r\npress any key to exit...");
11             Console.ReadLine();

 

  在池里面有一部分地方看似跟用同步的差不多,像接受客戶端的連接,發送消息這些地方。可是用這種異步,萬一客戶端突然斷開連接也不會有同步那樣馬上拋異常。還有一點的是在這份代碼里面缺少了對異常的捕捉,有一部分錯誤我在測試的過程中設了判斷避開了。以前跟某只貓提過Socket編程會與多線程一起使用,我也覺得是這樣,在之前一篇博文 《Socket一對多通信》里我也用到線程,后來的異步通信也是有線程的,不過不是.net framework自行創建的。看了田大哥的博文給我的另一個收獲是信號量的使用,在以前不懂得使用信號量,只會設置一大堆標識狀態的布爾值或整形的變量來計數判斷。田大哥的博文介紹的時高性能的Socket,而我的這個應該性能不會高到哪里去。上面有什么說錯的請各位指出,有什么說漏的,請各位提點,多多指導。謝謝!

整份源碼
  1     /// <summary>
  2     /// 連接單元
  3     /// </summary>
  4     class ConnectionUnit:IDisposable
  5     {
  6         private string _uid;//單元的編號,默認為-1
  7         private bool _state;//單元的狀態,true表示使用,false表示空閑
  8         private SocketAsyncEventArgs _sendArg;//專用於發送
  9         private SocketAsyncEventArgs _recArg;//專用於接收
 10         internal Socket client { get; set; }//客戶端的socket實例
 11         internal ArrayList tempArray { get; set; }//暫存已接收的數據,避免粘包用的
 12 
 13         public string Uid
 14         {
 15             get { return _uid; }
 16             set { _uid = value; }
 17         }
 18 
 19         public ConnectionUnit(string UID)
 20         {
 21             _uid = UID;
 22             tempArray = new ArrayList();
 23         }
 24 
 25         public ConnectionUnit() : this("-1") { }
 26 
 27         public bool State
 28         {
 29             get { return _state; }
 30             set { _state = value; }
 31         }
 32 
 33         public SocketAsyncEventArgs SendArg
 34         {
 35             get { return _sendArg; }
 36             set { _sendArg = value; }
 37         }
 38 
 39         public SocketAsyncEventArgs RecArg
 40         {
 41             get { return _recArg; }
 42             set { _recArg = value; }
 43         }
 44 
 45         public void Dispose()
 46         {
 47             if (_sendArg != null)
 48                 _sendArg.Dispose();
 49             if (_recArg != null)
 50                 _recArg.Dispose();
 51 
 52             _sendArg = null;
 53             _recArg = null;
 54         }
 55     }
 56 
 57     class BufferManager:IDisposable
 58     {
 59         private byte[] buffers;
 60         private int bufferSize;
 61         private int allSize;
 62         private int currentIndex;
 63         private Stack<int> freeIndexs;
 64 
 65         /// <summary>
 66         /// 構造緩存池
 67         /// </summary>
 68         /// <param name="buffersSize">池總大小</param>
 69         /// <param name="defaultSize">默認單元大小</param>
 70         internal BufferManager(int buffersSize, int defaultSize) 
 71         {
 72             this.bufferSize=defaultSize;
 73             this.allSize=buffersSize;
 74             currentIndex=0;
 75             this.buffers = new byte[allSize];
 76             freeIndexs = new Stack<int>();
 77         }
 78 
 79         /// <summary>
 80         /// 
 81         /// </summary>
 82         /// <param name="e"></param>
 83         /// <param name="offSet"></param>
 84         /// <returns></returns>
 85         internal bool SetBuffer(SocketAsyncEventArgs e)
 86         {
 87             if (freeIndexs.Count > 0)
 88             {
 89                 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize);
 90             }
 91             else
 92             {
 93                 if ((allSize - currentIndex) < bufferSize) return false;
 94                 e.SetBuffer(buffers, currentIndex, bufferSize);
 95                 currentIndex += bufferSize;
 96             }
 97             return true;
 98         }
 99 
100         /// <summary>
101         /// 
102         /// </summary>
103         /// <param name="e"></param>
104         internal void FreeBuffer(SocketAsyncEventArgs e)
105         {
106             freeIndexs.Push(e.Offset);
107             for (int i = e.Offset; i < e.Offset + bufferSize; i++)
108             {
109                 if (buffers[i] == 0) break;
110                 buffers[i] = 0;
111             }
112             e.SetBuffer(null, 0, 0);
113         }
114 
115         public void Dispose()
116         {
117             buffers = null;
118             freeIndexs = null;
119         }
120     }
121 
122     class SocketAsyncEventArgsPool:IDisposable
123     {
124         private Dictionary<string, ConnectionUnit> busyCollection;
125         private Stack<ConnectionUnit> freeCollecton;
126 
127         internal SocketAsyncEventArgsPool()
128         {
129             busyCollection = new Dictionary<string, ConnectionUnit>();
130             freeCollecton = new Stack<ConnectionUnit>();
131         }
132 
133         /// <summary>
134         /// 取出
135         /// </summary>
136         internal ConnectionUnit Pop(string uid)
137         {
138             ConnectionUnit unit = freeCollecton.Pop();
139             unit.State = true;
140             unit.Uid = uid;
141             busyCollection.Add(uid, unit);
142             return unit;
143         }
144 
145         /// <summary>
146         /// 放回
147         /// </summary>
148         internal void Push(ConnectionUnit unit)
149         {
150             if (!string.IsNullOrEmpty(unit.Uid) && unit.Uid != "-1")
151                 busyCollection.Remove(unit.Uid);
152             unit.Uid = "-1";
153             unit.State = false;
154             freeCollecton.Push(unit);
155         }
156 
157         /// <summary>
158         /// 獲取
159         /// </summary>
160         internal ConnectionUnit GetConnectionUnitByUID(string uid)
161         {
162             if (busyCollection.ContainsKey(uid))
163                 return busyCollection[uid];
164             return null;
165         }
166 
167         /// <summary>
168         /// 
169         /// </summary>
170         internal string[] GetOnLineList()
171         {
172             return busyCollection.Keys.ToArray();
173         }
174 
175         public void Dispose()
176         {
177             foreach (KeyValuePair<string,ConnectionUnit> item in busyCollection)
178                 item.Value.Dispose();
179 
180             busyCollection.Clear();
181 
182             while (freeCollecton.Count > 0)
183                 freeCollecton.Pop().Dispose();
184         }
185     }
186 
187     public class SocketPoolController:IDisposable
188     {
189         /// <summary>
190         /// 初始化池的互斥體
191         /// </summary>
192         private Mutex mutex = new Mutex();
193 
194         /// <summary>
195         /// Accept限制信號
196         /// </summary>
197         private Semaphore semaphoreAccept;
198 
199         /// <summary>
200         /// Accept信號
201         /// </summary>
202         private static ManualResetEvent acceptLock = new ManualResetEvent(false);
203 
204         /// <summary>
205         /// Send信號
206         /// </summary>
207         private static ManualResetEvent sendLock = new ManualResetEvent(false);
208 
209         /// <summary>
210         /// 最大並發數(連接數)
211         /// </summary>
212         private int maxConnect;
213 
214         /// <summary>
215         /// 當前連接數(並發數)
216         /// </summary>
217         private int currentConnect;
218 
219         /// <summary>
220         /// 緩沖池
221         /// </summary>
222         private BufferManager buffer;
223 
224         /// <summary>
225         /// SocketasyncEventArgs池
226         /// </summary>
227         private SocketAsyncEventArgsPool pool;
228 
229         /// <summary>
230         /// 服務端Socket
231         /// </summary>
232         private Socket server;
233 
234         /// <summary>
235         /// 完成接受的委托
236         /// </summary>
237         public delegate void AcceptHandler(string uid);
238 
239         /// <summary>
240         /// 完成發送的委托
241         /// </summary>
242         public delegate void SendHandler(string uid, string result);
243 
244         /// <summary>
245         /// 完成接收的委托
246         /// </summary>
247         public delegate void RecevieHandler(string uid, string data);
248 
249         /// <summary>
250         /// 完成接受事件
251         /// </summary>
252         public event AcceptHandler OnAccept;
253 
254         /// <summary>
255         /// 完成發送事件
256         /// </summary>
257         public event SendHandler OnSend;
258 
259         /// <summary>
260         /// 完成接收事件
261         /// </summary>
262         public event RecevieHandler OnReceive;
263 
264         /// <summary>
265         /// 構造函數
266         /// </summary>
267         /// <param name="buffersize">單元緩沖區大小</param>
268         /// <param name="maxCount">並發總數</param>
269         public SocketPoolController(int buffersize, int maxCount)
270         {
271             buffer = new BufferManager(buffersize * maxCount,buffersize);
272             this.currentConnect = 0;
273             this.maxConnect = maxCount;
274             this.currentConnect = 0;
275             this.pool = new SocketAsyncEventArgsPool();
276             //設置並發數信號,經試驗過是並發數-1才對
277             this.semaphoreAccept = new Semaphore(maxCount-1, maxCount-1);
278             InitPool();
279         }
280 
281         /// <summary>
282         /// 初始化SocketAsyncEventArgs池
283         /// 這里主要是給空閑棧填充足夠的實例
284         /// </summary>
285         private void InitPool()
286         {
287             ConnectionUnit unit = null;
288             for (int i = 0; i < maxConnect; i++)
289             {
290                 unit = new ConnectionUnit();
291                 unit.Uid = "-1";
292                 unit.RecArg = new SocketAsyncEventArgs();
293                 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs>(RecArg_Completed);
294                 unit.SendArg = new SocketAsyncEventArgs();
295                 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs>(SendArg_Completed);
296                 this.pool.Push(unit);
297             }
298         }
299 
300         /// <summary>
301         /// 啟動池
302         /// </summary>
303         /// <param name="ipAddress">服務端的IP</param>
304         /// <param name="port">端口</param>
305         public void RunPool(string ipAddress, int port)
306         {
307             IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
308             server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
309             server.Bind(endpoint);
310             server.Listen(100);
311 
312             //調用方法異步Accept客戶端的連接
313             MyAsyncAccept();
314             //設置信號,防止再池在已經啟動的情況下再次啟動
315             mutex.WaitOne();
316         }
317 
318         /// <summary>
319         /// 異步Accept客戶端的連接
320         /// </summary>
321         void MyAsyncAccept()
322         {
323             //這里使用Action的方式異步循環接受客戶端的連接
324             //模仿同事的做法沒開線程,不知這種方式是好是壞
325             Action callback = new Action(delegate()
326             {
327                 while (true)
328                 {
329                     //每次接受都要新開一個SocketAsyncEventArgs,否則會報錯
330                     //其實我也想重復利用的
331                     SocketAsyncEventArgs e = new SocketAsyncEventArgs();
332                     e.Completed += new EventHandler<SocketAsyncEventArgs>(Accept_Completed);
333                     
334                     acceptLock.Reset();
335                     server.AcceptAsync(e);
336                     //在異步接受完成之前阻塞當前線程
337                     acceptLock.WaitOne();
338                 }
339             });
340             callback.BeginInvoke(null, null);
341         }
342 
343 
344         /// <summary>
345         /// 停止池
346         /// </summary>
347         public void StopPool()
348         {
349             //把服務端的socket關了
350             if (server != null)
351                 server.Close();
352             //釋放互斥信號,等待下次啟動
353             mutex.ReleaseMutex();
354             //釋放資源
355             Dispose();
356         }
357 
358         /// <summary>
359         /// 發送消息
360         /// </summary>
361         /// <param name="uid"></param>
362         /// <param name="message"></param>
363         public void SendMessage(string uid, string message)
364         {
365             sendLock.Reset();
366             ConnectionUnit unit=pool.GetConnectionUnitByUID(uid);
367             //如果獲取不了連接單元就不發送了,
368             if (unit == null)
369             { 
370                 if(OnSend!=null) OnSend(uid,"100");
371                 sendLock.Set();
372                 return;
373             }
374             byte[] datas = Encoding.ASCII.GetBytes(message);
375             unit.SendArg.SetBuffer(datas, 0, datas.Length);
376             unit.client.SendAsync(unit.SendArg);
377             //阻塞當前線程,等到發送完成才釋放
378             sendLock.WaitOne();
379         }
380 
381         void SendArg_Completed(object sender, SocketAsyncEventArgs e)
382         {
383             Socket client = sender as Socket;
384             ConnectionUnit unit = e.UserToken as ConnectionUnit;
385             //這里的消息碼有三個,2字頭的是成功的,1字頭是不成功的
386             //101是未知錯誤,100是客戶端不在線
387             if (e.SocketError == SocketError.Success)
388                 if (OnSend != null) OnSend(unit.Uid, "200");
389             else if (OnSend != null) OnSend(unit.Uid, "101");
390             //釋放信號,以便下次發送消息執行
391             sendLock.Set();
392         }
393 
394         void RecArg_Completed(object sender, SocketAsyncEventArgs e)
395         {
396             Socket client = sender as Socket;
397             ConnectionUnit unit = e.UserToken as ConnectionUnit;
398             //這里大致與上一篇異步通信的一樣,只是對緩沖區的處理有一點差異
399             if (e.SocketError == SocketError.Success)
400             {
401                 int rec = e.BytesTransferred;
402                 if (rec == 0)
403                 {
404                     CloseSocket(unit);
405                     return;
406                 }
407                 if (client.Available > 0)
408                 {
409                     unit.tempArray.AddRange(e.Buffer);
410                     buffer.FreeBuffer(unit.RecArg);
411                     buffer.SetBuffer(unit.RecArg);
412                     client.SendAsync(unit.RecArg);
413                     return;
414                 }
415                 byte[] data = e.Buffer;
416                 int len = rec;
417                 if (unit.tempArray.Count != 0)
418                 {
419                     foreach (byte item in data)
420                     {
421                         if (item == 0) break;
422                         unit.tempArray.Add(item);
423                     }
424                     data = unit.tempArray.ToArray(typeof(byte)) as byte[];
425                     rec = data.Length;
426                     unit.tempArray.Clear();
427                 }
428 
429                 string dataStr = Encoding.ASCII.GetString(data, 0, len);
430                 if (OnReceive != null)
431                     OnReceive(unit.Uid, dataStr);
432 
433                 if (!unit.State) return;
434                 buffer.FreeBuffer(e);
435                 buffer.SetBuffer(e);
436                 client.ReceiveAsync(e);
437             }
438             //這里還多個了一個關閉當前連接
439             else
440             {
441                 CloseSocket(unit);
442             }
443         }
444 
445         void Accept_Completed(object sender, SocketAsyncEventArgs e)
446         {
447             Socket client = e.AcceptSocket;
448             try
449             {
450                 if (client.Connected)
451                 {
452                     IPEndPoint point = client.RemoteEndPoint as IPEndPoint;
453                     string uid = point.Address + ":" + point.Port;
454                     ConnectionUnit unit = pool.Pop(uid);
455                     unit.client = client;
456                     unit.State = true;
457                     unit.Uid = uid;
458                     unit.RecArg.UserToken = unit;
459                     unit.SendArg.UserToken = unit;
460                     buffer.SetBuffer(unit.RecArg);
461 
462                     //在接受成功之后就開始接收數據了
463                     client.ReceiveAsync(unit.RecArg);
464                     //設置並發限制信號和增加當前連接數
465                     semaphoreAccept.WaitOne();
466                     Interlocked.Increment(ref currentConnect);
467 
468                     if (OnAccept != null) OnAccept(uid);
469                 }
470                 else if (client != null)
471                 {
472                     client.Close();
473                     client.Dispose();
474                     client = null;
475                 }
476             }
477             catch (Exception ex) { Console.WriteLine(ex.ToString()); }
478             //設置Accept信號,以便下次Accept的執行
479             acceptLock.Set();
480             e.Dispose();
481         }
482 
483         /// <summary>
484         /// 關閉一個連接單元
485         /// </summary>
486         private void CloseSocket( ConnectionUnit unit )
487         {
488             //關閉並釋放客戶端socket的字眼
489             if (unit.client != null)
490             {
491                 unit.client.Shutdown(SocketShutdown.Both);
492                 unit.client.Dispose();
493                 unit.client = null;
494             }
495             //Console.WriteLine(unit.Uid+" disconnect ");
496             //把連接放回連接池
497             pool.Push(unit);
498             //釋放並發信號
499             semaphoreAccept.Release();
500             //減少當前連接數
501             Interlocked.Decrement(ref currentConnect);
502         }
503         
504         public void Dispose()
505         {
506             if (pool != null)
507             {
508                 pool.Dispose();
509                 pool = null;
510             }
511             if (buffer != null)
512             {
513                 buffer.Dispose();
514                 buffer = null;
515             }
516             if (server != null)
517             {
518                 server.Dispose();
519                 server = null;
520             }
521 
522         }
523     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM