一個改進 LRU 算法的緩沖池 update 2013.7.15


首先,這里的緩沖池指的是 Cache,而不是 Buffer,就是指將代價較大的對象先存儲起來,以備以后需要的時候可以直接拿來用,能夠節約一些時間或空間。

當緩沖池中的對象過多時,就需要刪掉一些“不會再用”的對象來節約內存。但是沒人能夠知道某個對象什么時候會再用,因此這就涉及到緩存替換算法了,好的緩存替換算法可以有更大的概率刪掉“不會再用”的對象,能夠保留“很可能再用”的對象。

現在發明的緩存替換算法有很多,除了只有理論意義的 OPT(Optimal,最優替換,也就是上帝模式)和聽天由命的隨機替換,常用的有 LRU(Least recently used,最近最少使用),LFU(Least Frequently Used,最不經常使用)和 FIFO(First in first out,先進先出)以及很多改進算法。其中 LRU 算法經常被使用,我也打算使用 LRU 算法來實現一個緩沖池。

一、改進的 LRU 算法

LRU 算法認為最近被訪問的對象,以后也很可能被訪問。因此,在實現時,一般使用一個哈希表類存儲被緩存的對象,和一個雙向鏈表類存儲對象被使用的情況:如果一個對象被訪問了,就將它提升到鏈表的頭部,如圖1所示,這樣就能保證最近被訪問的對象總是更接近頭部。當添加新對象時,直接添加到鏈表的頭部;當需要淘汰舊對象時,就從鏈表的尾部(最久沒有被訪問)開始淘汰。

圖1 K 被訪問了,提升到鏈表的頭部。

LRU 算法的實現還是比較容易的,但是由於每個操作都會有鏈表的修改,因此在並發訪問時都每個操作需要加鎖(雙向鏈表沒有無鎖版的,只有單向鏈表有),不太利於並發訪問。

同時,LRU 只將對象“最近是否被訪問”作為度量指標,並不准確,偶發性和周期性的操作會導致 LRU 的命中率急劇下降,因此也提出了很多 LRU 的改進算法,例如 LRU-K,2Q,以及命中率更高的 LIRS 算法(它的論文是這么說的)。

后來,無意間看到了《OCP知識點講解 之 LRU鏈與臟LRU鏈》這篇文章,里面介紹了一種 Oracle 改進的 LRU 算法,感覺效果會不錯,實現起來也比較簡單,因此使用該算法來實現緩沖池。

這個改進的算法將 LRU 鏈從中間分成了兩半,前一半是熱端,后一半是冷端,如圖2所示。熱端記錄的是訪問次數大於等於兩次的對象,它們被認為可能會被經常訪問到,冷端記錄的是訪問次數為一次的對象。區分熱端和冷端,就可以保留訪問次數較大的對象,同時更快的淘汰掉只訪問了一次的對象,使命中情況更好,2Q 等改進算法使用的也是類似的思想。如果熱端和冷端並不平均分配,就可以更快或更慢的淘汰掉只訪問了一次的對象。

圖2 改進的 LRU 鏈表,圖片來自文章《OCP知識點講解 之 LRU鏈與臟LRU鏈》

改進算法中還為每個對象的添加了訪問次數,這樣可以將冷端被經常訪問的對象添加到熱端,熱端的對象也會被“降級”到冷端。下面則是具體的實現分析,為了方便起見,無論是熱端還是冷端,都將左邊作為頭,右邊作為尾。

1. 添加新對象

  • 緩存空間還未滿:與標准的 LRU 一樣,總是插入到熱端的頭。因為此時緩存空間還足夠,沒必要區分對象是否被多次訪問,之后在淘汰舊對象時自然可以區分出來。
  • 緩存空間已滿:這時先淘汰舊的對象,再將新對象添加到冷端的頭。因為新添加對象的訪問次數總是 1,所以應當放到冷端。

2. 訪問對象

這里直接遞增對象的訪問計數即可,因此這個算法的並發訪問效率可以提高一些。

