用C#實現TCP連接池


背景

   最近對接一個TCP協議,這個協議定義的非常好,有頭標識和校驗位!但是,接口提供方定的通信協議還是欠缺考慮...正常情況下,這個協議是沒有問題的,但是在高並發的情況下,客戶端方就需要點真功夫了。

分析

    該通信協議中,沒有使用事務號,也就是說,用同一條連接連續發送兩次請求,你不知道返回的響應數據是哪個請求的。你可能會說,第一個響應是第一個請求的,第二個響應是第二個請求的!這是絕對的理想情況,服務器處理所有請求的耗時一樣,網絡沒有抖動。如果耗時和網絡抖動都無法確定的情況下,響應順序與請求順序就有可能不一致!結論就是,如果只有一個連接,那么所有請求只能排隊,一個請求處理完才能發送下一個。那如果高並發請求比較多怎么辦?

    如何用沒有事務號的通信協議實現高並發?Http?bingo!Http的請求方式就是解決這個問題的榜樣!Http就是在一個連接中一次只發送一個請求,請求沒有收到響應或者超時斷開的情況下,是不會發送第二個請求的。那么Http是怎么並發的?通過同時發送多個Http請求來實現並發!以Http為榜樣,那么就可以通過一個TCP連接一次發送一個請求,這個請求沒有結束之前不再發送新的請求,如此來保證請求與響應的匹配,通過多個連接同時發送多個請求來實現高並發。

    解決思路有了,還得考慮下性能。肯定不能每次發送請求就新建一個連接,請求結束就斷開連接,這樣很不TCP!那就考慮復用TCP連接,請求未結束之前,這個連接不可用,請求結束后不斷開連接,可供新請求使用。並發量肯定有低峰和高峰,低峰的時候,不需要保留太多的連接;高峰的時候,如果不加以控制,TCP連接數量會飆升,也需要加以控制!

