TimeCacheMap是Twitter Storm里面一個類, Storm使用它來保存那些最近活躍的對象,並且可以自動刪除那些已經過期的對象。
不過在storm0.8之后TimeCacheMap被棄用了,取而代之的是RotatingMap。
RotatingMap與TimeCacheMap的區別如下:
- 1.前者去掉了自動清理的線程,讓用戶自己去控制清理過期的數據,控制清理數據用rotate()方法,就是去尾加新頭。
- 2.前者的get、put等方法都不加鎖了,需要用戶在併發訪問時自己控制加鎖。
總之就是提供了更大的自由度,讓開發者去控制這個數據結構!下面先具體分析TimeCacheMap,而后RotatingMap就一目了然了
我直接在源碼中,加上中文的注釋分析源碼TimeCacheMap
package backtype.storm.utils; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import backtype.storm.utils.Time; /** * Expires keys that have not been updated in the configured number of seconds. * The algorithm used will take between expirationSecs and * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message. * * get, put, remove, containsKey, and size take O(numBuckets) time to run. * * The advantage of this design is that the expiration thread only locks the object * for O(1) time, meaning the object is essentially always available for gets/puts. */ /** *如果在配置的時間內沒有更新數據,這個數據就會被刪 *expirationSecs * (1 + 1 / (numBuckets-1))解釋: * *假設_cleaner線程剛剛清理數據,put函數調用發生將key放入桶中,那么一條數據的超時時間為: *expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1)) *然而,假設put函數調用剛剛執行結束,_cleaner線程就開始清理數據,那么一條數據的超時時間為: *expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs * *這個數據結構最大的好處是:數據分成多個桶,鎖的粒度小,只要O(1)的復雜度就可以刪掉過期數據。因此,大部分時間都可以進行get和put操作 */ //deprecated in favor of non-threaded RotatingMap //雖然在storm0.8之后TimeCacheMap被棄用了,不過其設計還是很獨到的,值得一探究竟 @Deprecated public class TimeCacheMap<K, V> { //this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3; //回調函數實現這個接口就可以,至少可以把刪掉的元素傳回去 public static interface ExpiredCallback<K, V> { public void expire(K key, V val); } //把數據分成多個桶,用鏈表是因為在頭尾的增減操作時O(1) private LinkedList<HashMap<K, V>> _buckets; private final Object _lock = new Object(); private Thread _cleaner; private ExpiredCallback _callback; public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if(numBuckets<2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } //構造函數中,按照桶的數量,初始桶 _buckets = new LinkedList<HashMap<K, V>>(); for(int i=0; i<numBuckets; i++) { _buckets.add(new HashMap<K, V>()); } _callback = callback; 
final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets-1); _cleaner = new Thread(new Runnable() { public void run() { try { while(true) { Map<K, V> dead = null; Time.sleep(sleepTime); synchronized(_lock) { //刪掉最后一個桶,在頭補充一個新的桶,最后一個桶的數據是最舊的 dead = _buckets.removeLast(); _buckets.addFirst(new HashMap<K, V>()); } if(_callback!=null) { for(Entry<K, V> entry: dead.entrySet()) { _callback.expire(entry.getKey(), entry.getValue()); } } } } catch (InterruptedException ex) { } } }); //作為守護線程運行,一旦主線程不在,這個線程自動結束 _cleaner.setDaemon(true); _cleaner.start(); } public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) { this(expirationSecs, DEFAULT_NUM_BUCKETS, callback); } public TimeCacheMap(int expirationSecs) { this(expirationSecs, DEFAULT_NUM_BUCKETS); } public TimeCacheMap(int expirationSecs, int numBuckets) { this(expirationSecs, numBuckets, null); } public boolean containsKey(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return true; } } return false; } } public V get(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return bucket.get(key); } } return null; } } public void put(K key, V value) { synchronized(_lock) { Iterator<HashMap<K, V>> it = _buckets.iterator(); HashMap<K, V> bucket = it.next(); //在第一個桶上更新數據 bucket.put(key, value); //去掉后面桶的數據 while(it.hasNext()) { bucket = it.next(); bucket.remove(key); } } } public Object remove(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return bucket.remove(key); } } return null; } } public int size() { synchronized(_lock) { int size = 0; for(HashMap<K, V> bucket: _buckets) { size+=bucket.size(); } return size; } }
//這個方法也太迷惑人了,作用就是把清理線程殺掉,這樣數據就不會過期了,應該改名叫neverCleanup public void cleanup() { //中斷清理線程中的sleep,_cleaner線程會拋出異常,然后_cleaner線程就死了,不再清理過期數據了 _cleaner.interrupt(); //調用了interrupt后,再跑sleep就會拋InterruptedException異常
} }
RotatingMap的源碼幾乎和TimeCacheMap一樣,只是去掉了清理線程和鎖,並增加了一個rotate()方法,由開發者自己調用來清理過期數據。
package backtype.storm.utils;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Expires keys that have not been updated in the configured number of
 * rotations. Unlike TimeCacheMap there is no background thread and no
 * internal lock: the caller drives expiry by invoking rotate() periodically,
 * and a key untouched for numBuckets-1 rotations is dropped on the next one.
 *
 * <p>get, put, remove, containsKey, and size take O(numBuckets) time to run.
 *
 * <p>NOT thread-safe: callers sharing an instance across threads must
 * synchronize externally.
 */
public class RotatingMap<K, V> {
    //this default ensures things expire at most 50% past the expiration time
    private static final int DEFAULT_NUM_BUCKETS = 3;

    /** Callback invoked from rotate() for each entry that expires. */
    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }

    //newest bucket at the head, oldest at the tail; LinkedList gives O(1)
    //addFirst/removeLast for rotate()
    private final LinkedList<HashMap<K, V>> _buckets;
    //was a raw ExpiredCallback; parameterized to avoid unchecked expire() calls
    private final ExpiredCallback<K, V> _callback;

    /**
     * @param numBuckets number of buckets (>= 2); an entry survives at least
     *                   numBuckets-1 and at most numBuckets rotations
     * @param callback   invoked for each expired entry; may be null
     * @throws IllegalArgumentException if numBuckets is less than 2
     */
    public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        //start with numBuckets empty buckets
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }
        _callback = callback;
    }

    public RotatingMap(ExpiredCallback<K, V> callback) {
        this(DEFAULT_NUM_BUCKETS, callback);
    }

    public RotatingMap(int numBuckets) {
        this(numBuckets, null);
    }

    /**
     * Drops the oldest bucket, firing the expiry callback (if any) for each
     * entry in it, and prepends a fresh empty bucket.
     * @return the map of entries that just expired
     */
    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if(_callback!=null) {
            for(Entry<K, V> entry: dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }

    /** @return true if any bucket currently holds the key. O(numBuckets). */
    public boolean containsKey(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return true;
            }
        }
        return false;
    }

    /** @return the live value for the key, or null if absent or expired. */
    public V get(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    /**
     * Inserts or refreshes the key: the value lands in the newest bucket and
     * any stale copy in an older bucket is removed, resetting its lifetime.
     */
    public void put(K key, V value) {
        Iterator<HashMap<K, V>> it = _buckets.iterator();
        HashMap<K, V> bucket = it.next();
        bucket.put(key, value);
        while(it.hasNext()) {
            bucket = it.next();
            bucket.remove(key);
        }
    }

    /**
     * Removes the key without firing the expiry callback.
     * @return the removed value, or null if absent
     *         (declared Object for historical API compatibility)
     */
    public Object remove(K key) {
        for(HashMap<K, V> bucket: _buckets) {
            if(bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    /** @return the total number of live entries across all buckets. */
    public int size() {
        int size = 0;
        for(HashMap<K, V> bucket: _buckets) {
            size+=bucket.size();
        }
        return size;
    }
}