3. 淘汰舊對象

淘汰舊對象是從冷端的末尾開始的,但並不是順序進行淘汰,而是如果冷端末尾的對象的訪問次數大於等於2,就認為可能會被經常訪問,則將它移動到熱端的頭,並將訪問次數清零,同時熱端的末尾被淘汰到冷端的頭。然后繼續嘗試淘汰,直到對象的訪問次數小於2。清零訪問次數保證了一定可以找到被淘汰的對象。

在淘汰舊對象時,如果訪問次數大於等於2,並不需要真的每次都移除末尾節點再將它移動到熱端的頭。因為雙向鏈表可以以環形存儲(System.Collections.Generic.LinkedList<T> 就是這么干的),也就是說 Head.Prev = Tail && Tail.Next = Head,所以這個移動過程就相當於將 head 變為 Head.Prev(也就是前移一位),如圖3所示:淘汰從 H 開始,但是 H 的訪問次數大於 2,就將 Head 從 A 移動到 H,H 自然就被添加到熱端的頭了,下一次嘗試也就從 G 繼續進行。

圖3 嘗試淘汰訪問次數大於等於 2 的 H。

淘汰舊對象這里感覺很像 Clock 算法,不過 Clock 總是存儲到剛被淘汰的位置,改進的 LRU 算法則是存儲到冷端的頭。

二、緩沖池接口定義

緩沖池接口的定義如下,是像 Dictioary 一樣利用鍵來檢索對象。在添加對象時,總是會覆蓋舊對象;在檢索時,由於必須要對對象的有效性進行判斷(說不定什么時候就被替換掉了),因此提供的是 TryGet 方法而不是拋異常。為了方便使用緩沖池,加入了 GetOrAdd,這樣就不用每次都自己判斷對象是否存在了。

using System;

namespace Cyjb.Utility {
	/// <summary>
	/// 表示緩沖池的接口。
	/// </summary>
	/// <typeparam name="TKey">緩沖對象的鍵的類型。</typeparam>
	/// <typeparam name="TValue">緩沖對象的類型。</typeparam>
	public interface ICache<TKey, TValue> {
		/// <summary>
		/// 將指定的鍵和對象添加到緩存中,無論鍵是否存在。
		/// </summary>
		/// <param name="key">要添加的對象的鍵。</param>
		/// <param name="value">要添加的對象。</param>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception>
		void Add(TKey key, TValue value);
		/// <summary>
		/// 清空緩存中的所有對象。
		/// </summary>
		void Clear();
		/// <summary>
		/// 確定緩存中是否包含指定的鍵。
		/// </summary>
		/// <param name="key">要在緩存中查找的鍵。</param>
		/// <returns>如果緩存中包含具有指定鍵的元素,則為 <c>true</c>;否則為 <c>false</c>。</returns>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception>
		bool Contains(TKey key);
		/// <summary>
		/// 從緩存中獲取與指定的鍵關聯的對象,如果不存在則將對象添加到緩存中。
		/// </summary>
		/// <param name="key">要獲取的對象的鍵。</param>
		/// <param name="valueFactory">用於為鍵生成對象的函數。</param>
		/// <returns>如果在緩存中找到該鍵,則為對應的對象;否則為 <paramref name="valueFactory"/> 返回的新對象。</returns>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception>
		TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory);
		/// <summary>
		/// 從緩存中移除並返回具有指定鍵的對象。
		/// </summary>
		/// <param name="key">要移除並返回的對象的鍵。</param>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception>
		void Remove(TKey key);
		/// <summary>
		/// 嘗試從緩存中獲取與指定的鍵關聯的對象。
		/// </summary>
		/// <param name="key">要獲取的對象的鍵。</param>
		/// <param name="value">此方法返回時,<paramref name="value"/> 包含緩存中具有指定鍵的對象;
		/// 如果操作失敗,則包含默認值。</param>
		/// <returns>如果在緩存中找到該鍵,則為 <c>true</c>;否則為 <c>false</c>。</returns>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 為 <c>null</c>。</exception>
		bool TryGet(TKey key, out TValue value);
	}
}

