一、思考和猜想
首先看一下三種基於時間的清理或刷新緩存數據的方式:
expireAfterAccess: 當緩存項在指定的時間段內沒有被讀或寫就會被回收。
expireAfterWrite:當緩存項在指定的時間段內沒有更新就會被回收。
refreshAfterWrite:當緩存項上一次更新操作之后的多久會被刷新。
考慮到時效性,我們可以使用expireAfterWrite,使每次更新之后的指定時間讓緩存失效,然后重新加載緩存。guava cache會嚴格限制只有1個加載操作,這樣會很好地防止緩存失效的瞬間大量請求穿透到后端引起雪崩效應。
然而,通過分析源碼,guava cache在限制只有1個加載操作時進行加鎖,其他請求必須阻塞等待這個加載操作完成;而且,在加載完成之后,其他請求的線程會逐一獲得鎖,去判斷是否已被加載完成,每個線程必須輪流地走一個“獲得鎖,獲得值,釋放鎖”的過程,這樣性能會有一些損耗。這里由於我們計划本地緩存1秒,所以頻繁的過期和加載,鎖等待等過程會讓性能有較大的損耗。
因此我們考慮使用refreshAfterWrite。refreshAfterWrite的特點是,在refresh的過程中,嚴格限制只有1個重新加載操作,而其他查詢先返回舊值,這樣可以有效地減少等待和鎖爭用,所以refreshAfterWrite會比expireAfterWrite性能好。但是它也有一個缺點,因為到達指定時間后,它不能嚴格保證所有的查詢都獲取到新值。了解過guava cache的定時失效(或刷新)原來的同學都知道,guava cache並沒使用額外的線程去做定時清理和加載的功能,而是依賴於查詢請求。在查詢的時候去比對上次更新的時間,如超過指定時間則進行加載或刷新。所以,如果使用refreshAfterWrite,在吞吐量很低的情況下,如很長一段時間內沒有查詢之后,發生的查詢有可能會得到一個舊值(這個舊值可能來自於很長時間之前),這將會引發問題。
可以看出refreshAfterWrite和expireAfterWrite兩種方式各有優缺點,各有使用場景。那么能否在refreshAfterWrite和expireAfterWrite找到一個折中?比如說控制緩存每1s進行refresh,如果超過2s沒有訪問,那么則讓緩存失效,下次訪問時不會得到舊值,而是必須得待新值加載。由於guava官方文檔沒有給出一個詳細的解釋,查閱一些網上資料也沒有得到答案,因此只能對源碼進行分析,尋找答案。經過分析,當同時使用兩者的時候,可以達到預想的效果,這真是一個好消息吶!
二、源碼分析
通過追蹤LoadingCache的get方法源碼,發現最終會調用以下核心方法,下面貼出源碼:
com.google.common.cache.LocalCache.Segment.get方法:
這個緩沖的get方法,編號1是判斷是否有存活值,即根據expireAfterAccess和expireAfterWrite進行判斷是否過期,如果過期,則value為null,執行編號3。編號2指不過期的情況下,根據refreshAfterWrite判斷是否需要refresh。而編號3是需要進行加載(load而非reload),原因是沒有存活值,可能因為過期,可能根本就沒有過該值。從段代碼來看,在get的時候,是先判斷過期,再判斷refresh,所以我們可以通過設置refreshAfterWrite為1s,將expireAfterWrite 設為2s,當訪問頻繁的時候,會在每秒都進行refresh,而當超過2s沒有訪問,下一次訪問必須load新值。
我們繼續順藤摸瓜,順帶看看load和refresh分別都做了什么事情,驗證以下上面說的理論。
下面看看 com.google.common.cache.LocalCache.Segment.lockedGetOrLoad方法:
這個方法有點長,限於篇幅,沒有貼出全部代碼,關鍵步驟有7步:
1、獲得鎖;
2、獲得key對應的valueReference;
3、判斷是否該緩存值正在loading,如果loading,則不再進行load操作(通過設置createNewEntry為false),后續會等待獲取新值;
4、如果不是在loading,判斷是否已經有新值了(被其他請求load完了),如果是則返回新值;
5、准備loading,設置為loadingValueReference。loadingValueReference 會使其他請求在步驟3的時候會發現正在loding;
6、釋放鎖;
7、如果真的需要load,則進行load操作。
通過分析發現,只會有1個load操作,其他get會先阻塞住,驗證了之前的理論。
下面看看com.google.common.cache.LocalCache.Segment.scheduleRefresh方法:
1、判斷是否需要refresh,且當前非loading狀態,如果是則進行refresh操作,並返回新值。
2、步驟2是我加上去的,為后面的測試做准備。如果需要refresh,但是有其他線程正在對該值進行refreshing,則打印,最終會返回舊值。
繼續深入步驟1中調用的refresh方法:
1、插入loadingValueReference,表示該值正在loading,其他請求根據此判斷是需要進行refresh還是返回舊值。insertLoadingValueReference里有加鎖操作,確保只有1個refresh穿透到后端。限於篇幅,這里不再展開。但是,這里加鎖的范圍比load時候加鎖的范圍要小,在expire->load的過程,所有的get一旦知道expire,則需要獲得鎖,直到得到新值為止,阻塞的影響范圍會是從expire到load到新值為止;而refresh->reload的過程,一旦get發現需要refresh,會先判斷是否有loading,再去獲得鎖,然后釋放鎖之后再去reload,阻塞的范圍只是insertLoadingValueReference的一個小對象的new和set操作,幾乎可以忽略不計,所以這是之前說refresh比expire高效的原因之一。
2、進行refresh操作,這里不對loadAsync進行展開,它調用了CacheLoader的reload方法,reload方法支持重載去實現異步的加載,而當前線程返回舊值,這樣性能會更好,其默認是同步地調用了CacheLoader的load方法實現。
到這里,我們知道了refresh和expire的區別了吧!refresh執行reload,而expire后會重新執行load,和初始化時一樣。
三、測試和驗證
在上面貼出的源碼,大家應該注意到一些System.out.println語句,這些是我加上去的,便於后續進行測試驗證。現在就來對剛剛的分析進行程序驗證。
貼出測試的源碼:
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache ; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; public class ConcurrentTest { private static final int CONCURRENT_NUM = 10;//並發數 private volatile static int value = 1; private static LoadingCache <String, String> cache = CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(5, TimeUnit. SECONDS) .refreshAfterWrite(1, TimeUnit. SECONDS) .build(newCacheLoader<String, String>() { public String load(String key) throws InterruptedException { System. out.println( "load by " + Thread.currentThread().getName()); return createValue(key); } @Override public Listenable Future<String> reload(String key, String oldValue) throwsException { System. out.println( "reload by " + Thread.currentThread().getName()); return Futures.immediateFuture(createValue(key )); } } ); //創建value private static String createValue(String key) throws InterruptedException{ Thread. sleep(1000L);//讓當前線程sleep 1秒,是為了測試load和reload時候的並發特性 return String.valueOf(value++); } public static void main(String[] args) throws InterruptedException, ExecutionException { CyclicBarrier barrier = newCyclicBarrier(CONCURRENT_NUM ); CountDownLatch latch = newCountDownLatch(CONCURRENT_NUM ); for(inti = 0; i < CONCURRENT_NUM; i++) { finalClientRunnable runnable = newClientRunnable(barrier, latch ); Thread thread = newThread( runnable, "client-"+ i); thread.start(); } //測試一段時間不訪問后是否執行expire而不是refresh latch.await(); Thread.sleep(5100L); System.out.println( "\n超過expire時間未讀之后..."); System.out.println(Thread. currentThread().getName() + ",val:"+ cache .get("key")); } static class Client Runnable implementsRunnable{ CyclicBarrier barrier; CountDownLatch latch; public Client Runnable(CyclicBarrier barrier, CountDownLatch latch){ this.barrier = barrier; this.latch = latch; } public void run() { try{ barrier.await(); Thread.sleep((long)(Math.random()*4000));//每個client隨機睡眠,為了充分測試refresh和load System.out.println(Thread. currentThread().getName() + ",val:"+ cache .get("key")); latch.countDown(); }catch(Exception e) { e.printStackTrace(); } } } }
執行結果:
驗證結果和預期一致:
1、在緩存還沒初始化的時候,client-1最新獲得了load鎖,進行load操作,在進行load的期間,其他client也到達進入load過程,阻塞,等待client-1釋放鎖,再依次獲得鎖。最終只load by client-1。
2、當超過了refreshAfterWrite設定的時間之內沒有訪問,需要進行refresh,client-5進行 refresh,在這個過程中,其他client並沒有獲得鎖,而是直接查詢舊值,直到refresh后才得到新值,過渡平滑。
3、在超過了expireAfterWrite設定的時間內沒有訪問,main線程在訪問的時候,值已經過期,需要進行load操作,而不會得到舊值。
轉載於:https://blog.csdn.net/abc86319253/article/details/53020432