基本寫法
為了方便演示,這里使用Runtime.Cache做緩存容器,並定義個簡單操作類。如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
public
class
CacheHelper
{
public
static
object
Get(
string
cacheKey)
{
return
HttpRuntime.Cache[cacheKey];
}
public
static
void
Add(
string
cacheKey,
object
obj,
int
cacheMinute)
{
HttpRuntime.Cache.Insert(cacheKey, obj,
null
, DateTime.Now.AddMinutes(cacheMinute),
Cache.NoSlidingExpiration, CacheItemPriority.Normal,
null
);
}
}
|
簡單讀取:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public
object
GetMemberSigninDays1()
{
const
int
cacheTime = 5;
const
string
cacheKey =
"mushroomsir"
;
var
cacheValue = CacheHelper.Get(cacheKey);
if
(cacheValue !=
null
)
return
cacheValue;
cacheValue =
"395"
;
//這里一般是 sql查詢數據。 例:395 簽到天數
CacheHelper.Add(cacheKey, cacheValue, cacheTime);
return
cacheValue;
}
|
在項目中,有不少這樣寫法,這樣寫並沒有錯,但在並發量上來后就容易出問題。
緩存雪崩
緩存雪崩是由於緩存失效(過期),新緩存未到期間。
這個中間時間內,所有請求都去查詢數據庫,而對數據庫CPU和內存造成巨大壓力,前端連接數不夠、查詢阻塞。
這個中間時間並沒有那么短,比如sql查詢1秒,加上傳輸解析0.5秒。 就是說1.5秒內所有用戶查詢,都是直接查詢數據庫的。
碰到這種情況,使用最多的解決方案就是加鎖排隊。
全局鎖,實例鎖
public static object obj1 = new object(); public object GetMemberSigninDays2() { const int cacheTime = 5; const string cacheKey = "mushroomsir"; var cacheValue = CacheHelper.Get(cacheKey); if (cacheValue != null) return cacheValue; //lock (obj1) //全局鎖 //{ // cacheValue = CacheHelper.Get(cacheKey); // if (cacheValue != null) // return cacheValue; // cacheValue = "395"; //這里一般是 sql查詢數據。 例:395 簽到天數 // CacheHelper.Add(cacheKey, cacheValue, cacheTime); //} lock (this) { cacheValue = CacheHelper.Get(cacheKey); if (cacheValue != null) return cacheValue; cacheValue = "395"; //這里一般是 sql查詢數據。 例:395 簽到天數 CacheHelper.Add(cacheKey, cacheValue, cacheTime); } return cacheValue; }
第一種:lock (obj1) 是全局鎖可以滿足,但要為每個函數都聲明一個obj,不然在A、B函數都鎖obj1時,必然會讓其中一個阻塞。
第二種:lock (this) 這個鎖當前實例,對其他實例無效,那這個鎖就沒什么效果了,當然使用單例模式的對象可以鎖。
在當前實例中:A函數鎖當前實例,其他也鎖當前實例的函數的讀寫,也被阻塞,這種做法也不可取。
字符串鎖
既然鎖對象不行,利用字符串的特性,直接鎖緩存的key呢
public object GetMemberSigninDays3() { const int cacheTime = 5; const string cacheKey = "mushroomsir"; var cacheValue = CacheHelper.Get(cacheKey); if (cacheValue != null) return cacheValue; const string lockKey = cacheKey + "n(*≧▽≦*)n"; //lock (cacheKey) //{ // cacheValue = CacheHelper.Get(cacheKey); // if (cacheValue != null) // return cacheValue; // cacheValue = "395"; //這里一般是 sql查詢數據。 例:395 簽到天數 // CacheHelper.Add(cacheKey, cacheValue, cacheTime); //} lock (lockKey) { cacheValue = CacheHelper.Get(cacheKey); if (cacheValue != null) return cacheValue; cacheValue = "395"; //這里一般是 sql查詢數據。 例:395 簽到天數 CacheHelper.Add(cacheKey, cacheValue, cacheTime); } return cacheValue; }
第一種:lock (cacheName) 有問題,因為字符串也是共享的,會阻塞其他使用這個字符串的操作行為。
具體請參考之前的博文 c#語言-多線程中的鎖系統(一)。
因為字符串被公共語言運行庫 (CLR)暫留,這意味着整個程序中任何給定字符串都只有一個實例,所以才會用下面第二種方法。
第二種:lock (lockKey) 可以滿足。其目的就是為了保證鎖的粒度最小並且全局唯一性,只鎖當前緩存的查詢行為。
緩存穿透
先舉個簡單例子:一般網站經常會緩存用戶搜索的結果,如果數據庫查詢不到,是不會做緩存的。但如果頻繁查這個空關鍵字,會導致每次請求都直接查詢數據庫了。
例子就是緩存穿透,請求繞過緩存直接查數據庫,這也是經常提的緩存命中率問題。
public object GetMemberSigninDays4() { const int cacheTime = 5; const string cacheKey = "mushroomsir"; var cacheValue = CacheHelper.Get(cacheKey); if (cacheValue != null) return cacheValue; const string lockKey = cacheKey + "n(*≧▽≦*)n"; lock (lockKey) { cacheValue = CacheHelper.Get(cacheKey); if (cacheValue != null) return cacheValue; cacheValue = null; //數據庫查詢不到,為空。 //if (cacheValue2 == null) //{ // return null; //一般為空,不做緩存 //} if (cacheValue == null) { cacheValue = string.Empty; //如果發現為空,我設置個默認值,也緩存起來。 } CacheHelper.Add(cacheKey, cacheValue, cacheTime); } return cacheValue; }
如果把查詢不到的空結果,也給緩存起來,這樣下次同樣的請求就可以直接返回null了,即可以避免當查詢的值為空時引起的緩存穿透。
可以單獨設置個緩存區域存儲空值,對要查詢的key進行預先校驗,然后再放行給后面的正常緩存處理邏輯。
再談緩存雪崩
前面不是用加鎖排隊方式就解決了嗎?其實加鎖排隊只是為了減輕數據庫的壓力,本質上並沒有提高系統吞吐量。
假設在高並發下,緩存重建期間key是鎖着的,這是過來1000個請求999個都在阻塞的。導致的結果是用戶等待超時,這是非常不優化的體驗。
這種行為本質上是把多線程的Web服務器,在此時給變成單線程處理了,會導致大量的阻塞。對於系統資源也是一種浪費,因緩存重建而阻塞的線程本可以處理更多請求的。
這里提出一種解決方案是:
public object GetMemberSigninDays5() { const int cacheTime = 5; const string cacheKey = "mushroomsir"; //緩存標記。 const string cacheSign = cacheKey + "_Sign"; var sign = CacheHelper.Get(cacheSign); //獲取緩存值 var cacheValue = CacheHelper.Get(cacheKey); if (sign != null) return cacheValue; //未過期,直接返回。 lock (cacheSign) { sign = CacheHelper.Get(cacheSign); if (sign != null) return cacheValue; CacheHelper.Add(cacheSign, "1", cacheTime); ThreadPool.QueueUserWorkItem((arg) => { cacheValue = "395"; //這里一般是 sql查詢數據。 例:395 簽到天數 CacheHelper.Add(cacheKey, cacheValue, cacheTime*2); //日期設緩存時間的2倍,用於臟讀。 }); } return cacheValue; }
從代碼中看出,我們多使用了一個緩存標記key,並使用雙檢鎖校驗保證后面邏輯不會多次執行。
緩存標記key: 緩存標記key只是一個記錄實際key過期時間的標記,它的緩存值可以是任意值,比如1。 它主要用來在實際key過期后,觸發通知另外的線程在后台去更新實際key的緩存。
實際key: 它的過期時間會延長1倍,例:本來5分鍾,現在設置為10分鍾。 這樣做的目的是,當緩存標記key過期后,實際緩存還能以臟數據返回給調用端,直到另外的線程在后台更新完成后,才會返回新緩存。
關於實際key的過期時間延長1倍,還是2、3倍都是可以的。只要大於正常緩存過期時間,並且能保證在延長的時間內足夠拉取數據即可。
還一個好處就是,如果突然db掛了,臟數據的存在可以保證前端系統不會拿不到數據。
這樣做后,就可以一定程度上提高系統吞吐量。
總結
文中說的阻塞其他函數指的是,並發情況下鎖同一對象,比如一個函數鎖A對象,另外的函數就必須等待A對象的鎖釋放后才能再次進鎖。
關於更新緩存,可以單開一個線程去專門跑緩存更新,圖方便的話扔線程池里面即可。
實際項目中,緩存層框架的封裝往往要復雜的多,如果並發量比較小,這樣寫反而會增加代碼的復雜度,具體要根據實際情況來取舍。
**************************************************************
緩存預熱
上次有同學問過,在第一次加載時緩存都為空,怎么進行預熱。
單機Web情況下一般使用RunTimeCache,這種情況下:
可以在啟動事件里面刷新
void Application_Start(object sender, EventArgs e) { //刷新 }
另外可以單寫個刷新緩存頁面,上線后手動刷新下或發布時自動調用刷新,再或者由用戶自行觸發。
分布式緩存(Redis、Memcached)情況下:
比如在幾十台服務器緩存時,單刷滿緩存都需要不少一段時間。
這種預熱就復雜一些,有的會單寫個應用程序去跑,也有的會單寫套框架機制去處理(更智能化)。
其目的是在系統上線之前,所有的緩存都預先加載完畢。
多級緩存
計算機結構中CPU和內存之間一般都配有一級緩存、二級緩存來增加交換速度。
這樣當CPU調用大量數據時,就可避開內存直接從CPU緩存中調用,加快讀取速度。
根據CPU緩存得出多級緩存的特點:
1:每一級緩存中儲存的是下一級緩存的一部分。
2:讀取速度按級別依次遞減,成本也依次遞減,容量依次遞增。
3:當前級別未命中時,才會去下一級尋找。
而在企業應用級開發中,使用多級緩存是同樣的目的及設計,只是粒度更粗,更靈活。
根據速度依次排列lv1-lv6的緩存類型圖:
3級緩存的命中流程圖例子:
線程緩存
Web應用是天生的多線程開發,對於一些公共資源必須考慮線程安全,為止不得不通過鎖來保證數據的完整性和正確性。
在實際當中一台web服務器至少也得處理成百上千的請求,想一想在業務復雜的處理流程,函數每調用一次都得鎖一下,對服務器也是個不小的浪費。
而通過線程緩存,可以讓當前處理用戶請求的線程只拿自己需要的數據。
public static ThreadLocal<UserScore> localUserInfo = new ThreadLocal<UserScore>();
借助Net提供的線程本地變量,可以在請求入口去拉取當前用戶的數據。
在之后線程整個生命周期里面,業務邏輯可以毫無顧慮的使用這些數據,而不需要考慮線程安全。
因為不用重新拿新緩存數據,所以也不用擔心數據撕裂的問題。
其當前線程周期里面的數據是完整無誤的,只有用戶第二次發起請求才會重新去拿新數據。
這樣就能提高不少服務器吞吐量,注意要在線程的出口處銷毀數據。
內存緩存
無論是遠程數據庫讀取,還是緩存服務器讀取。避免不了要跨進程,跨網絡通信,有的還跨機房。
而應用程序頻繁讀寫,對Web、DB服務器都是個不小的消耗,速度相較內存也慢的多。
代碼上加鎖、異步,甚至加服務器在內,都不是一個很好的辦法。因為加載速度,對用戶體驗非常重要。
所以在有要求的項目中使用本地內存做二級緩存,是非常有必要的。目的就是1:抗並發,2:加快讀取速度。
有個著名的緩存五分鍾法則法則,就是說如果一個數據頻繁被訪問,那么就應該放內存中。
舉個例子: 有100並發過來,加鎖會導致前端99線程等候,這個99線程等候着,其實是一直在消耗Web服務器資源。不加就是緩存雪崩。
如果每分鍾拉取一份緩存,緩存到內存,這樣99線程等候時間極大縮短。
文件緩存
相對於內存,硬盤容量大,速度相較於走網絡還更快。
所以我們完全可以把一些不經常變更,放在內存又比較浪費的數據緩存到本地硬盤。
比如使用sqlite一些文件數據庫,我們很容易做到。
分布式緩存
基於內存緩存的redis、memcached等。
基於文件nosql的Casssandra、mongodb等。
redis、memcached是主流的分布式內存緩存,也是應用和DB中間最大的緩存層。
nosql這類的其實不單單只是做緩存用了,完全用在一些非核心業務的DB層了。
DB緩存
這一層DB主要是緩存由原始數據計算出的結果,從而避免由Web程序通過SQL或在使用中直接計算。
當然也可以把計算好的數據,存儲到redis中當緩存。
多層緩存
多層緩存概念在很多地方都用到過:
1:上面介紹的多級緩存就是一種,把內容根據讀取頻率,分不同的等級、不同的層次進行存儲,頻率越高離查詢越近。
2:還一種多層是緩存索引的做法,類似B樹查找,這樣能提高檢索效率。
3:從架構上來說瀏覽器緩存、CDN緩存、反向代理緩存、服務端緩存、也是多層緩存。
總結
在使用上大家根據實際場景,進行各種組合搭配。本篇談的比較理論些,有些內容細節沒展開。
比如分布式緩存的使用,緩存置換策略及算法,緩存過期機制等。
***************************************************************
分析設計
假設有個項目有比較高的並發量,要用到多級緩存,如下:
在實際設計一個內存緩存前,需要考慮的問題:
1:內存與Redis的數據置換,盡可能在內存中提高數據命中率,減少下一級的壓力。
2:內存容量的限制,需要控制緩存數量。
3:熱點數據更新不同,需要可配置單個key過期時間。
4:良好的緩存過期刪除策略。
5:緩存數據結構的復雜度盡可能的低。
關於置換及命中率:采用LRU算法,因為它實現簡單,緩存key命中率也很好。
LRU即是:把最近最少訪問的數據給淘汰掉,經常被訪問到即是熱點數據。
關於LRU數據結構:因為key優先級提升和key淘汰,所以需要順序結構,網上大多實現都采用的這種鏈表結構。
即新數據插入到鏈表頭部、被命中時的數據移動到頭部,添加復雜度O(1),移動和獲取復雜度O(N)。
有沒復雜度更低的呢? 有Dictionary,復雜度為O(1),性能最好。 那如何保證緩存的優先級提升呢?
O(1)LRU實現
定義個LRUCache<TValue>類,構造參數maxKeySize 來控制緩存最大數量。
使用ConcurrentDictionary來作為我們的緩存容器,並能保證線程安全。
public class LRUCache<TValue> : IEnumerable<KeyValuePair<string, TValue>> { private long ageToDiscard = 0; //淘汰的年齡起點 private long currentAge = 0; //當前緩存最新年齡 private int maxSize = 0; //緩存最大容量 private readonly ConcurrentDictionary<string, TrackValue> cache; public LRUCache(int maxKeySize) { cache = new ConcurrentDictionary<string, TrackValue>(); maxSize = maxKeySize; } }
上面定義了 ageToDiscard、currentAge 這2個自增值參數,作用是標記緩存列表中各個key的新舊程度。
實現步驟如下:
每次添加key時,currentAge自增並將currentAge值分配給這個緩存值的age,currentAge一直自增。
public void Add(string key, TValue value) { Adjust(key); var result = new TrackValue(this, value); cache.AddOrUpdate(key, result, (k, o) => result); } public class TrackValue { public readonly TValue Value; public long Age; public TrackValue(LRUCache<TValue> lv, TValue tv) { Age = Interlocked.Increment(ref lv.currentAge); Value = tv; } }
在添加時,如超過最大數量,檢查字典里是否有ageToDiscard年齡的key,如沒有循環自增檢查,有則刪除、添加成功。
其ageToDiscard+maxSize= currentAge ,這樣設計就能在O(1)下保證可以淘汰舊數據,而不是使用鏈表移動。
public void Adjust(string key) { while (cache.Count >= maxSize) { long ageToDelete = Interlocked.Increment(ref ageToDiscard); var toDiscard = cache.FirstOrDefault(p => p.Value.Age == ageToDelete); if (toDiscard.Key == null) continue; TrackValue old; cache.TryRemove(toDiscard.Key, out old); } }
獲取key的時候表示它又被人訪問,將最新的currentAge賦值給它,增加它的年齡:
public TValue Get(string key) { TrackValue value=null; if (cache.TryGetValue(key, out value)) { value.Age = Interlocked.Increment(ref currentAge); } return value.Value; }
過期刪除策略
大多數情況下,LRU算法對熱點數據命中率是很高的。 但如果突然大量偶發性的數據訪問,會讓內存中存放大量冷數據,也即是緩存污染。
會引起LRU無法命中熱點數據,導致緩存系統命中率急劇下降,也可以使用LRU-K、2Q、MQ等變種算法來提高命中率。
過期配置
通過設定最大過期時間來盡量避免冷數據常駐內存。
多數情況每個數據緩存的時間要求不一致的,所以需要再增加單個key的過期時間字段。
private TimeSpan maxTime; public LRUCache(int maxKeySize,TimeSpan maxExpireTime){} //TrackValue增加創建時間和過期時間 public readonly DateTime CreateTime; public readonly TimeSpan ExpireTime;
刪除策略
關於key過期刪除,最好的方式是使用定時刪除,這樣可以最快的釋放被占用的內存,但很明顯大量的定時器對CPU來說是非常不友好的。
所以需要采用惰性刪除、在獲取key的時檢查是否過期,過期直接刪除。
public Tuple<TrackValue, bool> CheckExpire(string key) { TrackValue result; if (cache.TryGetValue(key, out result)) { var age = DateTime.Now.Subtract(result.CreateTime); if (age >= maxTime || age >= result.ExpireTime) { TrackValue old; cache.TryRemove(key, out old); return Tuple.Create(default(TrackValue), false); } } return Tuple.Create(result, true); }
惰性刪除雖然性能最好,但對於冷數據來說還是沒解決緩存污染的問題,所以還需增加個定期清理和惰性刪除配合使用。
比如單開個線程每5分鍾去遍歷檢查key是否過期,這個時間策略是可配置的,如果緩存數量較多可分批遍歷檢查。
public void Inspection() { foreach (var item in this) { CheckExpire(item.Key); } }
惰性刪除配合定期刪除基本上能滿足絕大多數要求了。
總結
本篇參考了redis、Orleans的相關實現。
如果繼續完善下去就是內存數據庫的雛形,類似redis,比如增加刪除key的通知回調,支持更多的數據類型存儲。
http://www.cnblogs.com/kevingrace/p/5575385.html