三、緩沖池的實現

緩沖池中,使用 ReaderWriterLockSlim 讀寫鎖來實現並發訪問,自己實現了一個雙向環形鏈表來將對象鏈起來。如果對象實現了 System.IDisposable 接口,會在移除或替換對象時自動調用 Dispose 方法,這里需要注意的是 Dispose 最好在鎖之外調用,防止 Dispose 方法導致鎖不能盡快被釋放掉。

添加對象的核心代碼是:

LruNode<TKey, TValue> node;
IDisposable disposable = null;
cacheLock.EnterWriteLock();
try {
	if (cacheDict.TryGetValue(key, out node)) {
		// 更新節點。
		node.Value = value;
		// 寫鎖互斥,這里不用 Interlocked。
		node.VisitCount++;
		return;
	} else {
		if (count < maxSize) {
			// 將節點添加到熱端起始。
			node = new LruNode<TKey, TValue>(key, value);
			AddHotFirst(node);
			// 寫鎖互斥,這里不用 Interlocked。
			count++;
			if (count == hotSize + 1) {
				codeHead = head.Prev;
			}
			cacheDict.Add(key, node);
			return;
		} else {
			// 從冷端末尾嘗試淘汰舊節點,將訪問次數大於 1 的移動到熱端的頭。
			// 由於雙向鏈表是環形存儲的,就相當於將 head 前移。
			while (head.Prev.VisitCount >= 2) {
				// 清零訪問計數。
				head.Prev.VisitCount = 0;
				head = head.Prev;
				codeHead = codeHead.Prev;
			}
			// 將 node 移除,並添加到冷端的頭。
			node = head.Prev;
			disposable = node.Value as IDisposable;
			this.cacheDict.Remove(node.Key);
			this.Remove(node);
			// 這里直接重用舊節點。
			node.Key = key;
			node.Value = value;
			node.VisitCount = 1;
			this.AddCodeFirst(node);
			cacheDict.Add(key, node);
		}
	}
} finally {
	cacheLock.ExitWriteLock();
}
if (disposable != null) {
	disposable.Dispose();
}

可以看到,如果要添加的對象的鍵已存在,是直接替換舊對象的。雙向鏈表的節點也都是重用的,所以沒必要使用對象池(應該不會經常刪除緩存項把……)。

在下面的 GetOrAdd 方法中,實際上就是分別調用了 TryGet 方法和 AddInternal 方法,這么做可能會導致不必要的對象創建,但是為了保證 valueFactory 不會長時間的占用寫鎖,因此只好這么做了。

public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
{
	ExceptionHelper.CheckArgumentNull(key, "key");
	ExceptionHelper.CheckArgumentNull(valueFactory, "valueFactory");
	TValue value;
	if (this.TryGet(key, out value))
	{
		return value;
	}
	value = valueFactory(key);
	this.AddInternal(key, value);
	return value;
}

2013.7.15 更新:這里可以利用 System.Lazy<T> 類來防止多次調用 valueFactory,即將對象用 Lazy<TValue> 包裝起來,在獲取對象時只會獲取到 Lazy<TValue> 對象,實際對象的初始化則被 Lazy<TValue> 推遲了,同時也不會導致長時間占用寫鎖(只要得到了 Lazy<TValue> 對象就可以釋放鎖了),防止 valueFactory 多次被調用則由 Lazy<TValue> 完成。

緩沖池的實現基本就這樣了,具體的代碼可以見 Cyjb.Utility

四、命中率的測試

最后,稍微寫了一段代碼來測試算法的命中率,測試的四個算法分別是 OPT,改進的 LRU,普通的 LRU 和 FIFO。下面先附上測試代碼:

int maxCacheSize = 200;
// 初始化數據。
int[] data = new int[100000];
Random random = new Random();
for (int i = 0; i < data.Length; i++) {
	if (random.Next(10) < 1) {
		data[i] = random.Next(10000);
	} else {
		data[i] = random.Next(10000);
	}
}

