探索c#之storm的TimeCacheMap


閱讀目錄:

  1. 概述
  2. 算法介紹
  3. 清理線程
  4. 獲取、插入、刪除
  5. 總結

概述

最近在看storm,發現其中的TimeCacheMap算法設計頗為高效,就簡單分享介紹下。
思考一下如果需要一個帶過期淘汰的緩存容器,我們通常會使用定時器或線程去掃描容器,以便判斷是否過期從而刪除。但這樣性能並不友好,在數據量較大時O(n)檢查是一筆不小的開銷,並且在大量過期數據刪除時需要頻繁對容器加鎖,這會多少會影響到正常的數據讀寫刪除。
Storm設計了一種比較高效的時間緩存容器TimeCacheMap,它的算法可以在某個時間周期內將數據批量刪除,一次批量刪除只需要加一次鎖即可,並且其讀寫刪除復雜度均為O(1)。

算法介紹

TimeCacheMap把要緩存的數據分拆存儲到多個小容器內,這里稱為桶。另外有個線程專門在一定時間內去掃描這些桶,一旦發現過期后就把整個桶的數據給刪除掉。 其中第二步比較關鍵,它並不是傳統意義上的去定時掃描,而是根據過期時間來觸發,比如如果一個桶過期時間10s,那么這個線程就10秒觸發一次把整個桶刪除即可,當然多個桶的觸發策略會有所不同,但思路是同一個。   
為了更詳細的描述,用代碼和例子介紹如下:

    private LinkedList<Dictionary<K, V>> buckets;
    private readonly object Obj = new object();
    private static readonly int NumBuckets = 3;
    private Thread cleaner;

上面使用了k、v的形式作為緩存數據結構,每個Dictionary是一個桶,然后使用鏈表把多個桶存儲起來。Obj是要鎖的對象,NumBuckets是桶的數量,cleaner是清理線程。
在緩存初始化的時候,會實例三個空桶加入到buckets,清理線程開始啟動循環檢查,假設過期時間時30秒,桶的數量為3,當有新數據進來時,會全部加入到第一個桶中。

為了刪除性能,清理線程會定期把整個桶給刪除掉,一般我們會每次把鏈表中最后一個桶給清理掉,然后再加入一個新桶到鏈表頭部。
這種情況下就不能按照緩存過期時間去觸發線程清理了,因為有三個桶,如果每30秒觸發線程清理掉最后一個桶,那么第三個桶要等到第90秒才開始清理,很明顯這樣是不合理的。 正確的應該是第30秒開始清理,這時就需要調整線程觸發時間,比如調整成10秒,繼續模擬下:

  1. 觸發前1秒插入新數據到第一個桶,如果調整成10秒觸發,等到觸發刪除這個桶時才過了20秒,跟緩存過期時間30秒不一致同樣不合理,不管是1秒還是9秒都會導致提前刪除數據,需要繼續調整觸發時間。
  2. 如上緩存提前刪除不能允許的,但延遲刪除一般是可以接受的,因此可以加入一些冗余時間來保證不會提前刪除。 這里調整到15秒觸發,觸發前1秒插入的緩存桶正好在30秒后觸發刪除,達到不會提前刪除的目的。
  3. 如上在觸發前14秒插入數據,那就需要過了30秒+14秒才能刪除。

根據上面的模擬,調整到15秒觸發是一個比較合理的值,因此推出緩存最長過期時間的公式為:

expirationSecs * (1 + 1 / (numBuckets-1))

如果過期時間是30秒,其最長刪除時間是:

30*(1+1/(3-1))=30*(1+0.5)=45  

因此其過期時間范圍即為expirationSecs到expirationSecs * (1 + 1 / (numBuckets-1))之間。

清理線程

如上算法的介紹,我們在類型的構造函數中,實例化並啟動清理線程:

 public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallBack ex)
    {
        if (numBuckets < 2)
            throw new ArgumentException("numBuckets must be >=2");
        this.buckets = new LinkedList<Dictionary<K, V>>();
        for (int i = 0; i < numBuckets; i++)
            buckets.AddFirst(new Dictionary<K, V>());
        var expirationMillis = expirationSecs * 1000;
        var sleepTime = expirationMillis / (numBuckets - 1);
        cleaner = new Thread(() =>
        {
            while (true)
            {
                Dictionary<K, V> dead = null;
                Thread.Sleep(sleepTime);
                lock (Obj)
                {
                    dead = buckets.Last();
                    buckets.RemoveLast();
                    buckets.AddFirst(new Dictionary<K, V>());
                }
                if (ex != null)
                    ex(dead);
            }
        });
        cleaner.IsBackground = true;
        cleaner.Start();
    }

代碼執行步驟:

  1. 初始化桶加入到鏈表
  2. 計算緩存數據最長過期時間,並作為線程休眠的時間。
  3. 線程觸發時刪除最后一個桶並加入新的桶
  4. 不斷循環休眠觸發觸發
  5. 啟動線程

整個桶的數據刪除只需要加一次鎖即可,保證其高效。

獲取、插入、刪除

遍歷整個鏈表,查詢到第一個滿足key的立即返回,這需要保證不會有重復key。

   public V Get(K key)
        {
            lock (Obj)
            {
                foreach (var item in buckets)
                {
                    if (item.ContainsKey(key))
                        return item[key];
                }
                return default(V);
            }
        }

在插入時刪除對應的key,保證不會有重復的key出現。

 public void Put(K key, V value)
    {
        lock (Obj)
        {
            foreach (var item in buckets)
            {
                item.Remove(key);
            }
            buckets.First().Add(key, value);
        }
    }

刪除對應的key

    public void Remove(K key)
    {
        lock (Obj)
        {
            foreach (var item in buckets)
            {
                if (item.ContainsKey(key))
                    item.Remove(key);
            }
        }
    }

總結

那些年我們一起追過的緩存寫法(三)中有介紹過關於惰性刪除及高效LRU算法優化緩存容器的過期,有興趣的童鞋可以看看。
完整代碼中有容器Size、ContainsKey的實現,github-TimeCacheMap.c#
在storm中,spout發射的消息和acker的消息即保存在各自的TimeCacheMap里,如果消息超時后會自動通知spout的fail方法。 在storm0.8后TimeCacheMap被棄用了,使用的是新的RotatingMap,但設計和實現基本沒變,github-TimeCacheMap.javagithub-RotatingMap.java


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM