Storm常見模式——TimeCacheMap


Storm中使用一種叫做TimeCacheMap的數據結構,用於在內存中保存近期活躍的對象,它的實現非常地高效,而且可以自動刪除過期不再活躍的對象。

TimeCacheMap使用多個桶buckets來縮小鎖的粒度,以此換取高並發讀寫性能。下面我們來看看TimeCacheMap內部是如何實現的。

1. 實現原理

桶鏈表:鏈表中每個元素是一個HashMap,用於保存key,value格式的數據。

    private LinkedList<HashMap<K, V>> _buckets;

鎖對象:用於對TimeCacheMap進行get/put等操作時上鎖保證原子性。

    private final Object _lock = new Object();

后台清理線程:負責超時后清理數據。

    private Thread _cleaner;

超時回調接口:用於超時后進行函數回調,做一些其他處理。

    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }
    private ExpiredCallback _callback;

有了以上數據結構,下面來看看構造函數的具體實現:

1、 首先,初始化指定個數的bucket,以鏈式鏈表形式存儲,每個bucket中放入空的HashMap;

2、 然后,設置清理線程,處理流程為:

  a)   休眠expirationMillis / (numBuckets-1)毫秒時間(即:expirationSecs / (numBuckets-1)秒);

  b)   對_lock對象上鎖,然后從buckets鏈表中移除最后一個元素;

  c)   向buckets鏈表頭部新加入一個空的HashMap桶,解除_lock對象鎖;

  d)   如果設置了callback函數,則進行回調。

    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }


        _callback = callback;
        final long expirationMillis = expirationSecs * 1000L;
        final long sleepTime = expirationMillis / (numBuckets-1);
        _cleaner = new Thread(new Runnable() {
            public void run() {
                try {
                    while(true) {
                        Map<K, V> dead = null;
                        Time.sleep(sleepTime);
                        synchronized(_lock) {
                            dead = _buckets.removeLast();
                            _buckets.addFirst(new HashMap<K, V>());
                        }
                        if(_callback!=null) {
                            for(Entry<K, V> entry: dead.entrySet()) {
                                _callback.expire(entry.getKey(), entry.getValue());
                            }
                        }
                    }
                } catch (InterruptedException ex) {

                }
            }
        });
        _cleaner.setDaemon(true);
        _cleaner.start();
    }

構造函數需要傳遞三個參數:expirationSecs:超時的時間,單位為秒;numBuckets:桶的個數;callback:超時回調函數。

為了方便使用,還提供了以下三種形式的構造函數,使用時可以根據需要選擇:

    //this default ensures things expire at most 50% past the expiration time
    private static final int DEFAULT_NUM_BUCKETS = 3;
    public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
    }
    public TimeCacheMap(int expirationSecs, int numBuckets) {
        this(expirationSecs, numBuckets, null);
    }
    public TimeCacheMap(int expirationSecs) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS);
    }

2. 性能分析

get操作:遍歷各個bucket,如果存在指定的key則返回,時間復雜度為O(numBuckets)

    public V get(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.get(key);
                }
            }
            return null;
        }
    }

put操作:將key,value放到_buckets的第一個桶中,然后遍歷其他numBuckets-1個桶,從HashMap中移除其中鍵為key的記錄,時間復雜度為O(numBuckets)

    public void put(K key, V value) {
        synchronized(_lock) {
            Iterator<HashMap<K, V>> it = _buckets.iterator();
            HashMap<K, V> bucket = it.next();
            bucket.put(key, value);
            while(it.hasNext()) {
                bucket = it.next();
                bucket.remove(key);
            }
        }
    }

remove操作:遍歷各個bucket,如果存在以key為鍵的記錄,直接刪除,時間復雜度為O(numBuckets)

    public Object remove(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.remove(key);
                }
            }
            return null;
        }
    }

containsKey操作:遍歷各個bucket,如果存在指定的key則返回true,否則返回false,時間復雜度為O(numBuckets)

    public boolean containsKey(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return true;
                }
            }
            return false;
        }
    }

size操作:遍歷各個bucket,累加各個bucket的HashMap的大小,時間復雜度為O (numBuckets)

    public int size() {
        synchronized(_lock) {
            int size = 0;
            for(HashMap<K, V> bucket: _buckets) {
                size+=bucket.size();
            }
            return size;
        }
    }

3. 超時時間

經過上面對put操作和_cleaner線程的分析,我們已經知道:

  a) put操作將數據放到_buckets的第一個桶中,然后遍歷其他numBuckets-1個桶,從HashMap中移除其中鍵為key的記錄;

  b) _cleaner線程每隔expirationSecs / (numBuckets-1)秒會把_buckets中最后一個桶中的數據從TimeCacheMap中移除掉。

因此,假設_cleaner線程剛剛清理數據,put函數調用發生將key放入桶中,那么一條數據的超時時間為:

expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1))

然而,假設put函數調用剛剛執行結束,_cleaner線程就開始清理數據,那么一條數據的超時時間為:

expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs

4. 總結

1、 TimeCacheMap的高效之處在於鎖的粒度小,O(1)時間內完成鎖操作,因此,大部分時間內都可以進行get和put操作。

2、 get,put,remove,containsKey和size操作都可以在O(numBuckets)時間內完成,其中numBuckets是桶的個數,默認為3。

3、 未更新數據的超時時間在expirationSecs和expirationSecs * (1 + 1 / (numBuckets-1))之間。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM