在應用中連接池的使用非常普遍,如訪問數據庫,Redis等等網絡產品的Client
都集成了連接池機制;由於最近在編寫微服務網關因此涉及到連接池的編寫,在這里分享一下實現一個可靠連接池的心得。其實編寫一個連接池並不因難,基礎的Stack
結構就能滿足需要;但在設計的時候有些情況是需要考慮的,怎樣使連接池的效益最大化,特別是如何設計連接池的最大負載,當超過最大負載后應該怎么做這些問題都衡量一個連接池好壞的標准。接下來通過代碼的方式一步一步地實現它。
基礎實現
public class ConnectionPool<T> where T : IDisposable, new() { private ConcurrentStack<T> mPool = new ConcurrentStack<T>(); public T Pop() { if (!mPool.TryPop(out T item)) { item = new T(); } return item; } public void Push(T item) { mPool.Push(item); } }
以上是一個最簡單對象池,當然這個對象池是不能真的投入生產,只是大概了解基礎原理;因為它是無限量增長的對象池,不過用來做對象池免強還是可以的,用在連接池上那就不太可行了,畢竟大量的連接不僅增加自己的損耗還增加了對端的損耗。
增加最大限制
public class ConnectionPool<T> where T : IDisposable, new() { public ConnectionPool(int max = 100) { mMaxCount = max; } private ConcurrentStack<T> mPool = new ConcurrentStack<T>(); private int mMaxCount; private int mCount; public T Pop() { if (!mPool.TryPop(out T item)) { int count = System.Threading.Interlocked.Increment(ref mCount); if (mCount > mMaxCount) { System.Threading.Interlocked.Decrement(ref mCount); return default(T); } item = new T(); } return item; } public void Push(T item) { mPool.Push(item); } }
以上增加了最大數限制,但在使用上就要面對一個問題,當池負載滿了返回為空的時候程序又要怎樣處理呢?直接拋異常?自旋或sleep
指定次數后還是獲取為空再報異常?對於調用者來說最不想看到的肯是異常,就算延時能處理也相對是一件不錯的方法;但當在滿負載的情況大量的線程自旋或sleep
又只會讓系統變得更糟糕!所以以上兩種方式並不算是一個好的解決方法。
引入事件驅動
如果請求者要等待使用自旋或sleep
基本上是不可行,這種方法容易損耗CPU資源;接下來引入基於事件驅動
的方法模式,聽上去是不是很高大上,其實設計方式比較簡單就是在Push
引入一個事件通知機制,讓后面等待的請求進行處理;這樣就不用通過自旋或sleep
來完成這個功能。一說到事件驅動相信很多朋友感覺一下子變成了非常復雜,但.net core提供給我們一個好東西async/await
語法糖輕易解決這一問題。
public class ConnectionPool<T> where T : IDisposable, new() { public ConnectionPool(int max = 100) { mMaxCount = max; } private int mWaitQueueLength = 1000; private Stack<T> mPool = new Stack<T>(); private Queue<TaskCompletionSource<T>> mWaitQueue = new Queue<TaskCompletionSource<T>>(); private int mMaxCount; private int mCount; private object mLockPool = new object(); public Task<T> Pop() { lock (mLockPool) { TaskCompletionSource<T> result = new TaskCompletionSource<T>(); if (mPool.Count > 0) { result.SetResult(mPool.Pop()); } else { mCount++; if (mCount < mMaxCount) { result.SetResult(new T()); } else { if (mWaitQueue.Count >= mWaitQueueLength) { result.SetResult(default(T)); } else { mWaitQueue.Enqueue(result); } } } return result.Task; } } public void Push(T item) { lock (mLockPool) { if (mWaitQueue.Count > 0) { var waitItem = mWaitQueue.Dequeue(); Task.Run(() => waitItem.SetResult(item)); } else { mPool.Push(item); } } } }
在對象池中引入了一個隊列,當負載滿的時候請求會扔到隊列中,當對象回歸后會檢測隊列並觸發請求的狀態機代碼執行。注意:一定要通過線程隔離這個執行,畢竟這代碼還在一個鎖的代碼塊里,如果不用線程隔離有能可能會導致下一次其它調用進來時產生死鎖的情況。加入了事件驅動使用代碼如下:
var item = await Pool.Pop(); if(item==null) throw System busy else run item
縮減
池在負載的時候有增長,那在空閑的時候自然也應該有縮減的設計才算合理,同樣這個縮減也可以在Push中設計一下,代碼就留給大家了,簡單的方法是獲取當前Pool的Pop的並發量,如果並發量少於當前池中的對象數量,那Push的時候就不是回歸到池里,而是釋放掉了(不過這個方法並不算太好)。