// OPT。
int miss = 0;
Dictionary<int, int> nextVisit = new Dictionary<int, int>();
HashSet<int> optCache = new HashSet<int>();
for (int i = 0; i < data.Length; i++) {
	if (!optCache.Contains(data[i])) {
		miss++;
		if (optCache.Count >= maxCacheSize) {
			int minI = 0, maxVisit = 0;
			foreach (int j in optCache) {
				if (!nextVisit.ContainsKey(j)) {
					nextVisit.Add(j, 0);
				}
				if (nextVisit[j] <= i) {
					nextVisit[j] = int.MaxValue;
					for (int k = i + 1; k < data.Length; k++) {
						if (data[k] == j) {
							nextVisit[j] = k;
							break;
						}
					}
				}
				if (nextVisit[j] > maxVisit) {
					maxVisit = nextVisit[j];
					minI = j;
					if (maxVisit == int.MaxValue) {
						break;
					}
				}
			}
			optCache.Remove(minI);
		}
		optCache.Add(data[i]);
	}
}
Console.WriteLine("OPT:{0}%", 100 - miss * 100.0 / data.Length);

// LRU。
miss = 0;
ICache<int, int> cache = new LruCache<int, int>(maxCacheSize);
for (int i = 0; i < data.Length; i++) {
	int value;
	if (!cache.TryGet(data[i], out value)) {
		cache.Add(data[i], data[i]);
		miss++;
	}
}
Console.WriteLine("LRU:{0}%", 100 - miss * 100.0 / data.Length);

// 普通 LRU。
miss = 0;
cache = new LruNormalCache<int, int>(maxCacheSize);
for (int i = 0; i < data.Length; i++) {
	int value;
	if (!cache.TryGet(data[i], out value))
	{
		cache.Add(data[i], data[i]);
		miss++;
	}
}
Console.WriteLine("LRU_Normal:{0}%", 100 - miss * 100.0 / data.Length);

// FIFO。
miss = 0;
Queue<int> queue = new Queue<int>();
HashSet<int> set = new HashSet<int>();
for (int i = 0; i < data.Length; i++) {
	if (!set.Contains(data[i])) {
		set.Add(data[i]);
		miss++;
		queue.Enqueue(data[i]);
		if (queue.Count > maxCacheSize) {
			set.Remove(queue.Dequeue());
		}
	}
}
Console.WriteLine("FIFO:{0}%", 100 - miss * 100.0 / data.Length);

下面是一些測試結果,緩沖池大小都是 200,數據個數是 10W,第一列表示數據的范圍,50% 0~1000,50% 0~150 表示有 50% 的數據范圍是 0~1000,50% 的數據范圍是 0~150(這里給出的測試實際上不太嚴謹,數據范圍應該是取 0~150 和 151~1000 的,現在的測試方法實際上有 57.5% 的數據范圍是 0~150,42.5% 的數據范圍是 151~1000)。表格中的數值表示命中率。

數據范圍 OPT 改進LRU 普通LRU FIFO
50% 0~1000,50% 0~150 70.745% 50.966% 41.532% 37.325%
10% 0~1000,90% 0~150
93.851%
91.79% 89.941% 78.46%
10% 0~10000,90% 0~500 66.59% 34.056% 32.056% 31.342%
10% 0~10000,90% 0~5000 24.617% 3.705% 3.60599999999999% 3.605%
100% 0~10000 18.739% 2.04000000000001% 2.05500000000001% 2.063%

這些只是隨意的測試,僅供參考,不過看起來改進的 LRU 還是比較給力的,可以比較好的保留經常被訪問的數據。而當所有數據都是隨機訪問的時候,命中率反而會下降,甚至還不如 FIFO。

對於並發訪問的測試,我不太了解怎么弄,所以就沒有做,不過個人感覺 ReaderWriterLockSlim 應該就沒有問題了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM