背景
最近對接一個TCP協議,這個協議定義的非常好,有頭標識和校驗位!但是,接口提供方定的通信協議還是欠缺考慮...正常情況下,這個協議是沒有問題的,但是在高並發的情況下,客戶端方就需要點真功夫了。
分析
該通信協議中,沒有使用事務號,也就是說,用同一條連接連續發送兩次請求,你不知道返回的響應數據是哪個請求的。你可能會說,第一個響應是第一個請求的,第二個響應是第二個請求的!這是絕對的理想情況,服務器處理所有請求的耗時一樣,網絡沒有抖動。如果耗時和網絡抖動都無法確定的情況下,響應順序與請求順序就有可能不一致!結論就是,如果只有一個連接,那么所有請求只能排隊,一個請求處理完才能發送下一個。那如果高並發請求比較多怎么辦?
如何用沒有事務號的通信協議實現高並發?Http?bingo!Http的請求方式就是解決這個問題的榜樣!Http就是在一個連接中一次只發送一個請求,請求沒有收到響應或者超時斷開的情況下,是不會發送第二個請求的。那么Http是怎么並發的?通過同時發送多個Http請求來實現並發!以Http為榜樣,那么就可以通過一個TCP連接一次發送一個請求,這個請求沒有結束之前不再發送新的請求,如此來保證請求與響應的匹配,通過多個連接同時發送多個請求來實現高並發。
解決思路有了,還得考慮下性能。肯定不能每次發送請求就新建一個連接,請求結束就斷開連接,這樣很不TCP!那就考慮復用TCP連接,請求未結束之前,這個連接不可用,請求結束后不斷開連接,可供新請求使用。並發量肯定有低峰和高峰,低峰的時候,不需要保留太多的連接;高峰的時候,如果不加以控制,TCP連接數量會飆升,也需要加以控制!
實現
基於以上分析,需要寫一個TCP連接池,該連接池可配置最少、最多連接數量,以及連接可空閑時間。當需要發送請求時,如果沒有可用連接,並且池內連接數量不超過最大數量,就創建新的連接;當池內數量達到上限,並且需要發送請求時,需要阻塞,直至有可用的連接;當連接空閑時間達到設置的可空閑時間,並且池內連接數量大於最小值時,清理掉多余的連接。
以下是連接池類,ConnectionInfo類和接口IConnection沒有發,但是不影響看連接池代碼。歡迎拍磚!
1 /// <summary> 2 /// 連接池 3 /// 對於不能單連並發的,用此連接池實現多連接並發,可控制最少最大連接數量 4 /// 連接保活 5 /// </summary> 6 public class ConnectionPool : IDisposable 7 { 8 private ConcurrentQueue<ConnectionInfo> m_IdleConnections; 9 private ConcurrentDictionary<long, ConnectionInfo> m_Connections; 10 private object m_ConnectionLock = new object(); 11 12 private int m_Max; 13 private int m_Min; 14 private TimeSpan m_Expired; 15 private Func<IConnection> m_ConnectionFactory; 16 17 private bool m_IsConnected = false; 18 /// <summary> 19 /// 是否連接 20 /// </summary> 21 public bool IsConnected 22 { 23 get 24 { 25 return m_IsConnected; 26 } 27 set 28 { 29 if (m_IsConnected != value) 30 { 31 m_IsConnected = value; 32 if (m_IsConnected) 33 { 34 Connected?.Invoke(this, EventArgs.Empty); 35 } 36 else 37 { 38 Disconnected?.Invoke(this, EventArgs.Empty); 39 } 40 } 41 } 42 } 43 44 /// <summary> 45 /// 構造 46 /// </summary> 47 /// <param name="minConnection">最小連接數</param> 48 /// <param name="maxConnection">最大連接數</param> 49 /// <param name="expired">過期時間 空閑時間超過此值則會被清理掉</param> 50 /// <param name="connectionFactory">創建連接的回調</param> 51 public ConnectionPool(int minConnection, int maxConnection, TimeSpan expired, Func<IConnection> connectionFactory) 52 { 53 m_Min = minConnection; 54 m_Max = maxConnection; 55 m_Expired = expired; 56 m_ConnectionFactory = connectionFactory; 57 m_Connections = new ConcurrentDictionary<long, ConnectionInfo>(); 58 m_IdleConnections = new ConcurrentQueue<ConnectionInfo>(); 59 StartClear(); 60 } 61 62 private long m_No = 0; 63 private object m_NoLock = new object(); 64 65 /// <summary> 66 /// 生成新的編號 67 /// </summary> 68 /// <returns></returns> 69 private long NewNo() 70 { 71 long no; 72 lock (m_NoLock) 73 { 74 no = ++m_No; 75 } 76 return no; 77 } 78 79 /// <summary> 80 /// 抓取連接 81 /// </summary> 82 /// <returns></returns> 83 private ConnectionInfo GrabConnection() 84 { 85 //判斷空閑隊列是否有 86 ConnectionInfo connectionInfo = null; 87 //開始抓取連接 88 Begin: 89 while (!m_IsDisposed && !m_IdleConnections.IsEmpty) 90 { 91 if (m_IdleConnections.TryDequeue(out connectionInfo)) 92 { 93 if (m_Connections.ContainsKey(connectionInfo.No)) 94 {//取到的連接沒有被銷毀 95 break; 96 } 97 else 98 {//不可用則銷毀此連接,繼續尋找 99 DestoryConnection(connectionInfo); 100 connectionInfo = null; 101 } 102 } 103 else 104 { 105 connectionInfo = null; 106 } 107 } 108 if (!m_IsDisposed && connectionInfo == null) 109 {//沒有取到連接 110 if (!CreateOrAddConnection(null)) 111 {//創建連接失敗,睡眠10ms 112 Thread.Sleep(10); 113 } 114 //繼續抓取連接 115 goto Begin; 116 } 117 return connectionInfo; 118 } 119 120 /// <summary> 121 /// 創建或者添加連接 122 /// </summary> 123 public bool CreateOrAddConnection(IConnection connection) 124 { 125 bool rst = false; 126 if (m_Connections.Count < m_Max) 127 { 128 lock (m_ConnectionLock) 129 { 130 if (m_Connections.Count < m_Max) 131 { 132 if (connection == null) 133 { 134 connection = m_ConnectionFactory.Invoke(); 135 } 136 var conInfo = new ConnectionInfo() 137 { 138 No = NewNo(), 139 Connection = connection, 140 CreateTime = DateTime.Now 141 }; 142 if (m_Connections.TryAdd(conInfo.No, conInfo)) 143 { 144 connection.Connect(); 145 IsConnected = connection.IsConnected; 146 m_IdleConnections.Enqueue(conInfo); 147 rst = true; 148 OutputDebugInfo(string.Format("創建{0}", conInfo.No)); 149 } 150 } 151 } 152 } 153 return rst; 154 } 155 156 /// <summary> 157 /// 銷毀連接 158 /// </summary> 159 /// <param name="connectionInfo"></param> 160 private void DestoryConnection(ConnectionInfo connectionInfo) 161 { 162 ConnectionInfo temp; 163 while (m_Connections.ContainsKey(connectionInfo.No)) 164 { 165 if (m_Connections.TryRemove(connectionInfo.No, out temp)) 166 { 167 break; 168 } 169 else 170 { 171 Thread.Sleep(10); 172 } 173 } 174 try 175 { 176 connectionInfo.Connection.Disconnect(); 177 } 178 catch (Exception ex) 179 { 180 OutputDebugInfo(string.Format("斷開連接失敗:{0}", ex.ToString())); 181 } 182 OutputDebugInfo(string.Format("銷毀{0}", connectionInfo.No)); 183 } 184 185 /// <summary> 186 /// 發送 187 /// </summary> 188 /// <typeparam name="T1"></typeparam> 189 /// <typeparam name="T2"></typeparam> 190 /// <param name="rstData"></param> 191 /// <param name="callback"></param> 192 public void Send(byte[] rstData, Action<string, byte[]> callback) 193 { 194 var connInfo = GrabConnection(); 195 if (connInfo == null) 196 { 197 OutputDebugInfo("未獲取到可用連接"); 198 callback?.Invoke("無可用連接", null); 199 return; 200 } 201 OutputDebugInfo(string.Format("獲取到連接{0}", connInfo.No)); 202 if (connInfo.Connection.IsConnected) 203 { 204 connInfo.LastUsedTime = DateTime.Now; 205 try 206 { 207 connInfo.Connection.Send(rstData, 208 (error, rndData) => 209 { 210 callback?.BeginInvoke(error, rndData, null, null); 211 //重新加入空閑隊列 212 if (string.IsNullOrEmpty(error) && m_Connections.ContainsKey(connInfo.No)) 213 { 214 m_IdleConnections.Enqueue(connInfo); 215 } 216 }); 217 } 218 catch (Exception ex) 219 { 220 OutputDebugInfo(string.Format("發送失敗:{0}", ex.ToString())); 221 } 222 } 223 else 224 { 225 callback?.BeginInvoke("斷開連接", null, null, null); 226 DestoryConnection(connInfo); 227 } 228 } 229 230 /// <summary> 231 /// 是否已清理 232 /// </summary> 233 private bool m_IsDisposed = false; 234 /// <summary> 235 /// 清理 236 /// </summary> 237 public void Dispose() 238 { 239 if (m_IsDisposed) 240 { 241 return; 242 } 243 m_IsDisposed = true; 244 245 Clear(true); 246 OutputDebugInfo("釋放完成"); 247 } 248 249 private object m_ClearLock = new object(); 250 251 /// <summary> 252 /// 開始清理 253 /// </summary> 254 private void StartClear() 255 { 256 ThreadPool.QueueUserWorkItem( 257 (obj) => 258 { 259 Clear(false); 260 Thread.Sleep(1000); 261 if (!m_IsDisposed) 262 { 263 StartClear(); 264 } 265 }); 266 } 267 268 /// <summary> 269 /// 清理 270 /// </summary> 271 private void Clear(bool isForece) 272 { 273 lock (m_ClearLock) 274 { 275 var nos = m_Connections.Keys.ToList(); 276 ConnectionInfo connInfo = null; 277 foreach (var no in nos) 278 { 279 if (m_Connections.TryGetValue(no, out connInfo)) 280 { 281 try 282 { 283 connInfo.Connection.Send(new byte[0], null); 284 IsConnected = true; 285 } 286 catch 287 { 288 IsConnected = false; 289 } 290 if ((m_Connections.Count > m_Min && DateTime.Now - connInfo.LastUsedTime > m_Expired) || isForece) 291 { 292 DestoryConnection(connInfo); 293 } 294 } 295 } 296 } 297 } 298 299 /// <summary> 300 /// 301 /// </summary> 302 /// <param name="debugInfo"></param> 303 private void OutputDebugInfo(string debugInfo) 304 { 305 System.Diagnostics.Debug.WriteLine(string.Format("{0}-ConnectionPool-{1}", m_ConnectionFactory == null ? "Server" : "Client", debugInfo)); 306 } 307 308 public event EventHandler Connected; 309 public event EventHandler Disconnected; 310 }
