cache
時至今日,大家對緩存想必不在陌生。我們身邊各種系統中或多或少的都存在緩存,自從有個緩存,我們可以減少很多計算壓力,提高應用程序的QPS。
你將某些需要大量計算或查詢的結果,設置過期時間后放入緩存。下次需要使用的時候,先去緩存處查詢是否存在緩存,沒有就直接計算/查詢,並將結果塞入緩存中。
Object result = cache.get(CACHE_KEY); if(result == null){ //重新獲取緩存 result = xxxx(xxx); cache.put(CACHE_KEY,CACHE_TTL,result); } return result;
Bingo~~,一切都在掌握之中,程序如此完美,可以支撐更大的訪問壓力了。
不過,這樣的獲取緩存的邏輯,真的沒有問題嗎?
高並發下暴露問題
你的程序一直正常運行,直到某一日,運營的同事急匆匆的跑來找到你,你的程序掛了,可能是XXX在大量抓你的數據。我們重啟了應用也沒用,沒幾秒程序又掛了。
機智的你通過簡單的排查,得出數據庫頂不住訪問壓力,順利的將鍋甩走。 不過仔細一想,我們不是有緩存嗎,怎么緩存沒起作用? 查看下緩存,一切正常,也沒發現什么問題啊?
進過各種debug、查日志、測試環境模擬,花了整整一下午,你終於找到罪魁禍首,原因很簡單,正是我們沒有使用正確的姿勢使用緩存~~~
問題分析
這里我們排除熔斷、限流等外部措施,單純討論緩存問題。
假設你的應用需要訪問某個資源(數據庫/服務),其能支撐的最大QPS為100。為了提高應用QPS,我們加入緩存,並將緩存過期時間設置為X秒。此時,有個200並發的請求訪問我們系統中某一路徑,這些請求對應的都是同一個緩存KEY,但是這個鍵已經過期了。此時,則會瞬間產生200個線程訪問下游資源,下游資源便有可能瞬間就奔潰了~~~
我們有什么更好的方法獲取緩存嗎?當然有,這里通過guava cache來看下google是怎么處理獲取緩存的。
guava 和 guava cache
guava是一個google發布的一個開源java工具庫,其中guava cacha提供了一個輕量級的本地緩存實現機制,通過guava cache,我們可以輕松實現本地緩存。其中,guava cacha對緩存不存在或者過期情況下,獲取緩存值得過程稱之為Loading。
直接上代碼,看看guava cache是如何get一個緩存的。
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { ... try { if(this.count != 0) { LocalCache.ReferenceEntry ee = this.getEntry(key, hash); if(ee != null) { long cause1 = this.map.ticker.read(); Object value = this.getLiveValue(ee, cause1); if(value != null) { this.recordRead(ee, cause1); this.statsCounter.recordHits(1); Object valueReference1 = this.scheduleRefresh(ee, key, hash, value, cause1, loader); return valueReference1; } LocalCache.ValueReference valueReference = ee.getValueReference(); if(valueReference.isLoading()) { Object var9 = this.waitForLoadingValue(ee, key, valueReference); return var9; } } } Object ee1 = this.lockedGetOrLoad(key, hash, loader); return ee1; } catch (ExecutionException var13) { ... } finally { ... } }
可見,核心邏輯主要在scheduleRefresh(...)和lockedGetOrLoad(...)中。
先看和lockedGetOrLoad,
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { LocalCache.ValueReference valueReference = null; LocalCache.LoadingValueReference loadingValueReference = null; boolean createNewEntry = true; //先加鎖 this.lock(); LocalCache.ReferenceEntry e; try { long now = this.map.ticker.read(); this.preWriteCleanup(now); int newCount = this.count - 1; AtomicReferenceArray table = this.table; int index = hash & table.length() - 1; LocalCache.ReferenceEntry first = (LocalCache.ReferenceEntry)table.get(index); for(e = first; e != null; e = e.getNext()) { Object entryKey = e.getKey(); if(e.getHash() == hash && entryKey != null && this.map.keyEquivalence.equivalent(key, entryKey)) { valueReference = e.getValueReference(); //判斷是否有其他線程正在執行loading動作 if(valueReference.isLoading()) { createNewEntry = false; } else { Object value = valueReference.get(); if(value == null) { this.enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED); } else { //有值且沒有過期,直接返回 if(!this.map.isExpired(e, now)) { this.recordLockedRead(e, now); this.statsCounter.recordHits(1); Object var16 = value; return var16; } this.enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED); } this.writeQueue.remove(e); this.accessQueue.remove(e); this.count = newCount; } break; } } //創建一個LoadingValueReference if(createNewEntry) { loadingValueReference = new LocalCache.LoadingValueReference(); if(e == null) { e = this.newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } } finally { ... } if(createNewEntry) { Object var9; try { //沒有其他線程在loading情況下,同步Loading獲取值 synchronized(e) { var9 = this.loadSync(key, hash, loadingValueReference, loader); } } finally { this.statsCounter.recordMisses(1); } return var9; } else { //等待其他線程返回值 return this.waitForLoadingValue(e, key, valueReference); } }
可見正常情況下,guava會單線程處理回源動作,其他並發的線程等待處理線程Loading完成后直接返回其結果。這樣也就避免了多線程同時對同一資源並發Loading的情況發生。
不過,這樣雖然只有一個線程去執行loading動作,但是其他線程會等待loading線程接受后才能一同返回接口。此時,guava cache通過刷新策略,直接返回舊的緩存值,並生成一個線程去處理loading,處理完成后更新緩存值和過期時間。guava 稱之為異步模式。
V scheduleRefresh(LocalCache.ReferenceEntry<K, V> entry, K key, int hash, V oldValue, long now, CacheLoader<? super K, V> loader) { if(this.map.refreshes() && now - entry.getWriteTime() > this.map.refreshNanos && !entry.getValueReference().isLoading()) { Object newValue = this.refresh(key, hash, loader, true); if(newValue != null) { return newValue; } } return oldValue; }
Refreshing is not quite the same as eviction. As specified in LoadingCache.refresh(K), refreshing a key loads a new value for the key, possibly asynchronously. The old value (if any) is still returned while the key is being refreshed, in contrast to eviction, which forces retrievals to wait until the value is loaded anew.
此外guava還提供了同步模式,相對於異步模式,唯一的區別是有一個請求線程去執行loading,其他線程返回過期值。(目前spirng cache中,還未支持guava cahce的同步刷新)
@Beta @GwtIncompatible("To be supported (synchronously).") public CacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) { Preconditions.checkNotNull(unit); Preconditions.checkState(this.refreshNanos == -1L, "refresh was already set to %s ns", new Object[]{Long.valueOf(this.refreshNanos)}); Preconditions.checkArgument(duration > 0L, "duration must be positive: %s %s", new Object[]{Long.valueOf(duration), unit}); this.refreshNanos = unit.toNanos(duration); return this; }
總結
看似簡單的獲取緩存值的業務邏輯沒想到還暗藏玄機。當然,這里guava cache只是本地緩存,如果依葫蘆畫瓢用在redis等分布式緩存時,勢必還要考慮更多的地方。
最后,如果喜歡本文,請點贊~~~~