概要
Caffeine是一個高性能,高命中率,低內存占用,near optimal 的本地緩存,簡單來說它是 Guava Cache 的優化加強版,有些文章把 Caffeine 稱為“新一代的緩存”、“現代緩存之王”。
本文將重點講解 Caffeine 的高性能設計,以及對應部分的源碼分析。
與 Guava Cache 比較
大家都知道,Spring5 即將放棄掉 Guava Cache 作為緩存機制,而改用 Caffeine 作為新的本地 Cache 的組件,這對於 Caffeine 來說是一個很大的肯定。為什么 Spring 會這樣做呢?其實在 Caffeine 的Benchmarks里給出了好靚仔的數據,對讀和寫的場景,還有跟其他幾個緩存工具進行了比較,Caffeine 的性能都表現很突出。
使用 Caffeine
Caffeine 為了方便大家使用以及從 Guava Cache 切換過來(很有針對性啊~),借鑒了 Guava Cache 大部分的概念(諸如核心概念Cache
、LoadingCache
、CacheLoader
、CacheBuilder
等等),對於 Caffeine 的理解只要把它當作 Guava Cache 就可以了。
使用上,大家只要把 Caffeine 的包引進來,然后換一下 cache 的實現類,基本應該就沒問題了。這對與已經使用過 Guava Cache 的同學來說沒有任何難度,甚至還有一點熟悉的味道,如果你之前沒有使用過 Guava Cache,可以查看 Caffeine 的官方 API 說明文檔,其中Population
,Eviction
,Removal
,Refresh
,Statistics
,Cleanup
,Policy
等等這些特性都是跟 Guava Cache 基本一樣的。
下面給出一個例子說明怎樣創建一個 Cache:
private static LoadingCache<String, String> cache = Caffeine.newBuilder()
//最大個數限制
.maximumSize(256L)
//初始化容量
.initialCapacity(1)
//訪問后過期(包括讀和寫)
.expireAfterAccess(2, TimeUnit.DAYS)
//寫后過期
.expireAfterWrite(2, TimeUnit.HOURS)
//寫后自動異步刷新
.refreshAfterWrite(1, TimeUnit.HOURS)
//記錄下緩存的一些統計數據,例如命中率等
.recordStats()
//cache對緩存寫的通知回調
.writer(new CacheWriter<Object, Object>() {
@Override
public void write(@NonNull Object key, @NonNull Object value) {
log.info("key={}, CacheWriter write", key);
}
@Override
public void delete(@NonNull Object key, @Nullable Object value, @NonNull RemovalCause cause) {
log.info("key={}, cause={}, CacheWriter delete", key, cause);
}
})
//使用CacheLoader創建一個LoadingCache
.build(new CacheLoader<String, String>() {
//同步加載數據
@Nullable
@Override
public String load(@NonNull String key) throws Exception {
return "value_" + key;
}
//異步加載數據
@Nullable
@Override
public String reload(@NonNull String key, @NonNull String oldValue) throws Exception {
return "value_" + key;
}
});
Caffeine 的高性能設計
判斷一個緩存的好壞最核心的指標就是命中率,影響緩存命中率有很多因素,包括業務場景、淘汰策略、清理策略、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個很重要的指標。下面
我們來看看 Caffeine 在這幾個方面是怎么着手的,如何做優化的。
(注:本文不會分析 Caffeine 全部源碼,只會對核心設計的實現進行分析,但我建議讀者把 Caffeine 的源碼都涉獵一下,有個 overview 才能更好理解本文。如果你看過 Guava Cache 的源碼也行,代碼的數據結構和處理邏輯很類似的。
源碼基於:caffeine-2.8.0.jar)
W-TinyLFU 整體設計
上面說到淘汰策略是影響緩存命中率的因素之一,一般比較簡單的緩存就會直接用到 LFU(Least Frequently Used,即最不經常使用) 或者LRU(Least Recently Used,即最近最少使用) ,而 Caffeine 就是使用了 W-TinyLFU 算法。
W-TinyLFU 看名字就能大概猜出來,它是 LFU 的變種,也是一種緩存淘汰算法。那為什么要使用 W-TinyLFU 呢?
LRU 和 LFU 的缺點
- LRU 實現簡單,在一般情況下能夠表現出很好的命中率,是一個“性價比”很高的算法,平時也很常用。雖然 LRU 對突發性的稀疏流量(sparse bursts)表現很好,但同時也會產生緩存污染,舉例來說,如果偶然性的要對全量數據進行遍歷,那么“歷史訪問記錄”就會被刷走,造成污染。
- 如果數據的分布在一段時間內是固定的話,那么 LFU 可以達到最高的命中率。但是 LFU 有兩個缺點,第一,它需要給每個記錄項維護頻率信息,每次訪問都需要更新,這是個巨大的開銷;第二,對突發性的稀疏流量無力,因為前期經常訪問的記錄已經占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被訪問的記錄在將來也不一定會使用上,這樣就一直把“坑”占着了。
無論 LRU 還是 LFU 都有其各自的缺點,不過,現在已經有很多針對其缺點而改良、優化出來的變種算法。
TinyLFU
TinyLFU 就是其中一個優化算法,它是專門為了解決 LFU 上述提到的兩個問題而被設計出來的。
解決第一個問題是采用了 Count–Min Sketch 算法。
解決第二個問題是讓記錄盡量保持相對的“新鮮”(Freshness Mechanism),並且當有新的記錄插入時,可以讓它跟老的記錄進行“PK”,輸者就會被淘汰,這樣一些老的、不再需要的記錄就會被剔除。
下圖是 TinyLFU 設計圖(來自官方)
統計頻率 Count–Min Sketch 算法
如何對一個 key 進行統計,但又可以節省空間呢?(不是簡單的使用HashMap
,這太消耗內存了),注意哦,不需要精確的統計,只需要一個近似值就可以了,怎么樣,這樣場景是不是很熟悉,如果你是老司機,或許已經聯想到布隆過濾器(Bloom Filter)的應用了。
沒錯,將要介紹的 Count–Min Sketch 的原理跟 Bloom Filter 一樣,只不過 Bloom Filter 只有 0 和 1 的值,那么你可以把 Count–Min Sketch 看作是“數值”版的 Bloom Filter。
更多關於 Count–Min Sketch 的介紹請自行搜索。
在 TinyLFU 中,近似頻率的統計如下圖所示:
對一個 key 進行多次 hash 函數后,index 到多個數組位置后進行累加,查詢時取多個值中的最小值即可。
Caffeine 對這個算法的實現在FrequencySketch
類。但 Caffeine 對此有進一步的優化,例如 Count–Min Sketch 使用了二維數組,Caffeine 只是用了一個一維的數組;再者,如果是數值類型的話,這個數需要用 int 或 long 來存儲,但是 Caffeine 認為緩存的訪問頻率不需要用到那么大,只需要 15 就足夠,一般認為達到 15 次的頻率算是很高的了,而且 Caffeine 還有另外一個機制來使得這個頻率進行衰退減半(下面就會講到)。如果最大是 15 的話,那么只需要 4 個 bit 就可以滿足了,一個 long 有 64bit,可以存儲 16 個這樣的統計數,Caffeine 就是這樣的設計,使得存儲效率提高了 16 倍。
Caffeine 對緩存的讀寫(afterRead
和afterWrite
方法)都會調用onAccess
s 方法,而onAccess
方法里有一句:
frequencySketch().increment(key);
這句就是追加記錄的頻率,下面我們看看具體實現
//FrequencySketch的一些屬性
//種子數
static final long[] SEED = { // A mixture of seeds from FNV-1a, CityHash, and Murmur3
0xc3a5c85c97cb3127L, 0xb492b66fbe98f273L, 0x9ae16a3b2f90404fL, 0xcbf29ce484222325L};
static final long RESET_MASK = 0x7777777777777777L;
static final long ONE_MASK = 0x1111111111111111L;
int sampleSize;
//為了快速根據hash值得到table的index值的掩碼
//table的長度size一般為2的n次方,而tableMask為size-1,這樣就可以通過&操作來模擬取余操作,速度快很多,老司機都知道
int tableMask;
//存儲數據的一維long數組
long[] table;
int size;
/**
* Increments the popularity of the element if it does not exceed the maximum (15). The popularity
* of all elements will be periodically down sampled when the observed events exceeds a threshold.
* This process provides a frequency aging to allow expired long term entries to fade away.
*
* @param e the element to add
*/
public void increment(@NonNull E e) {
if (isNotInitialized()) {
return;
}
//根據key的hashCode通過一個哈希函數得到一個hash值
//本來就是hashCode了,為什么還要再做一次hash?怕原來的hashCode不夠均勻分散,再打散一下。
int hash = spread(e.hashCode());
//這句光看有點難理解
//就如我剛才說的,Caffeine把一個long的64bit划分成16個等分,每一等分4個bit。
//這個start就是用來定位到是哪一個等分的,用hash值低兩位作為隨機數,再左移2位,得到一個小於16的值
int start = (hash & 3) << 2;
//indexOf方法的意思就是,根據hash值和不同種子得到table的下標index
//這里通過四個不同的種子,得到四個不同的下標index
int index0 = indexOf(hash, 0);
int index1 = indexOf(hash, 1);
int index2 = indexOf(hash, 2);
int index3 = indexOf(hash, 3);
//根據index和start(+1, +2, +3)的值,把table[index]對應的等分追加1
//這個incrementAt方法有點難理解,看我下面的解釋
boolean added = incrementAt(index0, start);
added |= incrementAt(index1, start + 1);
added |= incrementAt(index2, start + 2);
added |= incrementAt(index3, start + 3);
//這個reset等下說
if (added && (++size == sampleSize)) {
reset();
}
}
/**
* Increments the specified counter by 1 if it is not already at the maximum value (15).
*
* @param i the table index (16 counters)
* @param j the counter to increment
* @return if incremented
*/
boolean incrementAt(int i, int j) {
//這個j表示16個等分的下標,那么offset就是相當於在64位中的下標(這個自己想想)
int offset = j << 2;
//上面提到Caffeine把頻率統計最大定為15,即0xfL
//mask就是在64位中的掩碼,即1111后面跟很多個0
long mask = (0xfL << offset);
//如果&的結果不等於15,那么就追加1。等於15就不會再加了
if ((table[i] & mask) != mask) {
table[i] += (1L << offset);
return true;
}
return false;
}
/**
* Returns the table index for the counter at the specified depth.
*
* @param item the element's hash
* @param i the counter depth
* @return the table index
*/
int indexOf(int item, int i) {
long hash = SEED[i] * item;
hash += hash >>> 32;
return ((int) hash) & tableMask;
}
/**
* Applies a supplemental hash function to a given hashCode, which defends against poor quality
* hash functions.
*/
int spread(int x) {
x = ((x >>> 16) ^ x) * 0x45d9f3b;
x = ((x >>> 16) ^ x) * 0x45d9f3b;
return (x >>> 16) ^ x;
}
知道了追加方法,那么讀取方法frequency
就很容易理解了。
/**
* Returns the estimated number of occurrences of an element, up to the maximum (15).
*
* @param e the element to count occurrences of
* @return the estimated number of occurrences of the element; possibly zero but never negative
*/
@NonNegative
public int frequency(@NonNull E e) {
if (isNotInitialized()) {
return 0;
}
//得到hash值,跟上面一樣
int hash = spread(e.hashCode());
//得到等分的下標,跟上面一樣
int start = (hash & 3) << 2;
int frequency = Integer.MAX_VALUE;
//循環四次,分別獲取在table數組中不同的下標位置
for (int i = 0; i < 4; i++) {
int index = indexOf(hash, i);
//這個操作就不多說了,其實跟上面incrementAt是一樣的,定位到table[index] + 等分的位置,再根據mask取出計數值
int count = (int) ((table[index] >>> ((start + i) << 2)) & 0xfL);
//取四個中的較小值
frequency = Math.min(frequency, count);
}
return frequency;
}
通過代碼和注釋或者讀者可能難以理解,下圖是我畫出來幫助大家理解的結構圖。
注意紫色虛線框,其中藍色小格就是需要計算的位置:
保新機制
為了讓緩存保持“新鮮”,剔除掉過往頻率很高但之后不經常的緩存,Caffeine 有一個 Freshness Mechanism。做法很簡答,就是當整體的統計計數(當前所有記錄的頻率統計之和,這個數值內部維護)達到某一個值時,那么所有記錄的頻率統計除以 2。
從上面的代碼
//size變量就是所有記錄的頻率統計之,即每個記錄加1,這個size都會加1
//sampleSize一個閾值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍
if (added && (++size == sampleSize)) {
reset();
}
看到reset
方法就是做這個事情
/** Reduces every counter by half of its original value. */
void reset() {
int count = 0;
for (int i = 0; i < table.length; i++) {
count += Long.bitCount(table[i] & ONE_MASK);
table[i] = (table[i] >>> 1) & RESET_MASK;
}
size = (size >>> 1) - (count >>> 2);
}
關於這個 reset 方法,為什么是除以 2,而不是其他,及其正確性,在最下面的參考資料的 TinyLFU 論文中 3.3 章節給出了數學證明,大家有興趣可以看看。
增加一個 Window?
Caffeine 通過測試發現 TinyLFU 在面對突發性的稀疏流量(sparse bursts)時表現很差,因為新的記錄(new items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。
於是 Caffeine 設計出一種新的 policy,即 Window Tiny LFU(W-TinyLFU),並通過實驗和實踐發現 W-TinyLFU 比 TinyLFU 表現的更好。
W-TinyLFU 的設計如下所示(兩圖等價):
它主要包括兩個緩存模塊,主緩存是 SLRU(Segmented LRU,即分段 LRU),SLRU 包括一個名為 protected 和一個名為 probation 的緩存區。通過增加一個緩存區(即 Window Cache),當有新的記錄插入時,會先在 window 區呆一下,就可以避免上述說的 sparse bursts 問題。在公眾號頂級架構師回復“架構整潔”,獲取驚喜禮包。
淘汰策略(eviction policy)
當 window 區滿了,就會根據 LRU 把 candidate(即淘汰出來的元素)放到 probation 區,如果 probation 區也滿了,就把 candidate 和 probation 將要淘汰的元素 victim,兩個進行“PK”,勝者留在 probation,輸者就要被淘汰了。
而且經過實驗發現當 window 區配置為總容量的 1%,剩余的 99%當中的 80%分給 protected 區,20%分給 probation 區時,這時整體性能和命中率表現得最好,所以 Caffeine 默認的比例設置就是這個。
不過這個比例 Caffeine 會在運行時根據統計數據(statistics)去動態調整,如果你的應用程序的緩存隨着時間變化比較快的話,那么增加 window 區的比例可以提高命中率,相反緩存都是比較固定不變的話,增加 Main Cache 區(protected 區 +probation 區)的比例會有較好的效果。
下面我們看看上面說到的淘汰策略是怎么實現的:
一般緩存對讀寫操作后都有后續的一系列“維護”操作,Caffeine 也不例外,這些操作都在maintenance
方法,我們將要說到的淘汰策略也在里面。
這方法比較重要,下面也會提到,所以這里只先說跟“淘汰策略”有關的evictEntries
和climb
。
/**
* Performs the pending maintenance work and sets the state flags during processing to avoid
* excess scheduling attempts. The read buffer, write buffer, and reference queues are
* drained, followed by expiration, and size-based eviction.
*
* @param task an additional pending task to run, or {@code null} if not present
*/
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
lazySetDrainStatus(PROCESSING_TO_IDLE);
try {
drainReadBuffer();
drainWriteBuffer();
if (task != null) {
task.run();
}
drainKeyReferences();
drainValueReferences();
expireEntries();
//把符合條件的記錄淘汰掉
evictEntries();
//動態調整window區和protected區的大小
climb();
} finally {
if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
lazySetDrainStatus(REQUIRED);
}
}
}
先說一下 Caffeine 對上面說到的 W-TinyLFU 策略的實現用到的數據結構:
//最大的個數限制
long maximum;
//當前的個數
long weightedSize;
//window區的最大限制
long windowMaximum;
//window區當前的個數
long windowWeightedSize;
//protected區的最大限制
long mainProtectedMaximum;
//protected區當前的個數
long mainProtectedWeightedSize;
//下一次需要調整的大小(還需要進一步計算)
double stepSize;
//window區需要調整的大小
long adjustment;
//命中計數
int hitsInSample;
//不命中的計數
int missesInSample;
//上一次的緩存命中率
double previousSampleHitRate;
final FrequencySketch<K> sketch;
//window區的LRU queue(FIFO)
final AccessOrderDeque<Node<K, V>> accessOrderWindowDeque;
//probation區的LRU queue(FIFO)
final AccessOrderDeque<Node<K, V>> accessOrderProbationDeque;
//protected區的LRU queue(FIFO)
final AccessOrderDeque<Node<K, V>> accessOrderProtectedDeque;
以及默認比例設置(意思看注釋)
/** The initial percent of the maximum weighted capacity dedicated to the main space. */
static final double PERCENT_MAIN = 0.99d;
/** The percent of the maximum weighted capacity dedicated to the main's protected space. */
static final double PERCENT_MAIN_PROTECTED = 0.80d;
/** The difference in hit rates that restarts the climber. */
static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
/** The percent of the total size to adapt the window by. */
static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
/** The rate to decrease the step size to adapt by. */
static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;
/** The maximum number of entries that can be transfered between queues. */
重點來了,evictEntries
和climb
方法:
/** Evicts entries if the cache exceeds the maximum. */
@GuardedBy("evictionLock")
void evictEntries() {
if (!evicts()) {
return;
}
//淘汰window區的記錄
int candidates = evictFromWindow();
//淘汰Main區的記錄
evictFromMain(candidates);
}
/**
* Evicts entries from the window space into the main space while the window size exceeds a
* maximum.
*
* @return the number of candidate entries evicted from the window space
*/
//根據W-TinyLFU,新的數據都會無條件的加到admission window
//但是window是有大小限制,所以要“定期”做一下“維護”
@GuardedBy("evictionLock")
int evictFromWindow() {
int candidates = 0;
//查看window queue的頭部節點
Node<K, V> node = accessOrderWindowDeque().peek();
//如果window區超過了最大的限制,那么就要把“多出來”的記錄做處理
while (windowWeightedSize() > windowMaximum()) {
// The pending operations will adjust the size to reflect the correct weight
if (node == null) {
break;
}
//下一個節點
Node<K, V> next = node.getNextInAccessOrder();
if (node.getWeight() != 0) {
//把node定位在probation區
node.makeMainProbation();
//從window區去掉
accessOrderWindowDeque().remove(node);
//加入到probation queue,相當於把節點移動到probation區(晉升了)
accessOrderProbationDeque().add(node);
candidates++;
//因為移除了一個節點,所以需要調整window的size
setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());
}
//處理下一個節點
node = next;
}
return candidates;
}
evictFromMain
方法:
/**
* Evicts entries from the main space if the cache exceeds the maximum capacity. The main space
* determines whether admitting an entry (coming from the window space) is preferable to retaining
* the eviction policy's victim. This is decision is made using a frequency filter so that the
* least frequently used entry is removed.
*
* The window space candidates were previously placed in the MRU position and the eviction
* policy's victim is at the LRU position. The two ends of the queue are evaluated while an
* eviction is required. The number of remaining candidates is provided and decremented on
* eviction, so that when there are no more candidates the victim is evicted.
*
* @param candidates the number of candidate entries evicted from the window space
*/
//根據W-TinyLFU,從window晉升過來的要跟probation區的進行“PK”,勝者才能留下
@GuardedBy("evictionLock")
void evictFromMain(int candidates) {
int victimQueue = PROBATION;
//victim是probation queue的頭部
Node<K, V> victim = accessOrderProbationDeque().peekFirst();
//candidate是probation queue的尾部,也就是剛從window晉升來的
Node<K, V> candidate = accessOrderProbationDeque().peekLast();
//當cache不夠容量時才做處理
while (weightedSize() > maximum()) {
// Stop trying to evict candidates and always prefer the victim
if (candidates == 0) {
candidate = null;
}
//對candidate為null且victim為bull的處理
if ((candidate == null) && (victim == null)) {
if (victimQueue == PROBATION) {
victim = accessOrderProtectedDeque().peekFirst();
victimQueue = PROTECTED;
continue;
} else if (victimQueue == PROTECTED) {
victim = accessOrderWindowDeque().peekFirst();
victimQueue = WINDOW;
continue;
}
// The pending operations will adjust the size to reflect the correct weight
break;
}
//對節點的weight為0的處理
if ((victim != null) && (victim.getPolicyWeight() == 0)) {
victim = victim.getNextInAccessOrder();
continue;
} else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
candidate = candidate.getPreviousInAccessOrder();
candidates--;
continue;
}
// Evict immediately if only one of the entries is present
if (victim == null) {
@SuppressWarnings("NullAway")
Node<K, V> previous = candidate.getPreviousInAccessOrder();
Node<K, V> evict = candidate;
candidate = previous;
candidates--;
evictEntry(evict, RemovalCause.SIZE, 0L);
continue;
} else if (candidate == null) {
Node<K, V> evict = victim;
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
continue;
}
// Evict immediately if an entry was collected
K victimKey = victim.getKey();
K candidateKey = candidate.getKey();
if (victimKey == null) {
@NonNull Node<K, V> evict = victim;
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.COLLECTED, 0L);
continue;
} else if (candidateKey == null) {
candidates--;
@NonNull Node<K, V> evict = candidate;
candidate = candidate.getPreviousInAccessOrder();
evictEntry(evict, RemovalCause.COLLECTED, 0L);
continue;
}
//放不下的節點直接處理掉
if (candidate.getPolicyWeight() > maximum()) {
candidates--;
Node<K, V> evict = candidate;
candidate = candidate.getPreviousInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
continue;
}
//根據節點的統計頻率frequency來做比較,看看要處理掉victim還是candidate
//admit是具體的比較規則,看下面
candidates--;
//如果candidate勝出則淘汰victim
if (admit(candidateKey, victimKey)) {
Node<K, V> evict = victim;
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
candidate = candidate.getPreviousInAccessOrder();
} else {
//如果是victim勝出,則淘汰candidate
Node<K, V> evict = candidate;
candidate = candidate.getPreviousInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
}
}
}
/**
* Determines if the candidate should be accepted into the main space, as determined by its
* frequency relative to the victim. A small amount of randomness is used to protect against hash
* collision attacks, where the victim's frequency is artificially raised so that no new entries
* are admitted.
*
* @param candidateKey the key for the entry being proposed for long term retention
* @param victimKey the key for the entry chosen by the eviction policy for replacement
* @return if the candidate should be admitted and the victim ejected
*/
@GuardedBy("evictionLock")
boolean admit(K candidateKey, K victimKey) {
//分別獲取victim和candidate的統計頻率
//frequency這個方法的原理和實現上面已經解釋了
int victimFreq = frequencySketch().frequency(victimKey);
int candidateFreq = frequencySketch().frequency(candidateKey);
//誰大誰贏
if (candidateFreq > victimFreq) {
return true;
//如果相等,candidate小於5都當輸了
} else if (candidateFreq <= 5) {
// The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
// exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
// candidate reduces the number of random acceptances to minimize the impact on the hit rate.
return false;
}
//如果相等且candidate大於5,則隨機淘汰一個
int random = ThreadLocalRandom.current().nextInt();
return ((random & 127) == 0);
}
climb
方法主要是用來調整 window size 的,使得 Caffeine 可以適應你的應用類型(如 OLAP 或 OLTP)表現出最佳的命中率。
下圖是官方測試的數據:
我們看看 window size 的調整是怎么實現的。
調整時用到的默認比例數據:
//與上次命中率之差的閾值
static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d;
//步長(調整)的大小(跟最大值maximum的比例)
static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
//步長的衰減比例
static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d;
/** Adapts the eviction policy to towards the optimal recency / frequency configuration. */
//climb方法的主要作用就是動態調整window區的大小(相應的,main區的大小也會發生變化,兩個之和為100%)。
//因為區域的大小發生了變化,那么區域內的數據也可能需要發生相應的移動。
@GuardedBy("evictionLock")
void climb() {
if (!evicts()) {
return;
}
//確定window需要調整的大小
determineAdjustment();
//如果protected區有溢出,把溢出部分移動到probation區。因為下面的操作有可能需要調整到protected區。
demoteFromMainProtected();
long amount = adjustment();
if (amount == 0) {
return;
} else if (amount > 0) {
//增加window的大小
increaseWindow();
} else {
//減少window的大小
decreaseWindow();
}
}
下面分別展開每個方法來解釋:
/** Calculates the amount to adapt the window by and sets {@link #adjustment()} accordingly. */
@GuardedBy("evictionLock")
void determineAdjustment() {
//如果frequencySketch還沒初始化,則返回
if (frequencySketch().isNotInitialized()) {
setPreviousSampleHitRate(0.0);
setMissesInSample(0);
setHitsInSample(0);
return;
}
//總請求量 = 命中 + miss
int requestCount = hitsInSample() + missesInSample();
//沒達到sampleSize則返回
//默認下sampleSize = 10 * maximum。用sampleSize來判斷緩存是否足夠”熱“。
if (requestCount < frequencySketch().sampleSize) {
return;
}
//命中率的公式 = 命中 / 總請求
double hitRate = (double) hitsInSample() / requestCount;
//命中率的差值
double hitRateChange = hitRate - previousSampleHitRate();
//本次調整的大小,是由命中率的差值和上次的stepSize決定的
double amount = (hitRateChange >= 0) ? stepSize() : -stepSize();
//下次的調整大小:如果命中率的之差大於0.05,則重置為0.065 * maximum,否則按照0.98來進行衰減
double nextStepSize = (Math.abs(hitRateChange) >= HILL_CLIMBER_RESTART_THRESHOLD)
? HILL_CLIMBER_STEP_PERCENT * maximum() * (amount >= 0 ? 1 : -1)
: HILL_CLIMBER_STEP_DECAY_RATE * amount;
setPreviousSampleHitRate(hitRate);
setAdjustment((long) amount);
setStepSize(nextStepSize);
setMissesInSample(0);
setHitsInSample(0);
}
/** Transfers the nodes from the protected to the probation region if it exceeds the maximum. */
//這個方法比較簡單,減少protected區溢出的部分
@GuardedBy("evictionLock")
void demoteFromMainProtected() {
long mainProtectedMaximum = mainProtectedMaximum();
long mainProtectedWeightedSize = mainProtectedWeightedSize();
if (mainProtectedWeightedSize <= mainProtectedMaximum) {
return;
}
for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
if (mainProtectedWeightedSize <= mainProtectedMaximum) {
break;
}
Node<K, V> demoted = accessOrderProtectedDeque().poll();
if (demoted == null) {
break;
}
demoted.makeMainProbation();
accessOrderProbationDeque().add(demoted);
mainProtectedWeightedSize -= demoted.getPolicyWeight();
}
setMainProtectedWeightedSize(mainProtectedWeightedSize);
}
/**
* Increases the size of the admission window by shrinking the portion allocated to the main
* space. As the main space is partitioned into probation and protected regions (80% / 20%), for
* simplicity only the protected is reduced. If the regions exceed their maximums, this may cause
* protected items to be demoted to the probation region and probation items to be demoted to the
* admission window.
*/
//增加window區的大小,這個方法比較簡單,思路就像我上面說的
@GuardedBy("evictionLock")
void increaseWindow() {
if (mainProtectedMaximum() == 0) {
return;
}
long quota = Math.min(adjustment(), mainProtectedMaximum());
setMainProtectedMaximum(mainProtectedMaximum() - quota);
setWindowMaximum(windowMaximum() + quota);
demoteFromMainProtected();
for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
Node<K, V> candidate = accessOrderProbationDeque().peek();
boolean probation = true;
if ((candidate == null) || (quota < candidate.getPolicyWeight())) {
candidate = accessOrderProtectedDeque().peek();
probation = false;
}
if (candidate == null) {
break;
}
int weight = candidate.getPolicyWeight();
if (quota < weight) {
break;
}
quota -= weight;
if (probation) {
accessOrderProbationDeque().remove(candidate);
} else {
setMainProtectedWeightedSize(mainProtectedWeightedSize() - weight);
accessOrderProtectedDeque().remove(candidate);
}
setWindowWeightedSize(windowWeightedSize() + weight);
accessOrderWindowDeque().add(candidate);
candidate.makeWindow();
}
setMainProtectedMaximum(mainProtectedMaximum() + quota);
setWindowMaximum(windowMaximum() - quota);
setAdjustment(quota);
}
/** Decreases the size of the admission window and increases the main's protected region. */
//同上increaseWindow差不多,反操作
@GuardedBy("evictionLock")
void decreaseWindow() {
if (windowMaximum() <= 1) {
return;
}
long quota = Math.min(-adjustment(), Math.max(0, windowMaximum() - 1));
setMainProtectedMaximum(mainProtectedMaximum() + quota);
setWindowMaximum(windowMaximum() - quota);
for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) {
Node<K, V> candidate = accessOrderWindowDeque().peek();
if (candidate == null) {
break;
}
int weight = candidate.getPolicyWeight();
if (quota < weight) {
break;
}
quota -= weight;
setMainProtectedWeightedSize(mainProtectedWeightedSize() + weight);
setWindowWeightedSize(windowWeightedSize() - weight);
accessOrderWindowDeque().remove(candidate);
accessOrderProbationDeque().add(candidate);
candidate.makeMainProbation();
}
setMainProtectedMaximum(mainProtectedMaximum() - quota);
setWindowMaximum(windowMaximum() + quota);
setAdjustment(-quota);
}
以上,是 Caffeine 的 W-TinyLFU 策略的設計原理及代碼實現解析。
異步的高性能讀寫
一般的緩存每次對數據處理完之后(讀的話,已經存在則直接返回,不存在則 load 數據,保存,再返回;寫的話,則直接插入或更新),但是因為要維護一些淘汰策略,則需要一些額外的操作,諸如:
- 計算和比較數據的是否過期
- 統計頻率(像 LFU 或其變種)
- 維護 read queue 和 write queue
- 淘汰符合條件的數據
- 等等。。。
這種數據的讀寫伴隨着緩存狀態的變更,Guava Cache 的做法是把這些操作和讀寫操作放在一起,在一個同步加鎖的操作中完成,雖然 Guava Cache 巧妙地利用了 JDK 的 ConcurrentHashMap(分段鎖或者無鎖 CAS)來降低鎖的密度,達到提高並發度的目的。但是,對於一些熱點數據,這種做法還是避免不了頻繁的鎖競爭。Caffeine 借鑒了數據庫系統的 WAL(Write-Ahead Logging)思想,即先寫日志再執行操作,這種思想同樣適合緩存的,執行讀寫操作時,先把操作記錄在緩沖區,然后在合適的時機異步、批量地執行緩沖區中的內容。但在執行緩沖區的內容時,也是需要在緩沖區加上同步鎖的,不然存在並發問題,只不過這樣就可以把對鎖的競爭從緩存數據轉移到對緩沖區上。在公眾號編程技術圈后台回復“Java”,獲取Java面試題和答案驚喜禮包。
ReadBuffer
在 Caffeine 的內部實現中,為了很好的支持不同的 Features(如 Eviction,Removal,Refresh,Statistics,Cleanup,Policy 等等),擴展了很多子類,它們共同的父類是BoundedLocalCache
,而readBuffer
就是作為它們共有的屬性,即都是用一樣的 readBuffer,看定義:
final Buffer<Node<K, V>> readBuffer;
readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
? new BoundedBuffer<>()
: Buffer.disabled();
上面提到 Caffeine 對每次緩存的讀操作都會觸發afterRead
/**
* Performs the post-processing work required after a read.
*
* @param node the entry in the page replacement policy
* @param now the current time, in nanoseconds
* @param recordHit if the hit count should be incremented
*/
void afterRead(Node<K, V> node, long now, boolean recordHit) {
if (recordHit) {
statsCounter().recordHits(1);
}
//把記錄加入到readBuffer
//判斷是否需要立即處理readBuffer
//注意這里無論offer是否成功都可以走下去的,即允許寫入readBuffer丟失,因為這個
boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
if (shouldDrainBuffers(delayable)) {
scheduleDrainBuffers();
}
refreshIfNeeded(node, now);
}
/**
* Returns whether maintenance work is needed.
*
* @param delayable if draining the read buffer can be delayed
*/
//caffeine用了一組狀態來定義和管理“維護”的過程
boolean shouldDrainBuffers(boolean delayable) {
switch (drainStatus()) {
case IDLE:
return !delayable;
case REQUIRED:
return true;
case PROCESSING_TO_IDLE:
case PROCESSING_TO_REQUIRED:
return false;
default:
throw new IllegalStateException();
}
}
重點看BoundedBuffer
/**
* A striped, non-blocking, bounded buffer.
*
* @author ben.manes@gmail.com (Ben Manes)
* @param <E> the type of elements maintained by this buffer
*/
final class BoundedBuffer<E> extends StripedBuffer<E>
它是一個 striped、非阻塞、有界限的 buffer,繼承於StripedBuffer
類。下面看看StripedBuffer
的實現:
/**
* A base class providing the mechanics for supporting dynamic striping of bounded buffers. This
* implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64}
* class, which is used by atomic counters. The approach was modified to lazily grow an array of
* buffers in order to minimize memory usage for caches that are not heavily contended on.
*
* @author dl@cs.oswego.edu (Doug Lea)
* @author ben.manes@gmail.com (Ben Manes)
*/
abstract class StripedBuffer<E> implements Buffer<E>
這個StripedBuffer
設計的思想是跟Striped64
類似的,通過擴展結構把競爭熱點分離。
具體實現是這樣的,StripedBuffer
維護一個Buffer[]
數組,每個元素就是一個RingBuffer
,每個線程用自己threadLocalRandomProbe
屬性作為 hash 值,這樣就相當於每個線程都有自己“專屬”的RingBuffer
,就不會產生競爭啦,而不是用 key 的hashCode
作為 hash 值,因為會產生熱點數據問題。
看看StripedBuffer
的屬性
/** Table of buffers. When non-null, size is a power of 2. */
//RingBuffer數組
transient volatile Buffer<E> @Nullable[] table;
//當進行resize時,需要整個table鎖住。tableBusy作為CAS的標記。
static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy");
static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe");
/** Number of CPUS. */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/** The bound on the table size. */
//table最大size
static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU);
/** The maximum number of attempts when trying to expand the table. */
//如果發生競爭時(CAS失敗)的嘗試次數
static final int ATTEMPTS = 3;
/** Table of buffers. When non-null, size is a power of 2. */
//核心數據結構
transient volatile Buffer<E> @Nullable[] table;
/** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
transient volatile int tableBusy;
/** CASes the tableBusy field from 0 to 1 to acquire lock. */
final boolean casTableBusy() {
return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1);
}
/**
* Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of
* packaging restrictions.
*/
static final int getProbe() {
return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}
offer
方法,當沒初始化或存在競爭時,則擴容為 2 倍。
實際是調用RingBuffer
的 offer 方法,把數據追加到RingBuffer
后面。
@Override
public int offer(E e) {
int mask;
int result = 0;
Buffer<E> buffer;
//是否不存在競爭
boolean uncontended = true;
Buffer<E>[] buffers = table
//是否已經初始化
if ((buffers == null)
|| (mask = buffers.length - 1) < 0
//用thread的隨機值作為hash值,得到對應位置的RingBuffer
|| (buffer = buffers[getProbe() & mask]) == null
//檢查追加到RingBuffer是否成功
|| !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
//其中一個符合條件則進行擴容
expandOrRetry(e, uncontended);
}
return result;
}
/**
* Handles cases of updates involving initialization, resizing, creating new Buffers, and/or
* contention. See above for explanation. This method suffers the usual non-modularity problems of
* optimistic retry code, relying on rechecked sets of reads.
*
* @param e the element to add
* @param wasUncontended false if CAS failed before call
*/
//這個方法比較長,但思路還是相對清晰的。
@SuppressWarnings("PMD.ConfusingTernary")
final void expandOrRetry(E e, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
Buffer<E>[] buffers;
Buffer<E> buffer;
int n;
if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
if ((buffer = buffers[(n - 1) & h]) == null) {
if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
boolean created = false;
try { // Recheck under lock
Buffer<E>[] rs;
int mask, j;
if (((rs = table) != null) && ((mask = rs.length) > 0)
&& (rs[j = (mask - 1) & h] == null)) {
rs[j] = create(e);
created = true;
}
} finally {
tableBusy = 0;
}
if (created) {
break;
}
continue; // Slot is now non-empty
}
collide = false;
} else if (!wasUncontended) { // CAS already known to fail
wasUncontended = true; // Continue after rehash
} else if (buffer.offer(e) != Buffer.FAILED) {
break;
} else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
collide = false; // At max size or stale
} else if (!collide) {
collide = true;
} else if (tableBusy == 0 && casTableBusy()) {
try {
if (table == buffers) { // Expand table unless stale
table = Arrays.copyOf(buffers, n << 1);
}
} finally {
tableBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
} else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
boolean init = false;
try { // Initialize table
if (table == buffers) {
@SuppressWarnings({"unchecked", "rawtypes"})
Buffer<E>[] rs = new Buffer[1];
rs[0] = create(e);
table = rs;
init = true;
}
} finally {
tableBusy = 0;
}
if (init) {
break;
}
}
}
}
最后看看RingBuffer
,注意RingBuffer
是BoundedBuffer
的內部類。
/** The maximum number of elements per buffer. */
static final int BUFFER_SIZE = 16;
// Assume 4-byte references and 64-byte cache line (16 elements per line)
//256長度,但是是以16為單位,所以最多存放16個元素
static final int SPACED_SIZE = BUFFER_SIZE << 4;
static final int SPACED_MASK = SPACED_SIZE - 1;
static final int OFFSET = 16;
//RingBuffer數組
final AtomicReferenceArray<E> buffer;
//插入方法
@Override
public int offer(E e) {
long head = readCounter;
long tail = relaxedWriteCounter();
//用head和tail來限制個數
long size = (tail - head);
if (size >= SPACED_SIZE) {
return Buffer.FULL;
}
//tail追加16
if (casWriteCounter(tail, tail + OFFSET)) {
//用tail“取余”得到下標
int index = (int) (tail & SPACED_MASK);
//用unsafe.putOrderedObject設值
buffer.lazySet(index, e);
return Buffer.SUCCESS;
}
//如果CAS失敗則返回失敗
return Buffer.FAILED;
}
//用consumer來處理buffer的數據
@Override
public void drainTo(Consumer<E> consumer) {
long head = readCounter;
long tail = relaxedWriteCounter();
//判斷數據多少
long size = (tail - head);
if (size == 0) {
return;
}
do {
int index = (int) (head & SPACED_MASK);
E e = buffer.get(index);
if (e == null) {
// not published yet
break;
}
buffer.lazySet(index, null);
consumer.accept(e);
//head也跟tail一樣,每次遞增16
head += OFFSET;
} while (head != tail);
lazySetReadCounter(head);
}
注意,ring buffer 的 size(固定是 16 個)是不變的,變的是 head 和 tail 而已。
總的來說ReadBuffer
有如下特點:
- 使用
Striped-RingBuffer
來提升對 buffer 的讀寫 - 用 thread 的 hash 來避開熱點 key 的競爭
- 允許寫入的丟失
WriteBuffer
writeBuffer
跟readBuffer
不一樣,主要體現在使用場景的不一樣。本來緩存的一般場景是讀多寫少的,讀的並發會更高,且 afterRead 顯得沒那么重要,允許延遲甚至丟失。寫不一樣,寫afterWrite
不允許丟失,且要求盡量馬上執行。Caffeine 使用MPSC(Multiple Producer / Single Consumer)作為 buffer 數組,實現在MpscGrowableArrayQueue
類,它是仿照JCTools
的MpscGrowableArrayQueue
來寫的。
MPSC 允許無鎖的高並發寫入,但只允許一個消費者,同時也犧牲了部分操作。
MPSC 我打算另外分析,這里不展開了。
TimerWheel
除了支持expireAfterAccess
和expireAfterWrite
之外(Guava Cache 也支持這兩個特性),Caffeine 還支持expireAfter
。因為expireAfterAccess
和expireAfterWrite
都只能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據某些條件而不一樣的,這就需要用戶自定義過期時間。
先看看expireAfter
的用法
private static LoadingCache<String, String> cache = Caffeine.newBuilder()
.maximumSize(256L)
.initialCapacity(1)
//.expireAfterAccess(2, TimeUnit.DAYS)
//.expireAfterWrite(2, TimeUnit.HOURS)
.refreshAfterWrite(1, TimeUnit.HOURS)
//自定義過期時間
.expireAfter(new Expiry<String, String>() {
//返回創建后的過期時間
@Override
public long expireAfterCreate(@NonNull String key, @NonNull String value, long currentTime) {
return 0;
}
//返回更新后的過期時間
@Override
public long expireAfterUpdate(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) {
return 0;
}
//返回讀取后的過期時間
@Override
public long expireAfterRead(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) {
return 0;
}
})
.recordStats()
.build(new CacheLoader<String, String>() {
@Nullable
@Override
public String load(@NonNull String key) throws Exception {
return "value_" + key;
}
});
通過自定義過期時間,使得不同的 key 可以動態的得到不同的過期時間。
注意,我把expireAfterAccess
和expireAfterWrite
注釋了,因為這兩個特性不能跟expireAfter
一起使用。
而當使用了expireAfter
特性后,Caffeine 會啟用一種叫“時間輪”的算法來實現這個功能。
好,重點來了,為什么要用時間輪?
對expireAfterAccess
和expireAfterWrite
的實現是用一個AccessOrderDeque
雙端隊列,它是 FIFO 的,因為它們的過期時間是固定的,所以在隊列頭的數據肯定是最早過期的,要處理過期數據時,只需要首先看看頭部是否過期,然后再挨個檢查就可以了。但是,如果過期時間不一樣的話,這需要對accessOrderQueue
進行排序&插入,這個代價太大了。於是,Caffeine 用了一種更加高效、優雅的算法-時間輪。
時間輪的結構:
因為在我的對時間輪分析的文章里已經說了時間輪的原理和機制了,所以我就不展開 Caffeine 對時間輪的實現了。
Caffeine 對時間輪的實現在TimerWheel
,它是一種多層時間輪(hierarchical timing wheels )。
看看元素加入到時間輪的schedule
方法:
/**
* Schedules a timer event for the node.
*
* @param node the entry in the cache
*/
public void schedule(@NonNull Node<K, V> node) {
Node<K, V> sentinel = findBucket(node.getVariableTime());
link(sentinel, node);
}
/**
* Determines the bucket that the timer event should be added to.
*
* @param time the time when the event fires
* @return the sentinel at the head of the bucket
*/
Node<K, V> findBucket(long time) {
long duration = time - nanos;
int length = wheel.length - 1;
for (int i = 0; i < length; i++) {
if (duration < SPANS[i + 1]) {
long ticks = (time >>> SHIFT[i]);
int index = (int) (ticks & (wheel[i].length - 1));
return wheel[i][index];
}
}
return wheel[length][0];
}
/** Adds the entry at the tail of the bucket's list. */
void link(Node<K, V> sentinel, Node<K, V> node) {
node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
node.setNextInVariableOrder(sentinel);
sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
sentinel.setPreviousInVariableOrder(node);
}
其他
Caffeine 還有其他的優化性能的手段,如使用軟引用和弱引用、消除偽共享、CompletableFuture
異步等等。
總結
Caffeien 是一個優秀的本地緩存,通過使用 W-TinyLFU 算法, 高性能的 readBuffer 和 WriteBuffer,時間輪算法等,使得它擁有高性能,高命中率(near optimal),低內存占用等特點。