首先,這里的緩沖池指的是 Cache,而不是 Buffer,就是指將代價較大的對象先存儲起來,以備以后需要的時候可以直接拿來用,能夠節約一些時間或空間。
當緩沖池中的對象過多時,就需要刪掉一些“不會再用”的對象來節約內存。但是沒人能夠知道某個對象什么時候會再用,因此這就涉及到緩存替換算法了,好的緩存替換算法可以有更大的概率刪掉“不會再用”的對象,能夠保留“很可能再用”的對象。
現在發明的緩存替換算法有很多,除了只有理論意義的 OPT(Optimal,最優替換,也就是上帝模式)和聽天由命的隨機替換,常用的有 LRU(Least recently used,最近最少使用),LFU(Least Frequently Used,最不經常使用)和 FIFO(First in first out,先進先出)以及很多改進算法。其中 LRU 算法經常被使用,我也打算使用 LRU 算法來實現一個緩沖池。
一、改進的 LRU 算法
LRU 算法認為最近被訪問的對象,以后也很可能被訪問。因此,在實現時,一般使用一個哈希表類存儲被緩存的對象,和一個雙向鏈表類存儲對象被使用的情況:如果一個對象被訪問了,就將它提升到鏈表的頭部,如圖1所示,這樣就能保證最近被訪問的對象總是更接近頭部。當添加新對象時,直接添加到鏈表的頭部;當需要淘汰舊對象時,就從鏈表的尾部(最久沒有被訪問)開始淘汰。
圖1 K 被訪問了,提升到鏈表的頭部。
LRU 算法的實現還是比較容易的,但是由於每個操作都會有鏈表的修改,因此在並發訪問時都每個操作需要加鎖(雙向鏈表沒有無鎖版的,只有單向鏈表有),不太利於並發訪問。
同時,LRU 只將對象“最近是否被訪問”作為度量指標,並不准確,偶發性和周期性的操作會導致 LRU 的命中率急劇下降,因此也提出了很多 LRU 的改進算法,例如 LRU-K,2Q,以及命中率更高的 LIRS 算法(它的論文是這么說的)。
后來,無意間看到了《OCP知識點講解 之 LRU鏈與臟LRU鏈》這篇文章,里面介紹了一種 Oracle 改進的 LRU 算法,感覺效果會不錯,實現起來也比較簡單,因此使用該算法來實現緩沖池。
這個改進的算法將 LRU 鏈從中間分成了兩半,前一半是熱端,后一半是冷端,如圖2所示。熱端記錄的是訪問次數大於等於兩次的對象,它們被認為可能會被經常訪問到,冷端記錄的是訪問次數為一次的對象。區分熱端和冷端,就可以保留訪問次數較大的對象,同時更快的淘汰掉只訪問了一次的對象,使命中情況更好,2Q 等改進算法使用的也是類似的思想。如果熱端和冷端並不平均分配,就可以更快或更慢的淘汰掉只訪問了一次的對象。
圖2 改進的 LRU 鏈表,圖片來自文章《OCP知識點講解 之 LRU鏈與臟LRU鏈》
改進算法中還為每個對象的添加了訪問次數,這樣可以將冷端被經常訪問的對象添加到熱端,熱端的對象也會被“降級”到冷端。下面則是具體的實現分析,為了方便起見,無論是熱端還是冷端,都將左邊作為頭,右邊作為尾。
1. 添加新對象
- 緩存空間還未滿:與標准的 LRU 一樣,總是插入到熱端的頭。因為此時緩存空間還足夠,沒必要區分對象是否被多次訪問,之后在淘汰舊對象時自然可以區分出來。
- 緩存空間已滿:這時先淘汰舊的對象,再將新對象添加到冷端的頭。因為新添加對象的訪問次數總是 1,所以應當放到冷端。
2. 訪問對象
這里直接遞增對象的訪問計數即可,因此這個算法的並發訪問效率可以提高一些。
3. 淘汰舊對象
淘汰舊對象是從冷端的末尾開始的,但並不是順序進行淘汰,而是如果冷端末尾的對象的訪問次數大於等於2,就認為可能會被經常訪問,則將它移動到熱端的頭,並將訪問次數清零,同時熱端的末尾被淘汰到冷端的頭。然后繼續嘗試淘汰,直到對象的訪問次數小於2。清零訪問次數保證了一定可以找到被淘汰的對象。
在淘汰舊對象時,如果訪問次數大於等於2,並不需要真的每次都移除末尾節點再將它移動到熱端的頭。因為雙向鏈表可以以環形存儲(System.Collections.Generic.LinkedList<T> 就是這么干的),也就是說 Head.Prev = Tail && Tail.Next = Head,所以這個移動過程就相當於將 head 變為 Head.Prev(也就是前移一位),如圖3所示:淘汰從 H 開始,但是 H 的訪問次數大於 2,就將 Head 從 A 移動到 H,H 自然就被添加到熱端的頭了,下一次嘗試也就從 G 繼續進行。
圖3 嘗試淘汰訪問次數大於等於 2 的 H。
淘汰舊對象這里感覺很像 Clock 算法,不過 Clock 總是存儲到剛被淘汰的位置,改進的 LRU 算法則是存儲到冷端的頭。
二、緩沖池接口定義
緩沖池接口的定義如下,是像 Dictioary 一樣利用鍵來檢索對象。在添加對象時,總是會覆蓋舊對象;在檢索時,由於必須要對對象的有效性進行判斷(說不定什么時候就被替換掉了),因此提供的是 TryGet 方法而不是拋異常。為了方便使用緩沖池,加入了 GetOrAdd,這樣就不用每次都自己判斷對象是否存在了。
using System; namespace Cyjb.Utility { /// <summary> /// 表示緩沖池的接口。 /// </summary> /// <typeparam name="TKey">緩沖對象的鍵的類型。</typeparam> /// <typeparam name="TValue">緩沖對象的類型。</typeparam> public interface ICache<TKey, TValue> { /// <summary> /// 將指定的鍵和對象添加到緩存中,無論鍵是否存在。 /// </summary> /// <param name="key">要添加的對象的鍵。</param> /// <param name="value">要添加的對象。</param> /// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception> void Add(TKey key, TValue value); /// <summary> /// 清空緩存中的所有對象。 /// </summary> void Clear(); /// <summary> /// 確定緩存中是否包含指定的鍵。 /// </summary> /// <param name="key">要在緩存中查找的鍵。</param> /// <returns>如果緩存中包含具有指定鍵的元素,則為 <c>true</c>;否則為 <c>false</c>。</returns> /// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception> bool Contains(TKey key); /// <summary> /// 從緩存中獲取與指定的鍵關聯的對象,如果不存在則將對象添加到緩存中。 /// </summary> /// <param name="key">要獲取的對象的鍵。</param> /// <param name="valueFactory">用於為鍵生成對象的函數。</param> /// <returns>如果在緩存中找到該鍵,則為對應的對象;否則為 <paramref name="valueFactory"/> 返回的新對象。</returns> /// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception> TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory); /// <summary> /// 從緩存中移除並返回具有指定鍵的對象。 /// </summary> /// <param name="key">要移除並返回的對象的鍵。</param> /// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception> void Remove(TKey key); /// <summary> /// 嘗試從緩存中獲取與指定的鍵關聯的對象。 /// </summary> /// <param name="key">要獲取的對象的鍵。</param> /// <param name="value">此方法返回時,<paramref name="value"/> 包含緩存中具有指定鍵的對象; /// 如果操作失敗,則包含默認值。</param> /// <returns>如果在緩存中找到該鍵,則為 <c>true</c>;否則為 <c>false</c>。</returns> /// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception> bool TryGet(TKey key, out TValue value); } }
三、緩沖池的實現
緩沖池中,使用 ReaderWriterLockSlim 讀寫鎖來實現並發訪問,自己實現了一個雙向環形鏈表來將對象鏈起來。如果對象實現了 System.IDisposable 接口,會在移除或替換對象時自動調用 Dispose 方法,這里需要注意的是 Dispose 最好在鎖之外調用,防止 Dispose 方法導致鎖不能盡快被釋放掉。
添加對象的核心代碼是:
LruNode<TKey, TValue> node; IDisposable disposable = null; cacheLock.EnterWriteLock(); try { if (cacheDict.TryGetValue(key, out node)) { // 更新節點。 node.Value = value; // 寫鎖互斥,這里不用 Interlocked。 node.VisitCount++; return; } else { if (count < maxSize) { // 將節點添加到熱端起始。 node = new LruNode<TKey, TValue>(key, value); AddHotFirst(node); // 寫鎖互斥,這里不用 Interlocked。 count++; if (count == hotSize + 1) { codeHead = head.Prev; } cacheDict.Add(key, node); return; } else { // 從冷端末尾嘗試淘汰舊節點,將訪問次數大於 1 的移動到熱端的頭。 // 由於雙向鏈表是環形存儲的,就相當於將 head 前移。 while (head.Prev.VisitCount >= 2) { // 清零訪問計數。 head.Prev.VisitCount = 0; head = head.Prev; codeHead = codeHead.Prev; } // 將 node 移除,並添加到冷端的頭。 node = head.Prev; disposable = node.Value as IDisposable; this.cacheDict.Remove(node.Key); this.Remove(node); // 這里直接重用舊節點。 node.Key = key; node.Value = value; node.VisitCount = 1; this.AddCodeFirst(node); cacheDict.Add(key, node); } } } finally { cacheLock.ExitWriteLock(); } if (disposable != null) { disposable.Dispose(); }
可以看到,如果要添加的對象的鍵已存在,是直接替換舊對象的。雙向鏈表的節點也都是重用的,所以沒必要使用對象池(應該不會經常刪除緩存項把……)。
在下面的 GetOrAdd 方法中,實際上就是分別調用了 TryGet 方法和 AddInternal 方法,這么做可能會導致不必要的對象創建,但是為了保證 valueFactory 不會長時間的占用寫鎖,因此只好這么做了。
public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory) { ExceptionHelper.CheckArgumentNull(key, "key"); ExceptionHelper.CheckArgumentNull(valueFactory, "valueFactory"); TValue value; if (this.TryGet(key, out value)) { return value; } value = valueFactory(key); this.AddInternal(key, value); return value; }
2013.7.15 更新:這里可以利用 System.Lazy<T> 類來防止多次調用 valueFactory,即將對象用 Lazy<TValue> 包裝起來,在獲取對象時只會獲取到 Lazy<TValue> 對象,實際對象的初始化則被 Lazy<TValue> 推遲了,同時也不會導致長時間占用寫鎖(只要得到了 Lazy<TValue> 對象就可以釋放鎖了),防止 valueFactory 多次被調用則由 Lazy<TValue> 完成。
緩沖池的實現基本就這樣了,具體的代碼可以見 Cyjb.Utility。
四、命中率的測試
最后,稍微寫了一段代碼來測試算法的命中率,測試的四個算法分別是 OPT,改進的 LRU,普通的 LRU 和 FIFO。下面先附上測試代碼:
int maxCacheSize = 200; // 初始化數據。 int[] data = new int[100000]; Random random = new Random(); for (int i = 0; i < data.Length; i++) { if (random.Next(10) < 1) { data[i] = random.Next(10000); } else { data[i] = random.Next(10000); } } // OPT。 int miss = 0; Dictionary<int, int> nextVisit = new Dictionary<int, int>(); HashSet<int> optCache = new HashSet<int>(); for (int i = 0; i < data.Length; i++) { if (!optCache.Contains(data[i])) { miss++; if (optCache.Count >= maxCacheSize) { int minI = 0, maxVisit = 0; foreach (int j in optCache) { if (!nextVisit.ContainsKey(j)) { nextVisit.Add(j, 0); } if (nextVisit[j] <= i) { nextVisit[j] = int.MaxValue; for (int k = i + 1; k < data.Length; k++) { if (data[k] == j) { nextVisit[j] = k; break; } } } if (nextVisit[j] > maxVisit) { maxVisit = nextVisit[j]; minI = j; if (maxVisit == int.MaxValue) { break; } } } optCache.Remove(minI); } optCache.Add(data[i]); } } Console.WriteLine("OPT:{0}%", 100 - miss * 100.0 / data.Length); // LRU。 miss = 0; ICache<int, int> cache = new LruCache<int, int>(maxCacheSize); for (int i = 0; i < data.Length; i++) { int value; if (!cache.TryGet(data[i], out value)) { cache.Add(data[i], data[i]); miss++; } } Console.WriteLine("LRU:{0}%", 100 - miss * 100.0 / data.Length); // 普通 LRU。 miss = 0; cache = new LruNormalCache<int, int>(maxCacheSize); for (int i = 0; i < data.Length; i++) { int value; if (!cache.TryGet(data[i], out value)) { cache.Add(data[i], data[i]); miss++; } } Console.WriteLine("LRU_Normal:{0}%", 100 - miss * 100.0 / data.Length); // FIFO。 miss = 0; Queue<int> queue = new Queue<int>(); HashSet<int> set = new HashSet<int>(); for (int i = 0; i < data.Length; i++) { if (!set.Contains(data[i])) { set.Add(data[i]); miss++; queue.Enqueue(data[i]); if (queue.Count > maxCacheSize) { set.Remove(queue.Dequeue()); } } } Console.WriteLine("FIFO:{0}%", 100 - miss * 100.0 / data.Length);
下面是一些測試結果,緩沖池大小都是 200,數據個數是 10W,第一列表示數據的范圍,50% 0~1000,50% 0~150 表示有 50% 的數據范圍是 0~1000,50% 的數據范圍是 0~150(這里給出的測試實際上不太嚴謹,數據范圍應該是取 0~150 和 151~1000 的,現在的測試方法實際上有 57.5% 的數據范圍是 0~150,42.5% 的數據范圍是 151~1000)。表格中的數值表示命中率。
數據范圍 | OPT | 改進LRU | 普通LRU | FIFO |
50% 0~1000,50% 0~150 | 70.745% | 50.966% | 41.532% | 37.325% |
10% 0~1000,90% 0~150 |
93.851%
|
91.79% | 89.941% | 78.46% |
10% 0~10000,90% 0~500 | 66.59% | 34.056% | 32.056% | 31.342% |
10% 0~10000,90% 0~5000 | 24.617% | 3.705% | 3.60599999999999% | 3.605% |
100% 0~10000 | 18.739% | 2.04000000000001% | 2.05500000000001% | 2.063% |
這些只是隨意的測試,僅供參考,不過看起來改進的 LRU 還是比較給力的,可以比較好的保留經常被訪問的數據。而當所有數據都是隨機訪問的時候,命中率反而會下降,甚至還不如 FIFO。
對於並發訪問的測試,我不太了解怎么弄,所以就沒有做,不過個人感覺 ReaderWriterLockSlim 應該就沒有問題了。