1.背景
LRU-least recently used-最近最少使用算法,是一種內存數據淘汰策略,使用常見是當內存不足時,需要淘汰最近最少使用的數據。LRU常用語緩存系統的淘汰策略。
2.LRU原理
LRU最早實在操作系統接觸到這個算法的,如下如所示。
這里的棧有別於咱們后進先出的數據結構,主要用來描述原理本身。從途中可知LRU是如何實行淘汰的,同時,大家可能也意識到這種實現可能性能並不太好,存在大量的拷貝動作。
3.LRU算法實現
我們先從一道LRU設計算法題開始。
算法題:LRU緩存機制
運用你所掌握的數據結構,設計和實現一個 LRU (最近最少使用) 緩存機制。它應該支持以下操作: 獲取數據 get 和 寫入數據 put 。
獲取數據 get(key) - 如果關鍵字 (key) 存在於緩存中,則獲取關鍵字的值(總是正數),否則返回 -1。
寫入數據 put(key, value) - 如果關鍵字已經存在,則變更其數據值;如果關鍵字不存在,則插入該組「關鍵字/值」。當緩存容量達到上限時,它應該在寫入新數據之前刪除最久未使用的數據值,從而為新的數據值留出空間。
進階: 你是否可以在 O(1) 時間復雜度內完成這兩種操作?
示例要求:
LRUCache cache = new LRUCache( 2 /* 緩存容量 */ ); cache.put(1, 1); cache.put(2, 2); cache.get(1); // 返回 1 cache.put(3, 3); // 該操作會使得關鍵字 2 作廢 cache.get(2); // 返回 -1 (未找到) cache.put(4, 4); // 該操作會使得關鍵字 1 作廢 cache.get(1); // 返回 -1 (未找到) cache.get(3); // 返回 3 cache.get(4); // 返回 4
分析:
如果使用數組來實現一個基於LRU的緩存,按照LRU原理要求可以預知存在大量的拷貝操作,性能上可能無法滿足。設計一個LRU緩存,滿足放入和移出都是O(1),我們需要把訪問次序維護好,但是這個次序的維護並不需要在內存中真正排序來反應,按照這種思路,有一種實現方法就是雙向鏈表。
API定義
class LRUCache { public LRUCache(int capacity) { } public int get(int key) { } public void save(int key, int value) { } }
基於 HashMap + 雙向鏈表 實現LRU
HahsMap用於快速查找到結點所在位置,然后將使用到的結點放在對頭,這樣最近最少使用的結點自然就落入到隊尾。雙向鏈表提供了良好的靈活性,兩邊可達。如下圖所示。
假設我們需要執行如下操作:
save("key1", 7) save("key2", 0) save("key3", 1) save("key4", 2) get("key2") save("key5", 3) get("key2") save("key6", 4)
使用HashMap + 雙向鏈表數據結構實現的LRU操作雙向鏈表部分的軌跡如下。(下圖中有一個錯誤,應該是 s(key4, 2) )
算法操作步驟如下:
1. save(key, value): 1. 首先在 HashMap 找到 Key 對應的節點,如果節點存在,更新節點的值,並把這個節點移動隊頭。 2. 如果不存在,需要構造新的節點,並且嘗試把節點塞到隊頭。 3. 如果LRU空間不足,則通過 tail 淘汰掉隊尾的節點,同時在 HashMap 中移除 Key。 2. get(key): 1. 通過 HashMap 找到 LRU 鏈表節點,因為根據LRU 原理,這個節點是最新訪問的,所以要把節點插入到隊頭,然后返回緩存的值。
算法實現
由於可能存在並發讀寫LRUCache,因此需要保證線程安全。
public class LRUCache { class DLinkedNode { String key; int value; DLinkedNode pre; DLinkedNode post; } private ConcurrentMap<String, DLinkedNode> cache = new ConcurrentHashMap<String, DLinkedNode>(); private int count; private int capacity; private DLinkedNode head, tail; public LRUCache(int capacity) { this.count = 0; this.capacity = capacity; head = new DLinkedNode(); head.pre = null; tail = new DLinkedNode(); tail.post = null; head.post = tail; tail.pre = head; } public int get(String key) { DLinkedNode node = cache.get(key); if(node == null){ return -1; // should raise exception here. } moveToHead(node); return node.value; } public void put(String key, int value) { DLinkedNode node = cache.get(key); if (node != null) { node.value = value; moveToHead(node); return; } DLinkedNode newNode = new DLinkedNode(); newNode.key = key; newNode.value = value; cache.put(key, newNode); addNode(newNode); ++count; if(count > capacity){ // pop the tail DLinkedNode tail = popTail(); cache.remove(tail.key); --count; } } private void addNode(DLinkedNode node){ node.pre = head; node.post = head.post; head.post.pre = node; head.post = node; } private void removeNode(DLinkedNode node){ DLinkedNode pre = node.pre; DLinkedNode post = node.post; pre.post = post; post.pre = pre; } private void moveToHead(DLinkedNode node){ removeNode(node); addNode(node); } private DLinkedNode popTail(){ DLinkedNode res = tail.pre; removeNode(res); return res; } }
采用HashMap + 雙向鏈表,提供了很好的讀寫操作,且能在O(1)內完成讀寫操作。那么,Redis的淘汰策略是不是也是根據LRU,如果是,它的淘汰算法是不是也采用的這種數據結果?
Python 算法實現:
class DLinkedNode: def __init__(self, key=0, value=0): self.key = key self.value = value self.prev = None self.next = None class LRUCache: def __init__(self, capacity: int): self.cache = dict() self.head = DLinkedNode() self.tail = DLinkedNode() self.head.next = self.tail self.tail.prev = self.head self.capacity = capacity self.size = 0 def get(self, key: int) -> int: if key not in self.cache: return -1 node = self.cache[key] # move this node to head self.moveToHead(node) return node.value def put(self, key: int, value: int) -> None: # old key if key in self.cache: node = self.cache[key] node.value = value self.moveToHead(node) # new key else: node = DLinkedNode(key, value) self.cache[key] = node self.addToHead(node) self.size += 1 if self.size > self.capacity: tail = self.removeTail() del self.cache[tail.key] self.size -= 1 def addToHead(self, node): node.next = self.head.next node.next.prev = node self.head.next = node node.prev = self.head def removeNode(self, node): prev_node = node.prev next_node = node.next prev_node.next = next_node next_node.prev = prev_node def moveToHead(self, node): self.removeNode(node) self.addToHead(node) def removeTail(self): node = self.tail.prev self.removeNode(node) return node # Your LRUCache object will be instantiated and called as such: # obj = LRUCache(capacity) # param_1 = obj.get(key) # obj.put(key,value)
題目鏈接: https://leetcode-cn.com/problems/lru-cache/
4. Redis LRU算法實現
分析Redis LRU實現之前,我們先了解一下Redis緩存淘汰策略。
當Redis內存超出物理內存限制時,內存會頻繁的磁盤swap區交換數據,而交換會導致redis對外服務性能的急劇下降,這在生產環境是不允許的。說得更明白些,在生產環境是不允許交換行為的,通過設置maxmemory可限制內存超過期望大小。
當實際需要的內存大於maxmemory時,Redis提供了6種可選策略:
- noeviction:不繼續提供寫服務,讀請求可以繼續;
- volatile-lru:嘗試淘汰設置了過期時間的key,最少使用的key優先淘汰。也就是說沒有設置過期時間的key不會被淘汰;
- volatile-ttl:也是淘汰設置了過期時間的key,只不過策略不是lru,而是根據剩余壽命的ttl值,ttl越小越優先被淘汰;
- volatile-random:同理,也是淘汰設置了過期時間的key,只不過策略是隨機;
- allkeys-lru:類比volatile-lru,只不過未設置過期時間的key也在淘汰范圍;
- allkeys-random:類比volatile-random,只不過未設置過期時間的key也在淘汰范圍。
采用HashMap + 雙向循環鏈表具有較好的讀寫性能,但是有沒有發現什么問題呢?對,HashMap和鏈表都存在空間浪費的情況,HashMap本來就很耗內存,雙向鏈表由於需要空間存儲指針,兩種數據結構空間使用率都不高,這顯然很不划算。
針對這個問題,Redis采用了近似的做法,我們來分析分析。
首先,針對問題本身,我們需要淘汰的是最近未使用的相對比較舊的數據淘汰掉,那么,我們是否一定得非常精確地淘汰掉最舊的數據還是只需要淘汰掉比較舊的數據?
咱們來看下Redis是如何實現的。Redis做法很簡單:隨機取若干個key,然后按照訪問時間排序,淘汰掉最不經常使用的數據。為此,Redis給每個key額外增加了一個24bit長度的字段,用於保存最后一次被訪問的時鍾(Redis維護了一個全局LRU時鍾lruclock:REDIS_LUR_BITS,時鍾分辨率默認1秒)。
下面我們看下采用volatile_lru和allkeys-lru是如何實現的,關鍵代碼如下。
// 評估object空閑時間 unsigned long estimateObjectIdleTime(robj *o) { if (server.lruclock >= o->lru) { return (server.lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION; } else { return ((REDIS_LRU_CLOCK_MAX - o->lru) + server.lruclock) * REDIS_LRU_CLOCK_RESOLUTION; } } // LRU淘汰算法實現 ...... /* volatile-lru and allkeys-lru policy */ else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU || server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) { for (k = 0; k < server.maxmemory_samples; k++) { sds thiskey; long thisval; robj *o; de = dictGetRandomKey(dict); thiskey = dictGetKey(de); if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) de = dictFind(db->dict, thiskey); o = dictGetVal(de); thisval = estimateObjectIdleTime(o); /* Higher idle time is better candidate for deletion */ if (bestkey == NULL || thisval > bestval) { bestkey = thiskey; bestval = thisval; } } } ......
redis會基於server.maxmemory_samples配置選取固定數目的key,然后比較它們的lru訪問時間,然后淘汰最近最久沒有訪問的key,maxmemory_samples的值越大,Redis的近似LRU算法就越接近於嚴格LRU算法,但是相應消耗也變高,對性能有一定影響,樣本值默認為5。
上圖是Redis官網的一組LRU統計數據,Redis3.0以上采用近視LRU算法獲得了不錯的效果。從Redis實現我們看出,在商業世界,為了追求空間的利用率,也會采用權衡的實現方案。
總結
LRU是緩存系統中常見的淘汰策略,當內存不足時,我們需要淘汰掉最近最少使用的數據,LRU就是實現這種策略的統稱。LRU算法實現可以基於HashMap + 雙向鏈表的數據結構實現高效數據讀寫,於此同時,高效查詢卻帶來了高內存消耗的的問題,為此Redis選擇了近似LRU算法,隨機采用一組key,選擇最老的數據進行刪除,能夠達到類似的效果。
參考鏈接: