一、什么樣的數據適合緩存

二、緩存穿透
緩存穿透是指查詢一個一定不存在的數據,由於緩存是不命中時需要從數據庫查詢,查不到數據則不寫入緩存,這將導致這個不存在的數據每次請求都要到數據庫去查詢,造成緩存穿透。在流量大時,可能DB就掛掉了,要是有人利用不存在的key頻繁攻擊我們的應用,這就是漏洞。
解決方案:
1)有很多種方法可以有效地解決緩存穿透問題,最常見的則是采用布隆過濾器,將所有可能存在的數據哈希到一個足夠大的bitmap中,一個一定不存在的數據會被這個bitmap攔截掉,從而避免了對底層數據庫的查詢壓力。
2)另外也有一個更為簡單粗暴的方法,如果一個查詢返回的數據為空(不管是數據不存在,還是系統故障),仍然把這個空結果進行緩存,但它的過期時間會很短,最長不超過五分鍾。
三、緩存雪崩:
緩存雪崩是指在設置緩存時采用了相同的過期時間,導致緩存在某一時刻同時失效,導致所有的查詢都落在數據庫上,造成了緩存雪崩。
解決方案:
1)在緩存失效后,通過加鎖或者隊列來控制讀數據庫寫緩存的線程數量。比如對某個key只允許一個線程查詢數據和寫緩存,其他線程等待。
2)可以通過緩存reload機制,預先去更新緩存,在即將發生大並發訪問前手動觸發加載緩存。
3)不同的key,設置不同的過期時間,讓緩存失效的時間點盡量均勻。
4)做二級緩存,或者雙緩存策略。A1為原始緩存,A2為拷貝緩存,A1失效時,可以訪問A2,A1緩存失效時間設置為短期,A2設置為長期。
四、緩存擊穿
對於一些設置了過期時間的key,如果這些key可能會在某些時間點被超高並發地訪問,是一種非常“熱點”的數據。這個時候,需要考慮一個問題:緩存被“擊穿”的問題,這個和緩存雪崩的區別在於這里針對某一key緩存,前者則是很多key。
緩存在某個時間點過期的時候,恰好在這個時間點對這個Key有大量的並發請求過來,這些請求發現緩存過期一般都會從后端DB加載數據並回設到緩存,這個時候大並發的請求可能會瞬間把后端DB壓垮。
解決方案:
1)后台刷新
后台定義一個job(定時任務)專門主動更新緩存數據.比如,一個緩存中的數據過期時間是30分鍾,那么job每隔29分鍾定時刷新數據(將從數據庫中查到的數據更新到緩存中).
注:這種方案比較容易理解,但會增加系統復雜度。比較適合那些 key 相對固定,cache 粒度較大的業務,key 比較分散的則不太適合,實現起來也比較復雜。
2)檢查更新
將緩存key的過期時間(絕對時間)一起保存到緩存中(可以拼接,可以添加新字段,可以采用單獨的key保存..不管用什么方式,只要兩者建立好關聯關系就行).在每次執行get操作后,都將get出來的緩存過期時間與當前系統時間做一個對比,如果緩存過期時間-當前系統時間<=1分鍾(自定義的一個值),則主動更新緩存.這樣就能保證緩存中的數據始終是最新的(和方案一一樣,讓數據不過期.)
注:這種方案在特殊情況下也會有問題。假設緩存過期時間是12:00,而 11:59 到 12:00這 1 分鍾時間里恰好沒有 get 請求過來,又恰好請求都在 11:30 分的時 候高並發過來,那就悲劇了。這種情況比較極端,但並不是沒有可能。因為“高 並發”也可能是階段性在某個時間點爆發。
3)分級緩存
采用 L1 (一級緩存)和 L2(二級緩存) 緩存方式,L1 緩存失效時間短,L2 緩存失效時間長。 請求優先從 L1 緩存獲取數據,如果 L1緩存未命中則加鎖,只有 1 個線程獲取到鎖,這個線程再從數據庫中讀取數據並將數據再更新到到 L1 緩存和 L2 緩存中,而其他線程依舊從 L2 緩存獲取數據並返回。
注:這種方式,主要是通過避免緩存同時失效並結合鎖機制實現。所以,當數據更 新時,只能淘汰 L1 緩存,不能同時將 L1 和 L2 中的緩存同時淘汰。L2 緩存中 可能會存在臟數據,需要業務能夠容忍這種短時間的不一致。而且,這種方案 可能會造成額外的緩存空間浪費。
4)加鎖
方法1:
// 方法1:
public synchronized List<String> getData01() {
List<String> result = new ArrayList<String>();
// 從緩存讀取數據
result = getDataFromCache();
if (result.isEmpty()) {
// 從數據庫查詢數據
result = getDataFromDB();
// 將查詢到的數據寫入緩存
setDataToCache(result);
}
return result;
}
注:這種方式確實能夠防止緩存失效時高並發到數據庫,但是緩存沒有失效的時候,在從緩存中拿數據時需要排隊取鎖,這必然會大大的降低了系統的吞吐量.
方法2:
// 方法2:
static Object lock = new Object();
public List<String> getData02() {
List<String> result = new ArrayList<String>();
// 從緩存讀取數據
result = getDataFromCache();
if (result.isEmpty()) {
synchronized (lock) {
// 從數據庫查詢數據
result = getDataFromDB();
// 將查詢到的數據寫入緩存
setDataToCache(result);
}
}
return result;
}
注:這個方法在緩存命中的時候,系統的吞吐量不會受影響,但是當緩存失效時,請求還是會打到數據庫,只不過不是高並發而是阻塞而已.但是,這樣會造成用戶體驗不佳,並且還給數據庫帶來額外壓力.
方法3:
//方法3
public List<String> getData03() {
List<String> result = new ArrayList<String>();
// 從緩存讀取數據
result = getDataFromCache();
if (result.isEmpty()) {
synchronized (lock) {
//雙重判斷,第二個以及之后的請求不必去找數據庫,直接命中緩存
// 查詢緩存
result = getDataFromCache();
if (result.isEmpty()) {
// 從數據庫查詢數據
result = getDataFromDB();
// 將查詢到的數據寫入緩存
setDataToCache(result);
}
}
}
return result;
}
注:雙重判斷雖然能夠阻止高並發請求打到數據庫,但是第二個以及之后的請求在命中緩存時,還是排隊進行的.比如,當30個請求一起並發過來,在雙重判斷時,第一個請求去數據庫查詢並更新緩存數據,剩下的29個請求則是依次排隊取緩存中取數據.請求排在后面的用戶的體驗會不爽.
方法4:
static Lock reenLock = new ReentrantLock();
public List<String> getData04() throws InterruptedException {
List<String> result = new ArrayList<String>();
// 從緩存讀取數據
result = getDataFromCache();
if (result.isEmpty()) {
if (reenLock.tryLock()) {
try {
System.out.println("我拿到鎖了,從DB獲取數據庫后寫入緩存");
// 從數據庫查詢數據
result = getDataFromDB();
// 將查詢到的數據寫入緩存
setDataToCache(result);
} finally {
reenLock.unlock();// 釋放鎖
}
} else {
result = getDataFromCache();// 先查一下緩存
if (result.isEmpty()) {
System.out.println("我沒拿到鎖,緩存也沒數據,先小憩一下");
Thread.sleep(100);// 小憩一會兒
return getData04();// 重試
}
}
}
return result;
}
注:最后使用互斥鎖的方式來實現,可以有效避免前面幾種問題.
當然,在實際分布式場景中,我們還可以使用 redis、tair、zookeeper 等提供的分布式鎖來實現.但是,如果我們的並發量如果只有幾千的話,何必殺雞焉用牛刀呢?
