一、引子
緩存有很多種解決方案,常見的是:
1.存儲在內存中 : 內存緩存顧名思義直接存儲在JVM內存中,JVM宕機那么內存丟失,讀寫速度快,但受內存大小的限制,且有丟失數據風險。
2.存儲在磁盤中: 即從內存落地並序列化寫入磁盤的緩存,持久化在磁盤,讀寫需要IO效率低,但是安全。
3.內存+磁盤組合方式:這種組合模式有很多成熟緩存組件,也是高效且安全的策略,比如redis。
本文分析常用的內存緩存:google cache。源碼包:com.google.guava:guava:22.0 jar包下的pcom.google.common.cache包,適用於高並發讀寫場景,可自定義緩存失效策略。
二、使用方法
2.1 CacheBuilder有3種失效重載模式
1.expireAfterWrite
當 創建 或 寫之后的 固定 有效期到達時,數據會被自動從緩存中移除,源碼注釋如下:
1 /**指明每個數據實體:當 創建 或 最新一次更新 之后的 固定值的 有效期到達時,數據會被自動從緩存中移除 2 * Specifies that each entry should be automatically removed from the cache once a fixed duration 3 * has elapsed after the entry's creation, or the most recent replacement of its value. 4 *當間隔被設置為0時,maximumSize設置為0,忽略其它容量和權重的設置。這使得測試時 臨時性地 禁用緩存且不用改代碼。 5 * <p>When {@code duration} is zero, this method hands off to {@link #maximumSize(long) 6 * maximumSize}{@code (0)}, ignoring any otherwise-specified maximum size or weight. This can be 7 * useful in testing, or to disable caching temporarily without a code change. 8 *過期的數據實體可能會被Cache.size統計到,但不能進行讀寫,數據過期后會被清除。 9 * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or 10 * write operations. Expired entries are cleaned up as part of the routine maintenance described 11 * in the class javadoc. 12 * 13 * @param duration the length of time after an entry is created that it should be automatically 14 * removed 15 * @param unit the unit that {@code duration} is expressed in 16 * @return this {@code CacheBuilder} instance (for chaining) 17 * @throws IllegalArgumentException if {@code duration} is negative 18 * @throws IllegalStateException if the time to live or time to idle was already set 19 */ 20 public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) { 21 checkState( 22 expireAfterWriteNanos == UNSET_INT, 23 "expireAfterWrite was already set to %s ns", 24 expireAfterWriteNanos); 25 checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit); 26 this.expireAfterWriteNanos = unit.toNanos(duration); 27 return this; 28 }
2.expireAfterAccess
指明每個數據實體:當 創建 或 寫 或 讀 之后的 固定值的有效期到達時,數據會被自動從緩存中移除。讀寫操作都會重置訪問時間,但asMap方法不會。源碼注釋如下:
1 /**指明每個數據實體:當 創建 或 更新 或 訪問 之后的 固定值的有效期到達時,數據會被自動從緩存中移除。讀寫操作都會重置訪問時間,但asMap方法不會。 2 * Specifies that each entry should be automatically removed from the cache once a fixed duration 3 * has elapsed after the entry's creation, the most recent replacement of its value, or its last 4 * access. Access time is reset by all cache read and write operations (including 5 * {@code Cache.asMap().get(Object)} and {@code Cache.asMap().put(K, V)}), but not by operations 6 * on the collection-views of {@link Cache#asMap}. 7 * 后面的同expireAfterWrite 8 * <p>When {@code duration} is zero, this method hands off to {@link #maximumSize(long) 9 * maximumSize}{@code (0)}, ignoring any otherwise-specified maximum size or weight. This can be 10 * useful in testing, or to disable caching temporarily without a code change. 11 * 12 * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or 13 * write operations. Expired entries are cleaned up as part of the routine maintenance described 14 * in the class javadoc. 15 * 16 * @param duration the length of time after an entry is last accessed that it should be 17 * automatically removed 18 * @param unit the unit that {@code duration} is expressed in 19 * @return this {@code CacheBuilder} instance (for chaining) 20 * @throws IllegalArgumentException if {@code duration} is negative 21 * @throws IllegalStateException if the time to idle or time to live was already set 22 */ 23 public CacheBuilder<K, V> expireAfterAccess(long duration, TimeUnit unit) { 24 checkState( 25 expireAfterAccessNanos == UNSET_INT, 26 "expireAfterAccess was already set to %s ns", 27 expireAfterAccessNanos); 28 checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit); 29 this.expireAfterAccessNanos = unit.toNanos(duration); 30 return this; 31 }
3.refreshAfterWrite
指明每個數據實體:當 創建 或 寫 之后的 固定值的有效期到達時,且新請求過來時,數據會被自動刷新(注意不是刪除是異步刷新,不會阻塞讀取,先返回舊值,異步重載到數據返回后復寫新值)。源碼注釋如下:
1 /**指明每個數據實體:當 創建 或 更新 之后的 固定值的有效期到達時,數據會被自動刷新。刷新方法在LoadingCache接口的refresh()申明,實際最終調用的是CacheLoader的reload() 2 * Specifies that active entries are eligible for automatic refresh once a fixed duration has 3 * elapsed after the entry's creation, or the most recent replacement of its value. The semantics 4 * of refreshes are specified in {@link LoadingCache#refresh}, and are performed by calling 5 * {@link CacheLoader#reload}. 6 * 默認reload是同步方法,所以建議用戶覆蓋reload方法,否則刷新將在無關的讀寫操作間操作。 7 * <p>As the default implementation of {@link CacheLoader#reload} is synchronous, it is 8 * recommended that users of this method override {@link CacheLoader#reload} with an asynchronous 9 * implementation; otherwise refreshes will be performed during unrelated cache read and write 10 * operations. 11 * 12 * <p>Currently automatic refreshes are performed when the first stale request for an entry 13 * occurs. The request triggering refresh will make a blocking call to {@link CacheLoader#reload} 14 * and immediately return the new value if the returned future is complete, and the old value 15 * otherwise.觸發刷新操作的請求會阻塞調用reload方法並且當返回的Future完成時立即返回新值,否則返回舊值。 16 * 17 * <p><b>Note:</b> <i>all exceptions thrown during refresh will be logged and then swallowed</i>. 18 * 19 * @param duration the length of time after an entry is created that it should be considered 20 * stale, and thus eligible for refresh 21 * @param unit the unit that {@code duration} is expressed in 22 * @return this {@code CacheBuilder} instance (for chaining) 23 * @throws IllegalArgumentException if {@code duration} is negative 24 * @throws IllegalStateException if the refresh interval was already set 25 * @since 11.0 26 */ 27 @GwtIncompatible // To be supported (synchronously). 28 public CacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) { 29 checkNotNull(unit); 30 checkState(refreshNanos == UNSET_INT, "refresh was already set to %s ns", refreshNanos); 31 checkArgument(duration > 0, "duration must be positive: %s %s", duration, unit); 32 this.refreshNanos = unit.toNanos(duration); 33 return this; 34 }
2.2 測試驗證
1)定義一個靜態的LoadingCache,用cacheBuilder構造緩存,分別定義了同步load(耗時2秒)和異步reload(耗時2秒)方法。
2)在main方法中,往緩存中設置值,定義3個線程,用CountDownLatch倒計時器模擬3個線程並發讀取緩存,最后在主線程分別5秒、0.5秒、2秒時get緩存。
測試代碼如下:
1 package guava; 2 3 import com.google.common.cache.CacheBuilder; 4 import com.google.common.cache.CacheLoader; 5 import com.google.common.cache.LoadingCache; 6 import com.google.common.util.concurrent.ListenableFuture; 7 import com.google.common.util.concurrent.ListeningExecutorService; 8 import com.google.common.util.concurrent.MoreExecutors; 9 10 import java.util.Date; 11 import java.util.Random; 12 import java.util.concurrent.Callable; 13 import java.util.concurrent.CountDownLatch; 14 import java.util.concurrent.Executors; 15 import java.util.concurrent.TimeUnit; 16 17 /** 18 * @ClassName guava.LoadingCacheTest 19 * @Description 注意refresh並不會主動刷新,而是被檢索觸發更新value,且隨時可返回舊值 20 * @Author denny 21 * @Date 2018/4/28 下午12:10 22 */ 23 public class LoadingCacheTest { 24 25 // guava線程池,用來產生ListenableFuture 26 private static ListeningExecutorService service = MoreExecutors.listeningDecorator( 27 Executors.newFixedThreadPool(10)); 28 29 /** 30 * 1.expireAfterWrite:指定時間內沒有創建/覆蓋時,會移除該key,下次取的時候觸發"同步load"(一個線程執行load) 31 * 2.refreshAfterWrite:指定時間內沒有被創建/覆蓋,則指定時間過后,再次訪問時,會去刷新該緩存,在新值沒有到來之前,始終返回舊值 32 * "異步reload"(也是一個線程執行reload) 33 * 3.expireAfterAccess:指定時間內沒有讀寫,會移除該key,下次取的時候從loading中取 34 * 區別:指定時間過后,expire是remove該key,下次訪問是同步去獲取返回新值; 35 * 而refresh則是指定時間后,不會remove該key,下次訪問會觸發刷新,新值沒有回來時返回舊值 36 * 37 * 同時使用:可避免定時刷新+定時刪除下次訪問載入 38 */ 39 private static final LoadingCache<String, String> cache = CacheBuilder.newBuilder() 40 .maximumSize(1000) 41 //.refreshAfterWrite(1, TimeUnit.SECONDS) 42 .expireAfterWrite(1, TimeUnit.SECONDS) 43 //.expireAfterAccess(1,TimeUnit.SECONDS) 44 .build(new CacheLoader<String, String>() { 45 @Override 46 public String load(String key) throws Exception { 47 System.out.println(Thread.currentThread().getName() +"==load start=="+",時間=" + new Date()); 48 // 模擬同步重載耗時2秒 49 Thread.sleep(2000); 50 String value = "load-" + new Random().nextInt(10); 51 System.out.println( 52 Thread.currentThread().getName() + "==load end==同步耗時2秒重載數據-key=" + key + ",value="+value+",時間=" + new Date()); 53 return value; 54 } 55 56 @Override 57 public ListenableFuture<String> reload(final String key, final String oldValue) 58 throws Exception { 59 System.out.println( 60 Thread.currentThread().getName() + "==reload ==異步重載-key=" + key + ",時間=" + new Date()); 61 return service.submit(new Callable<String>() { 62 @Override 63 public String call() throws Exception { 64 /* 模擬異步重載耗時2秒 */ 65 Thread.sleep(2000); 66 String value = "reload-" + new Random().nextInt(10); 67 System.out.println(Thread.currentThread().getName() + "==reload-callable-result="+value+ ",時間=" + new Date()); 68 return value; 69 } 70 }); 71 } 72 }); 73 74 //倒計時器 75 private static CountDownLatch latch = new CountDownLatch(1); 76 77 public static void main(String[] args) throws Exception { 78 79 System.out.println("啟動-設置緩存" + ",時間=" + new Date()); 80 cache.put("name", "張三"); 81 System.out.println("緩存是否存在=" + cache.getIfPresent("name")); 82 //休眠 83 Thread.sleep(2000); 84 //System.out.println("2秒后"+",時間="+new Date()); 85 System.out.println("2秒后,緩存是否存在=" + cache.getIfPresent("name")); 86 //啟動3個線程 87 for (int i = 0; i < 3; i++) { 88 startThread(i); 89 } 90 91 // -1直接=0,喚醒所有線程讀取緩存,模擬並發訪問緩存 92 latch.countDown(); 93 //模擬串行讀緩存 94 Thread.sleep(5000); 95 System.out.println(Thread.currentThread().getName() + "休眠5秒后,讀緩存="+cache.get("name")+",時間=" + new Date()); 96 Thread.sleep(500); 97 System.out.println(Thread.currentThread().getName() + "距離上一次讀0.5秒后,讀緩存="+cache.get("name")+",時間=" + new Date()); 98 Thread.sleep(2000); 99 System.out.println(Thread.currentThread().getName() + "距離上一次讀2秒后,讀緩存="+cache.get("name")+",時間=" + new Date()); 100 } 101 102 private static void startThread(int id) { 103 Thread t = new Thread(new Runnable() { 104 @Override 105 public void run() { 106 try { 107 System.out.println(Thread.currentThread().getName() + "...begin" + ",時間=" + new Date()); 108 //休眠,當倒計時器=0時喚醒線程 109 latch.await(); 110 //讀緩存 111 System.out.println( 112 Thread.currentThread().getName() + "並發讀緩存=" + cache.get("name") + ",時間=" + new Date()); 113 } catch (Exception e) { 114 e.printStackTrace(); 115 } 116 } 117 }); 118 119 t.setName("Thread-" + id); 120 t.start(); 121 } 122 }
結果分析
1.expireAfterWrite:當 創建 或 寫 之后的 有效期到達時,數據會被自動從緩存中移除
啟動-設置緩存,時間=Thu May 17 17:55:36 CST 2018-->主線程啟動,緩存創建完畢並設值,即觸發寫緩存 緩存是否存在=張三 2秒后,緩存是否存在=null--》設定了1秒自動刪除緩存,2秒后緩存不存在 Thread-0...begin,時間=Thu May 17 17:55:38 CST 2018--》38秒時,啟動3個線程模擬並發讀:三個線程讀緩存,由於緩存不存在,阻塞在get方法上,等待其中一個線程去同步load數據 Thread-1...begin,時間=Thu May 17 17:55:38 CST 2018 Thread-2...begin,時間=Thu May 17 17:55:38 CST 2018 Thread-1==load start==,時間=Thu May 17 17:55:38 CST 2018---線程1,同步載入數據load() Thread-1==load end==同步耗時2秒重載數據-key=name,value=load-2,時間=Thu May 17 17:55:40 CST 2018--線程1,同步載入數據load()完畢!,即40秒時寫入數據:load-2 Thread-0並發讀緩存=load-2,時間=Thu May 17 17:55:40 CST 2018---線程1同步載入數據load()完畢后,3個阻塞在get方法的線程得到緩存值:load-2 Thread-1並發讀緩存=load-2,時間=Thu May 17 17:55:40 CST 2018 Thread-2並發讀緩存=load-2,時間=Thu May 17 17:55:40 CST 2018 main==load start==,時間=Thu May 17 17:55:43 CST 2018---主線程訪問緩存不存在,執行load() main==load end==同步耗時2秒重載數據-key=name,value=load-4,時間=Thu May 17 17:55:45 CST 2018---load()完畢!45秒時寫入數據:load-4 main休眠5秒后,讀緩存=load-4,時間=Thu May 17 17:55:45 CST 2018---主線程得到緩存:load-4 main距離上一次讀0.5秒后,讀緩存=load-4,時間=Thu May 17 17:55:45 CST 2018--距離上一次寫才0.5秒,數據有效:load-4 main==load start==,時間=Thu May 17 17:55:47 CST 2018-47秒時,距離上一次寫45秒,超過了1秒,數據無效,再次load() main==load end==同步耗時2秒重載數據-key=name,value=load-8,時間=Thu May 17 17:55:49 CST 2018--49秒時load()完畢:load-8 main距離上一次讀2秒后,讀緩存=load-8,時間=Thu May 17 17:55:49 CST 2018--打印get的緩存結果:load-8
2.expireAfterAccess:當 創建 或 寫 或 讀 之后的 有效期到達時,數據會被自動從緩存中移除
修改測試代碼98、99行:
Thread.sleep(700);
System.out.println(Thread.currentThread().getName() + "距離上一次讀0.5秒后,讀緩存="+cache.get("name")+",時間=" + new Date());
啟動-設置緩存,時間=Thu May 17 18:32:38 CST 2018
緩存是否存在=張三
2秒后,緩存是否存在=null
Thread-0...begin,時間=Thu May 17 18:32:40 CST 2018
Thread-1...begin,時間=Thu May 17 18:32:40 CST 2018
Thread-2...begin,時間=Thu May 17 18:32:40 CST 2018
Thread-2==load start==,時間=Thu May 17 18:32:40 CST 2018
Thread-2==load end==同步耗時2秒重載數據-key=name,value=load-6,時間=Thu May 17 18:32:42 CST 2018
Thread-0並發讀緩存=load-6,時間=Thu May 17 18:32:42 CST 2018
Thread-1並發讀緩存=load-6,時間=Thu May 17 18:32:42 CST 2018
Thread-2並發讀緩存=load-6,時間=Thu May 17 18:32:42 CST 2018
main==load start==,時間=Thu May 17 18:32:45 CST 2018
main==load end==同步耗時2秒重載數據-key=name,value=load-7,時間=Thu May 17 18:32:47 CST 2018----47秒時寫
main休眠5秒后,讀緩存=load-7,時間=Thu May 17 18:32:47 CST 2018
main距離上一次讀0.5秒后,讀緩存=load-7,時間=Thu May 17 18:32:48 CST 2018---48秒讀
main距離上一次讀0.5秒后,讀緩存=load-7,時間=Thu May 17 18:32:49 CST 2018--49秒距離上一次寫47秒,間距大於2秒,但是沒有觸發load() ,因為48秒時又讀了一次,刷新了緩存有效期
3.refreshAfterWrite:當 創建 或 寫 之后的 有效期到達時,數據會被自動刷新(注意不是刪除是刷新)。
啟動-設置緩存,時間=Thu May 17 18:39:59 CST 2018--》59秒寫 緩存是否存在=張三 main==reload ==異步重載-key=name,時間=Thu May 17 18:40:01 CST 2018--》01秒,2秒后距離上次寫超過1秒,reload異步重載 2秒后,緩存是否存在=張三--》距離上一次寫過了2秒,但是會立即返回緩存 Thread-0...begin,時間=Thu May 17 18:40:01 CST 2018--》01秒3個線程並發訪問 Thread-1...begin,時間=Thu May 17 18:40:01 CST 2018 Thread-2...begin,時間=Thu May 17 18:40:01 CST 2018 Thread-2並發讀緩存=張三,時間=Thu May 17 18:40:01 CST 2018--》01秒3個線程都立即得到了緩存 Thread-0並發讀緩存=張三,時間=Thu May 17 18:40:01 CST 2018 Thread-1並發讀緩存=張三,時間=Thu May 17 18:40:01 CST 2018 pool-1-thread-1==reload-callable-result=reload-5,時間=Thu May 17 18:40:03 CST 2018--》01秒時的異步,2秒后也就是03秒時,查詢結果:reload-5 main==reload ==異步重載-key=name,時間=Thu May 17 18:40:06 CST 2018--》06秒時,距離上一次寫時間超過1秒,reload異步重載 main休眠5秒后,讀緩存=reload-5,時間=Thu May 17 18:40:06 CST 2018--》06秒時,reload異步重載,立即返回舊值reload-5 main距離上一次讀0.5秒后,讀緩存=reload-5,時間=Thu May 17 18:40:07 CST 2018 main距離上一次讀0.5秒后,讀緩存=reload-5,時間=Thu May 17 18:40:07 CST 2018 pool-1-thread-2==reload-callable-result=reload-4,時間=Thu May 17 18:40:08 CST 2018--》06秒時的異步重載,2秒后也就是08秒,查詢結果:reload-4
三、源碼剖析
前面一節簡單演示了google cache的幾種用法,本節細看源碼。
3.1 簡介
我們就從構造器CacheBuilder的源碼注釋,來看一下google cache的簡單介紹:
//LoadingCache加載緩存和緩存實例是以下的特性的組合:
1 A builder of LoadingCache and Cache instances having any combination of the following features: 2 automatic loading of entries into the cache-》把數據實體自動載入到緩存中去-》基本特性 3 least-recently-used eviction when a maximum size is exceeded-》當緩存到達最大數量時回收最少使用的數據-》限制最大內存,避免內存被占滿-》高級特性,贊👍 4 time-based expiration of entries, measured since last access or last write-》基於時間的實體有效期,依據最后訪問或寫時間-》基本特性,但很細膩 5 keys automatically wrapped in weak references-》緩存的keys自動用,弱引用封裝-》利於GC回收-》贊👍 6 values automatically wrapped in weak or soft references-》緩存的值values自動封裝在弱引用或者軟引用中-》贊👍 7 notification of evicted (or otherwise removed) entries-》回收或被移除實體可收到通知-》贊👍 8 accumulation of cache access statistics--》緩存的訪問統計-》贊👍
9 These features are all optional; caches can be created using all or none of them. By default cache instances created by CacheBuilder will not perform any type of eviction. 10 Usage example://使用樣例: 11 12 LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder() 13 .maximumSize(10000) 14 .expireAfterWrite(10, TimeUnit.MINUTES) 15 .removalListener(MY_LISTENER) 16 .build( 17 new CacheLoader<Key, Graph>() { 18 public Graph load(Key key) throws AnyException { 19 return createExpensiveGraph(key); 20 } 21 }); 22 Or equivalently,//等同於 23 24 // In real life this would come from a command-line flag or config file 支持字符串載入數據 25 String spec = "maximumSize=10000,expireAfterWrite=10m"; 26 27 LoadingCache<Key, Graph> graphs = CacheBuilder.from(spec) 28 .removalListener(MY_LISTENER) 29 .build( 30 new CacheLoader<Key, Graph>() { 31 public Graph load(Key key) throws AnyException { 32 return createExpensiveGraph(key); 33 } 34 });
//這個緩存被實現成一個類似ConcurrentHashMap高性能的哈希表。是線程安全的,但是其它線程並發修改了這個緩存,會顯示在迭代器訪問中,但是不會報ConcurrentModificationException錯。 35 The returned cache is implemented as a hash table with similar performance characteristics to ConcurrentHashMap. It implements all optional operations of the LoadingCache and Cache interfaces.
The asMap view (and its collection views) have weakly consistent iterators. This means that they are safe for concurrent use, but if other threads modify the cache after the iterator is created,
it is undefined which of these changes, if any, are reflected in that iterator. These iterators never throw ConcurrentModificationException.
//默認使用equals方法(內容相同)判斷key/value的相等,但如果申明了弱引用key 或者 弱/軟引用的value,那么必須使用==判斷相等(內存地址相同) 36 Note: by default, the returned cache uses equality comparisons (the equals method) to determine equality for keys or values. However, if weakKeys() was specified, the cache uses identity (==) comparisons instead for keys. Likewise, if weakValues() or softValues() was specified, the cache uses identity comparisons for values. 37
//很多種情況會導致緩存的數據被剔除
Entries are automatically evicted from the cache when any of maximumSize, maximumWeight, expireAfterWrite, expireAfterAccess, weakKeys, weakValues, or softValues are requested.
38 If maximumSize or maximumWeight is requested entries may be evicted on each cache modification.
//寫后失效或者訪問后失效,實體可能在每個緩存修改時被剔除,Cache.size()可能會被統計到,但肯定是無法訪問到。 39 If expireAfterWrite or expireAfterAccess is requested entries may be evicted on each cache modification, on occasional cache accesses, or on calls to Cache.cleanUp(). Expired entries may be counted by Cache.size(), but will never be visible to read or write operations. 40 If weakKeys, weakValues, or softValues are requested, it is possible for a key or value present in the cache to be reclaimed by the garbage collector. Entries with reclaimed keys or values may be removed from the cache on each cache modification, on occasional cache accesses, or on calls to Cache.cleanUp(); such entries may be counted in Cache.size(), but will never be visible to read or write operations. 41
//這里不用管了...
Certain cache configurations will result in the accrual of periodic maintenance tasks which will be performed during write operations, or during occasional read operations in the absence of writes. The Cache.cleanUp() method of the returned cache will also perform maintenance, but calling it should not be necessary with a high throughput cache. Only caches built with removalListener, expireAfterWrite, expireAfterAccess, weakKeys, weakValues, or softValues perform periodic maintenance. 42 The caches produced by CacheBuilder are serializable, and the deserialized caches retain all the configuration properties of the original cache. Note that the serialized form does not include cache contents, but only configuration.
如上圖所示,我們知道兩點:
1.特性
- 把數據實體自動載入到緩存中去-》基本特性
- 當緩存到達最大數量時回收最少使用的數據-》限制最大內存,避免內存被占滿-》高級特性,贊👍
- 基於時間的實體有效期,依據最后訪問或寫時間-》基本特性,但很細膩
- 緩存的keys自動用,弱引用封裝-》利於GC回收-》贊👍
- 回收或被移除實體可收到通知-》贊👍
- 緩存的訪問統計-》贊👍
2.數據結構
類似ConcurrentHashMap高性能的哈希表。是線程安全的,
3.2 源碼剖析
從上節簡介中我們可以找到幾個需要深度剖析的點:
- 數據結構
- 構造器
- 數據過期重載
- 緩存回收機制
1.數據結構
先看一下google cache 核心類如下:
-
CacheBuilder:類,緩存構建器。構建緩存的入口,指定緩存配置參數並初始化本地緩存。
CacheBuilder在build方法中,會把前面設置的參數,全部傳遞給LocalCache,它自己實際不參與任何計算。采用構造器模式(Builder)使得初始化參數的方法值得借鑒,代碼簡潔易讀。
-
CacheLoader:抽象類。用於從數據源加載數據,定義load、reload、loadAll等操作。
-
Cache:接口,定義get、put、invalidate等操作,這里只有緩存增刪改的操作,沒有數據加載的操作。
-
LoadingCache:接口,繼承自Cache。定義get、getUnchecked、getAll等操作,這些操作都會從數據源load數據。
-
LocalCache:類。整個guava cache的核心類,包含了guava cache的數據結構以及基本的緩存的操作方法。
-
LocalManualCache:LocalCache內部靜態類,實現Cache接口。其內部的增刪改緩存操作全部調用成員變量localCache(LocalCache類型)的相應方法。
-
LocalLoadingCache:LocalCache內部靜態類,繼承自LocalManualCache類,實現LoadingCache接口。其所有操作也是調用成員變量localCache(LocalCache類型)的相應方法。
先來看一張LocalCache的數據結構圖:
如上圖所示:LocalCache類似ConcurrentHashMap采用了分段策略,通過減小鎖的粒度來提高並發,LocalCache中數據存儲在Segment[]中,每個segment又包含5個隊列和一個table,這個table是自定義的一種類數組的結構,每個元素都包含一個ReferenceEntry<k,v>鏈表,指向next entry。
這些隊列,前2個是key、value引用隊列用以加速GC回收,后3個隊列記錄用戶的寫記錄、訪問記錄、高頻訪問順序隊列用以實現LRU算法。AtomicReferenceArray是JUC包下的Doug Lea老李頭設計的類:一組對象引用,其中元素支持原子性更新。
最后是ReferenceEntry:引用數據存儲接口,默認強引用,類圖如下:
2.CacheBuilder構造器
1 private static final LoadingCache<String, String> cache = CacheBuilder.newBuilder() 2 .maximumSize(1000) 3 .refreshAfterWrite(1, TimeUnit.SECONDS) 4 //.expireAfterWrite(1, TimeUnit.SECONDS) 5 //.expireAfterAccess(1,TimeUnit.SECONDS) 6 .build(new CacheLoader<String, String>() { 7 @Override 8 public String load(String key) throws Exception { 9 System.out.println(Thread.currentThread().getName() +"==load start=="+",時間=" + new Date()); 10 // 模擬同步重載耗時2秒 11 Thread.sleep(2000); 12 String value = "load-" + new Random().nextInt(10); 13 System.out.println( 14 Thread.currentThread().getName() + "==load end==同步耗時2秒重載數據-key=" + key + ",value="+value+",時間=" + new Date()); 15 return value; 16 } 17 18 @Override 19 public ListenableFuture<String> reload(final String key, final String oldValue) 20 throws Exception { 21 System.out.println( 22 Thread.currentThread().getName() + "==reload ==異步重載-key=" + key + ",時間=" + new Date()); 23 return service.submit(new Callable<String>() { 24 @Override 25 public String call() throws Exception { 26 /* 模擬異步重載耗時2秒 */ 27 Thread.sleep(2000); 28 String value = "reload-" + new Random().nextInt(10); 29 System.out.println(Thread.currentThread().getName() + "==reload-callable-result="+value+ ",時間=" + new Date()); 30 return value; 31 } 32 }); 33 } 34 });
如上圖所示:CacheBuilder參數設置完畢后最后調用build(CacheLoader )構造,參數是用戶自定義的CacheLoader緩存加載器,復寫一些方法(load,reload),返回LoadingCache接口(一種面向接口編程的思想,實際返回具體實現類)如下圖:
1 public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build( 2 CacheLoader<? super K1, V1> loader) { 3 checkWeightWithWeigher(); 4 return new LocalCache.LocalLoadingCache<K1, V1>(this, loader); 5 }
實際是構造了一個LoadingCache接口的實現類:LocalCache的靜態類LocalLoadingCache,本地加載緩存類。
1 LocalLoadingCache( 2 CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) { 3 super(new LocalCache<K, V>(builder, checkNotNull(loader)));//LocalLoadingCache構造函數需要一個LocalCache作為參數 4 } 5 //構造LocalCache 6 LocalCache( 7 CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) { 8 concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);//默認並發水平是4 9 10 keyStrength = builder.getKeyStrength();//key的強引用 11 valueStrength = builder.getValueStrength(); 12 13 keyEquivalence = builder.getKeyEquivalence();//key比較器 14 valueEquivalence = builder.getValueEquivalence(); 15 16 maxWeight = builder.getMaximumWeight(); 17 weigher = builder.getWeigher(); 18 expireAfterAccessNanos = builder.getExpireAfterAccessNanos();//讀寫后有效期,超時重載 19 expireAfterWriteNanos = builder.getExpireAfterWriteNanos();//寫后有效期,超時重載 20 refreshNanos = builder.getRefreshNanos(); 21 22 removalListener = builder.getRemovalListener();//緩存觸發失效 或者 GC回收軟/弱引用,觸發監聽器 23 removalNotificationQueue =//移除通知隊列 24 (removalListener == NullListener.INSTANCE) 25 ? LocalCache.<RemovalNotification<K, V>>discardingQueue() 26 : new ConcurrentLinkedQueue<RemovalNotification<K, V>>(); 27 28 ticker = builder.getTicker(recordsTime()); 29 entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries()); 30 globalStatsCounter = builder.getStatsCounterSupplier().get(); 31 defaultLoader = loader;//緩存加載器 32 33 int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY); 34 if (evictsBySize() && !customWeigher()) { 35 initialCapacity = Math.min(initialCapacity, (int) maxWeight); 36 }
3.數據過期重載
數據過期不會自動重載,而是通過get操作時執行過期重載。具體就是上面追蹤到了CacheBuilder構造的LocalLoadingCache,類圖如下:
返回LocalCache.LocalLoadingCache后
就可以調用如下方法:
1 static class LocalLoadingCache<K, V> extends LocalManualCache<K, V> 2 implements LoadingCache<K, V> { 3 4 LocalLoadingCache( 5 CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) { 6 super(new LocalCache<K, V>(builder, checkNotNull(loader))); 7 } 8 9 // LoadingCache methods 10 11 @Override 12 public V get(K key) throws ExecutionException { 13 return localCache.getOrLoad(key); 14 } 15 16 @Override 17 public V getUnchecked(K key) { 18 try { 19 return get(key); 20 } catch (ExecutionException e) { 21 throw new UncheckedExecutionException(e.getCause()); 22 } 23 } 24 25 @Override 26 public ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException { 27 return localCache.getAll(keys); 28 } 29 30 @Override 31 public void refresh(K key) { 32 localCache.refresh(key); 33 } 34 35 @Override 36 public final V apply(K key) { 37 return getUnchecked(key); 38 } 39 40 // Serialization Support 41 42 private static final long serialVersionUID = 1; 43 44 @Override 45 Object writeReplace() { 46 return new LoadingSerializationProxy<K, V>(localCache); 47 } 48 }
最終get方法
@Override public V get(K key) throws ExecutionException { return localCache.getOrLoad(key); } V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException { int hash = hash(checkNotNull(key));--》計算key的哈希值 return segmentFor(hash).get(key, hash, loader);--》先根據哈希值找到segment,再get返回value } Segment<K, V> segmentFor(int hash) { // TODO(fry): Lazily create segments? return segments[(hash >>> segmentShift) & segmentMask]; } V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { checkNotNull(key); checkNotNull(loader); try { if (count != 0) { // 讀volatile 當前段的元素個數,如果存在元素 // don't call getLiveEntry, which would ignore loading values ReferenceEntry<K, V> e = getEntry(key, hash); if (e != null) { long now = map.ticker.read(); V value = getLiveValue(e, now); if (value != null) { recordRead(e, now);//記錄訪問時間,並添加進最近使用(LRU)隊列 statsCounter.recordHits(1);//命中緩存,基數+1 return scheduleRefresh(e, key, hash, value, now, loader);//刷新值並返回 } ValueReference<K, V> valueReference = e.getValueReference(); if (valueReference.isLoading()) {//如果正在重載數據,等待重載完畢后返回值 return waitForLoadingValue(e, key, valueReference); } } } // 當前segment中找不到實體 return lockedGetOrLoad(key, hash, loader); } catch (ExecutionException ee) { Throwable cause = ee.getCause(); if (cause instanceof Error) { throw new ExecutionError((Error) cause); } else if (cause instanceof RuntimeException) { throw new UncheckedExecutionException(cause); } throw ee; } finally { postReadCleanup(); } }
刷新:
1 V scheduleRefresh( 2 ReferenceEntry<K, V> entry, 3 K key, 4 int hash, 5 V oldValue, 6 long now, 7 CacheLoader<? super K, V> loader) { 8 if (map.refreshes() 9 && (now - entry.getWriteTime() > map.refreshNanos) 10 && !entry.getValueReference().isLoading()) { 11 V newValue = refresh(key, hash, loader, true);//重載數據 12 if (newValue != null) {//重載數據成功,直接返回 13 return newValue; 14 } 15 }//否則返回舊值 16 return oldValue; 17 }
刷新核心方法:
1 V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) { 2 final LoadingValueReference<K, V> loadingValueReference = 3 insertLoadingValueReference(key, hash, checkTime); 4 if (loadingValueReference == null) { 5 return null; 6 } 7 //異步重載數據 8 ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader); 9 if (result.isDone()) { 10 try { 11 return Uninterruptibles.getUninterruptibly(result); 12 } catch (Throwable t) { 13 // don't let refresh exceptions propagate; error was already logged 14 } 15 } 16 return null; 17 } 18 19 ListenableFuture<V> loadAsync( 20 final K key, 21 final int hash, 22 final LoadingValueReference<K, V> loadingValueReference, 23 CacheLoader<? super K, V> loader) { 24 final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader); 25 loadingFuture.addListener( 26 new Runnable() { 27 @Override 28 public void run() { 29 try { 30 getAndRecordStats(key, hash, loadingValueReference, loadingFuture); 31 } catch (Throwable t) { 32 logger.log(Level.WARNING, "Exception thrown during refresh", t); 33 loadingValueReference.setException(t); 34 } 35 } 36 }, 37 directExecutor()); 38 return loadingFuture; 39 } 40 41 public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) { 42 try { 43 stopwatch.start(); 44 V previousValue = oldValue.get(); 45 if (previousValue == null) { 46 V newValue = loader.load(key); 47 return set(newValue) ? futureValue : Futures.immediateFuture(newValue); 48 } 49 ListenableFuture<V> newValue = loader.reload(key, previousValue); 50 if (newValue == null) { 51 return Futures.immediateFuture(null); 52 } 53 // To avoid a race, make sure the refreshed value is set into loadingValueReference 54 // *before* returning newValue from the cache query. 55 return transform( 56 newValue, 57 new com.google.common.base.Function<V, V>() { 58 @Override 59 public V apply(V newValue) { 60 LoadingValueReference.this.set(newValue); 61 return newValue; 62 } 63 }, 64 directExecutor()); 65 } catch (Throwable t) { 66 ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t); 67 if (t instanceof InterruptedException) { 68 Thread.currentThread().interrupt(); 69 } 70 return result; 71 } 72 }
如上圖,最終刷新調用的是CacheBuilder中預先設置好的CacheLoader接口實現類的reload方法實現的異步刷新。
返回get主方法,如果當前segment中找不到key對應的實體,同步阻塞重載數據:
1 V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { 2 ReferenceEntry<K, V> e; 3 ValueReference<K, V> valueReference = null; 4 LoadingValueReference<K, V> loadingValueReference = null; 5 boolean createNewEntry = true; 6 7 lock(); 8 try { 9 // re-read ticker once inside the lock 10 long now = map.ticker.read(); 11 preWriteCleanup(now); 12 13 int newCount = this.count - 1; 14 AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; 15 int index = hash & (table.length() - 1); 16 ReferenceEntry<K, V> first = table.get(index); 17 18 for (e = first; e != null; e = e.getNext()) { 19 K entryKey = e.getKey(); 20 if (e.getHash() == hash 21 && entryKey != null 22 && map.keyEquivalence.equivalent(key, entryKey)) { 23 valueReference = e.getValueReference(); 24 if (valueReference.isLoading()) {//如果正在重載,那么不需要重新再新建實體對象 25 createNewEntry = false; 26 } else { 27 V value = valueReference.get(); 28 if (value == null) {//如果被GC回收,添加進移除隊列,等待remove監聽器執行 29 enqueueNotification( 30 entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED); 31 } else if (map.isExpired(e, now)) {//如果緩存過期,添加進移除隊列,等待remove監聽器執行 32 // This is a duplicate check, as preWriteCleanup already purged expired 33 // entries, but let's accomodate an incorrect expiration queue. 34 enqueueNotification( 35 entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED); 36 } else {//不在重載,直接返回value 37 recordLockedRead(e, now); 38 statsCounter.recordHits(1); 39 // we were concurrent with loading; don't consider refresh 40 return value; 41 } 42 43 // immediately reuse invalid entries 44 writeQueue.remove(e); 45 accessQueue.remove(e); 46 this.count = newCount; // write-volatile 47 } 48 break; 49 } 50 } 51 //需要新建實體對象 52 if (createNewEntry) { 53 loadingValueReference = new LoadingValueReference<K, V>(); 54 55 if (e == null) { 56 e = newEntry(key, hash, first); 57 e.setValueReference(loadingValueReference); 58 table.set(index, e);//把新的ReferenceEntry<K, V>引用實體對象添加進table 59 } else { 60 e.setValueReference(loadingValueReference); 61 } 62 } 63 } finally { 64 unlock(); 65 postWriteCleanup(); 66 } 67 //需要新建實體對象 68 if (createNewEntry) { 69 try { 70 // Synchronizes on the entry to allow failing fast when a recursive load is 71 // detected. This may be circumvented when an entry is copied, but will fail fast most 72 // of the time. 73 synchronized (e) {//同步重載數據 74 return loadSync(key, hash, loadingValueReference, loader); 75 } 76 } finally { 77 statsCounter.recordMisses(1); 78 } 79 } else { 80 // 重載中,說明實體已存在,等待重載完畢 81 return waitForLoadingValue(e, key, valueReference); 82 } 83 }
4.緩存回收機制
1)基於容量回收:CacheBuilder.maximumSize(long)
2)定時回收:
expireAfterAccess(long, TimeUnit):緩存項在給定時間內沒有被讀/寫訪問,則回收。
expireAfterWrite(long, TimeUnit):緩存項在給定時間內沒有被寫(創建或覆蓋),則回收。
3)基於引用回收:
CacheBuilder.weakKeys():使用弱引用存儲鍵。當鍵沒有其它(強或軟)引用時,緩存項可以被垃圾回收。因為垃圾回收僅依賴恆等式(==),使用弱引用鍵的緩存用==而不是equals比較鍵。
CacheBuilder.weakValues():使用弱引用存儲值。當值沒有其它(強或軟)引用時,緩存項可以被垃圾回收。因為垃圾回收僅依賴恆等式(==),使用弱引用值的緩存用==而不是equals比較值。
CacheBuilder.softValues():使用軟引用存儲值。軟引用只有在響應內存需要時,才按照全局最近最少使用的順序回收。考慮到使用軟引用的性能影響,我們通常建議使用更有性能預測性的緩存大小限定(見上文,基於容量回收)。使用軟引用值的緩存同樣用==而不是equals比較值。
4)顯式清除:任何時候,你都可以顯式地清除緩存項,而不是等到它被回收,具體如下
- 個別清除:Cache.invalidate(key)
- 批量清除:Cache.invalidateAll(keys)
- 清除所有緩存項:Cache.invalidateAll()
四、總結
優點:
- 采用鎖分段技術,鎖粒度減小,加大並發。
- API優雅,簡單可用,支持多種回收方式。
- 自帶統計功能。
缺點:
- 受內存大小限制不能存儲太多數據
- 單JVM有效,非分布式緩存。多台服務可能有不同效果。