實現

    基於以上分析,需要寫一個TCP連接池,該連接池可配置最少、最多連接數量,以及連接可空閑時間。當需要發送請求時,如果沒有可用連接,並且池內連接數量不超過最大數量,就創建新的連接;當池內數量達到上限,並且需要發送請求時,需要阻塞,直至有可用的連接;當連接空閑時間達到設置的可空閑時間,並且池內連接數量大於最小值時,清理掉多余的連接。

    以下是連接池類,ConnectionInfo類和接口IConnection沒有發,但是不影響看連接池代碼。歡迎拍磚!

  1     /// <summary>
  2     /// 連接池
  3     /// 對於不能單連並發的,用此連接池實現多連接並發,可控制最少最大連接數量
  4     /// 連接保活
  5     /// </summary>
  6     public class ConnectionPool : IDisposable
  7     {
  8         private ConcurrentQueue<ConnectionInfo> m_IdleConnections;
  9         private ConcurrentDictionary<long, ConnectionInfo> m_Connections;
 10         private object m_ConnectionLock = new object();
 11 
 12         private int m_Max;
 13         private int m_Min;
 14         private TimeSpan m_Expired;
 15         private Func<IConnection> m_ConnectionFactory;
 16 
 17         private bool m_IsConnected = false;
 18         /// <summary>
 19         /// 是否連接
 20         /// </summary>
 21         public bool IsConnected
 22         {
 23             get
 24             {
 25                 return m_IsConnected;
 26             }
 27             set
 28             {
 29                 if (m_IsConnected != value)
 30                 {
 31                     m_IsConnected = value;
 32                     if (m_IsConnected)
 33                     {
 34                         Connected?.Invoke(this, EventArgs.Empty);
 35                     }
 36                     else
 37                     {
 38                         Disconnected?.Invoke(this, EventArgs.Empty);
 39                     }
 40                 }
 41             }
 42         }
 43 
 44         /// <summary>
 45         /// 構造
 46         /// </summary>
 47         /// <param name="minConnection">最小連接數</param>
 48         /// <param name="maxConnection">最大連接數</param>
 49         /// <param name="expired">過期時間 空閑時間超過此值則會被清理掉</param>
 50         /// <param name="connectionFactory">創建連接的回調</param>
 51         public ConnectionPool(int minConnection, int maxConnection, TimeSpan expired, Func<IConnection> connectionFactory)
 52         {
 53             m_Min = minConnection;
 54             m_Max = maxConnection;
 55             m_Expired = expired;
 56             m_ConnectionFactory = connectionFactory;
 57             m_Connections = new ConcurrentDictionary<long, ConnectionInfo>();
 58             m_IdleConnections = new ConcurrentQueue<ConnectionInfo>();
 59             StartClear();
 60         }
 61 
 62         private long m_No = 0;
 63         private object m_NoLock = new object();
 64 
 65         /// <summary>
 66         /// 生成新的編號
 67         /// </summary>
 68         /// <returns></returns>
 69         private long NewNo()
 70         {
 71             long no;
 72             lock (m_NoLock)
 73             {
 74                 no = ++m_No;
 75             }
 76             return no;
 77         }
 78 
 79         /// <summary>
 80         /// 抓取連接
 81         /// </summary>
 82         /// <returns></returns>
 83         private ConnectionInfo GrabConnection()
 84         {
 85             //判斷空閑隊列是否有
 86             ConnectionInfo connectionInfo = null;
 87         //開始抓取連接
 88         Begin:
 89             while (!m_IsDisposed && !m_IdleConnections.IsEmpty)
 90             {
 91                 if (m_IdleConnections.TryDequeue(out connectionInfo))
 92                 {
 93                     if (m_Connections.ContainsKey(connectionInfo.No))
 94                     {//取到的連接沒有被銷毀
 95                         break;
 96                     }
 97                     else
 98                     {//不可用則銷毀此連接,繼續尋找
 99                         DestoryConnection(connectionInfo);
100                         connectionInfo = null;
101                     }
102                 }
103                 else
104                 {
105                     connectionInfo = null;
106                 }
107             }
108             if (!m_IsDisposed && connectionInfo == null)
109             {//沒有取到連接
110                 if (!CreateOrAddConnection(null))
111                 {//創建連接失敗,睡眠10ms
112                     Thread.Sleep(10);
113                 }
114                 //繼續抓取連接
115                 goto Begin;
116             }
117             return connectionInfo;
118         }
119 
120         /// <summary>
121         /// 創建或者添加連接
122         /// </summary>
123         public bool CreateOrAddConnection(IConnection connection)
124         {
125             bool rst = false;
126             if (m_Connections.Count < m_Max)
127             {
128                 lock (m_ConnectionLock)
129                 {
130                     if (m_Connections.Count < m_Max)
131                     {
132                         if (connection == null)
133                         {
134                             connection = m_ConnectionFactory.Invoke();
135                         }
136                         var conInfo = new ConnectionInfo()
137                         {
138                             No = NewNo(),
139                             Connection = connection,
140                             CreateTime = DateTime.Now
141                         };
142                         if (m_Connections.TryAdd(conInfo.No, conInfo))
143                         {
144                             connection.Connect();
145                             IsConnected = connection.IsConnected;
146                             m_IdleConnections.Enqueue(conInfo);
147                             rst = true;
148                             OutputDebugInfo(string.Format("創建{0}", conInfo.No));
149                         }
150                     }
151                 }
152             }
153             return rst;
154         }
155 
156         /// <summary>
157         /// 銷毀連接
158         /// </summary>
159         /// <param name="connectionInfo"></param>
160         private void DestoryConnection(ConnectionInfo connectionInfo)
161         {
162             ConnectionInfo temp;
163             while (m_Connections.ContainsKey(connectionInfo.No))
164             {
165                 if (m_Connections.TryRemove(connectionInfo.No, out temp))
166                 {
167                     break;
168                 }
169                 else
170                 {
171                     Thread.Sleep(10);
172                 }
173             }
174             try
175             {
176                 connectionInfo.Connection.Disconnect();
177             }
178             catch (Exception ex)
179             {
180                 OutputDebugInfo(string.Format("斷開連接失敗:{0}", ex.ToString()));
181             }
182             OutputDebugInfo(string.Format("銷毀{0}", connectionInfo.No));
183         }
184 
185         /// <summary>
186         /// 發送
187         /// </summary>
188         /// <typeparam name="T1"></typeparam>
189         /// <typeparam name="T2"></typeparam>
190         /// <param name="rstData"></param>
191         /// <param name="callback"></param>
192         public void Send(byte[] rstData, Action<string, byte[]> callback)
193         {
194             var connInfo = GrabConnection();
195             if (connInfo == null)
196             {
197                 OutputDebugInfo("未獲取到可用連接");
198                 callback?.Invoke("無可用連接", null);
199                 return;
200             }
201             OutputDebugInfo(string.Format("獲取到連接{0}", connInfo.No));
202             if (connInfo.Connection.IsConnected)
203             {
204                 connInfo.LastUsedTime = DateTime.Now;
205                 try
206                 {
207                     connInfo.Connection.Send(rstData,
208                     (error, rndData) =>
209                     {
210                         callback?.BeginInvoke(error, rndData, null, null);
211                         //重新加入空閑隊列
212                         if (string.IsNullOrEmpty(error) && m_Connections.ContainsKey(connInfo.No))
213                         {
214                             m_IdleConnections.Enqueue(connInfo);
215                         }
216                     });
217                 }
218                 catch (Exception ex)
219                 {
220                     OutputDebugInfo(string.Format("發送失敗:{0}", ex.ToString()));
221                 }
222             }
223             else
224             {
225                 callback?.BeginInvoke("斷開連接", null, null, null);
226                 DestoryConnection(connInfo);
227             }
228         }
229 
230         /// <summary>
231         /// 是否已清理
232         /// </summary>
233         private bool m_IsDisposed = false;
234         /// <summary>
235         /// 清理
236         /// </summary>
237         public void Dispose()
238         {
239             if (m_IsDisposed)
240             {
241                 return;
242             }
243             m_IsDisposed = true;
244 
245             Clear(true);
246             OutputDebugInfo("釋放完成");
247         }
248 
249         private object m_ClearLock = new object();
250 
251         /// <summary>
252         /// 開始清理
253         /// </summary>
254         private void StartClear()
255         {
256             ThreadPool.QueueUserWorkItem(
257                 (obj) =>
258                 {
259                     Clear(false);
260                     Thread.Sleep(1000);
261                     if (!m_IsDisposed)
262                     {
263                         StartClear();
264                     }
265                 });
266         }
267 
268         /// <summary>
269         /// 清理
270         /// </summary>
271         private void Clear(bool isForece)
272         {
273             lock (m_ClearLock)
274             {
275                 var nos = m_Connections.Keys.ToList();
276                 ConnectionInfo connInfo = null;
277                 foreach (var no in nos)
278                 {
279                     if (m_Connections.TryGetValue(no, out connInfo))
280                     {
281                         try
282                         {
283                             connInfo.Connection.Send(new byte[0], null);
284                             IsConnected = true;
285                         }
286                         catch
287                         {
288                             IsConnected = false;
289                         }
290                         if ((m_Connections.Count > m_Min && DateTime.Now - connInfo.LastUsedTime > m_Expired) || isForece)
291                         {
292                             DestoryConnection(connInfo);
293                         }
294                     }
295                 }
296             }
297         }
298 
299         /// <summary>
300         /// 
301         /// </summary>
302         /// <param name="debugInfo"></param>
303         private void OutputDebugInfo(string debugInfo)
304         {
305             System.Diagnostics.Debug.WriteLine(string.Format("{0}-ConnectionPool-{1}", m_ConnectionFactory == null ? "Server" : "Client", debugInfo));
306         }
307 
308         public event EventHandler Connected;
309         public event EventHandler Disconnected;
310     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM