前言
近兩篇博客寫的都是與數據緩存相關的,這篇還是繼續緩存相關的話題,主要是二級緩存間的數據同步問題。
緩存可以分為本地緩存(進程內)和分布式緩存(進程外),單獨用其中一種是比較常見的。
組合起來用的,或許也有不少企業在用!本文要討論的內容是屬於這種組合起來用的情形。
先簡單啰嗦一下什么是二級緩存?
何為二級緩存?
二級,可以理解成有兩個不同的級別。二級緩存,可以理解成有兩個不同級別的緩存。
甚至三級,四級也是同樣的概念。這里可以看看CPU的多級緩存概念,很相似。
第一級的緩存一般指的就是進程內的緩存,也就是常說的本地緩存!
在傳統的ASP.NET網站中,我們用的最多的可能就是HttpRuntime.Cache;
在ASP.NET Core中,常用的就是MemoryCache這些。當然也可以自己用ConcurrentDictionary去實現一個定制版的。
第二級的緩存指的是進程外的緩存,這一級往往是我們說的分布式緩存,也就是常用的Redis、Memcached這些。
二級緩存的用法還是比較常規的,先從本地緩存中取,沒有命中就去分布式緩存中取,要是再沒命中,只能去數據庫取咯。
需要注意的是,本地緩存的容量肯定是遠不如分布式緩存大的,所以本地緩存中的緩存數據是相對比較熱點的數據。不然應用服務器的內存就會爆掉!!
到這里,有人可能會問這樣一個問題,直接用Redis或Memcached不就好了嗎?它們的效率又不會差,沒事扯多一個本地緩存干嘛?
首先,需要說明的是,直接用Redis或Memcached是絕對沒有問題的,畢竟已經有那么多成功的案例了,我們也從中受益了不少。
至於扯多一個本地緩存,是因為在使用Redis、Memcached的時候,是需要建立遠程的連接,這里也是還需要花一定的時間的。
畢竟在當前服務器的內存中取數據肯定是比在另一台服務器的內存中取要快很多的!!
針對不同緩存的選擇,可能還會涉及序列化與反序列化的過程。對於這些的耗時,我們還是有必要處理一下的。
當然在用了二級緩存之后,也會引發一些問題。最主要的還是級間緩存的同步問題。
下面我們先通過一個簡單的例子看看這個問題是怎么產生的。
簡單案例
現在有一個商品詳情頁(后面簡稱為單品頁)的站點,主要是用於向用戶展示商品數據,它有三台負載。
每台負載都有使用了本地緩存去緩存熱門的商品數據,當本地緩存沒有命中的時候,會從Redis(Cluster或主從)中讀取數據。
還有一個商品資料管理的平台(后面簡稱平台),用於維護相應的商品資料數據。它有兩台負載。
當管理人員在平台更新了商品資料之后,會將更新后的數據寫進Redis。便於單品頁讀到這些最新的數據。
注:單品頁在這個案例中僅是用作展示作用,並不包含下單之類的操作,它的數據來源有兩塊,一個是本地數據緩存,一個是Redis。
下面我們思考一個問題:
當在平台更新了商品資料后,緩存數據會不會出現問題?
當然這個是需要分情形來討論的。
情形一:
當在平台更新商品資料后,會同時操作Redis中的數據,以確保Redis中的數據是最新的。
這個時候,對於分布式緩存是沒有產生影響的!
情形二:
假設單品頁三台負載的本地緩存中,都沒有平台剛更新的那個商品資料的緩存信息。
在這個時候,從Nginx進來的任何一個請求,都不會直接命中本地緩存,都是需要從Redis中去獲取這個數據。
由於Redis中的數據是最新的,從而也就說明,這種情形也是不會對系統產生影響的!
情形三:
如果說單品頁這三台負載,有其中一台或多台負載的本地緩存中已經有了那個剛更新的商品資料的緩存(這里的緩存數據是更新前的舊數據)。
這個時候,當用戶打開這個商品的單品頁時,可能會從某台負載的本地緩存中讀取到這個舊的商品數據。
從而也就造成用戶看到的商品數據與實際並不相符!!試想一下,如果說正確的價格是1000,而在緩存中的數據是10。
那么一不小心可能就損失了幾個億,今年的年終獎說不定也黃了。
可想而知,如果緩存處理不得當,那會對我們的系統造成十分嚴重的影響。
我遇到過一個公司,就是因為緩存沒處理好,經常導致他們商品資料顯示不正確,而且還只是用了一級緩存(本地緩存)。
情形三已經引出了我們本文要討論的重點問題了,還有其他情形就不再列出來了。
既然我們在使用二級緩存的時候會遇到這個緩存同步的問題,那么我們是不是就不要用二級緩存呢?
答案當然是否定的,用肯定是要用的,遇到問題,自然要想辦法去解決的!
下面來看看這個二級緩存同步問題的解決方案。
解決方案
其實,對於二級緩存之間的同步問題,解決方案還是比較簡單的!
思路就是:當更新分布式緩存(第二級緩存)的時候,順便告訴一下那三台負載,讓它們也更新一下本地緩存中的數據就可以了。
順着這個思路,自然而然就想到了發布訂閱這種機制。
三台負載都去訂閱Redis緩存的更新,在更新商品資料后,同時向三台負載通知剛才更新的商品資料!
三台負載收到通知之后,去更新相應的緩存數據就可以了。
簡單示意圖:
這個可能是最為簡單有效的處理方案,我個人暫時也沒有想到其他更好的方案,如果您有好的建議,可以在下方留言評論或直接聯系我!
背景,案例,解決方案都有了,下面就是要如何去實踐這個二級緩存的同步問題了。
下面會通過ASP.NET Core做一個簡單的Demo來實現這個緩存的同步。
簡單實踐
本地緩存采用:Microsoft.Extensions.Caching.Memory
分布式緩存采用:Redis
發布訂閱采用:Redis的Pub/Sub
首先定義一個緩存操作的基類接口。
public interface ICacheProvider
{
void Set(string cacheKey, object cacheValue, TimeSpan expiration);
object Get(string cacheKey, Func<object> dataRetriever, TimeSpan expiration);
void Remove(string cacheKey);
}
這個基類接口包含了最簡單的三個操作。其中Get方法還帶了一個額外的操作,當這次緩存沒有取到數據,會從dateRetriever中拿數據。
定義兩個新接口去繼承上面這個基類接口,一個代表是本地緩存,一個代表是分布式緩存。
public interface ILocalCacheProvider : ICacheProvider
{
}
public interface IRemoteCacheProvider : ICacheProvider
{
}
這兩個接口並沒有定義什么額外的操作。
定義一個序列化的接口,用於處理緩存值的序列化操作。
public interface ISerializer
{
string Serialize(object obj);
object Deserialize(string str);
T Deserialize<T>(string str);
}
這里只實現了針對Json的序列化操作。
public class JsonSerializer : ISerializer
{
public object Deserialize(string str)
{
return Newtonsoft.Json.JsonConvert.DeserializeObject(str);
}
public T Deserialize<T>(string str)
{
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(str);
}
public string Serialize(object obj)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(obj);
}
}
然后是基本的緩存操作的實現。
Memory緩存的實現:
public class MemoryCacheProvider : ILocalCacheProvider
{
private IMemoryCache _cache;
public MemoryCacheProvider(IMemoryCache cache)
{
_cache = cache;
}
public object Get(string cacheKey, Func<object> dataRetriever, TimeSpan expiration)
{
var result = _cache.Get(cacheKey);
if (result != null)
return result;
var obj = dataRetriever.Invoke();
if (obj != null)
this.Set(cacheKey, obj, expiration);
return obj;
}
//省略部分。。。
}
由於MemoryCache是可以直接操作object類型的,所以這里就不用進行序列化操作。
Redis緩存的實現:
public class RedisCacheProvider : IRemoteCacheProvider
{
private readonly ISerializer _serializer;
public RedisCacheProvider(ISerializer serializer)
{
this._serializer = serializer;
}
public void Set(string cacheKey, object cacheValue, TimeSpan expiration)
{
var value = _serializer.Serialize(cacheValue);
RedisCacheConfig.Connection.GetDatabase().StringSet(cacheKey, value, expiration);
}
//省略部分。。。
}
這里需要用前面定義的序列化接口,因為我們不能像MemoryCache那樣直接將object扔進Redis中。
這里還偷了一下懶,直接將Redis的連接信息寫死到一個靜態類里面了。
到這一步,基本的緩存操作已經有了。下面要關注的就是訂閱和發布的內容了。
雖然說這個小Demo是用Redis來處理發布訂閱,但是能完成發布訂閱的還有MQ,所以發布訂閱也還是要面向接口,便於更換調整。
首先是訂閱分布式緩存變更的接口。
public interface ICacheSubscriber
{
void Subscribe(string channel, NotifyType notifyType);
}
向本地緩存通知變更的接口:
public interface ICachePublisher
{
void Notify(NotifyType notifyType ,string cacheKey, object cacheValue, TimeSpan expiration);
}
基於Redis實現的ICachePublisher接口
public class RedisCachePublisher : ICachePublisher
{
private readonly ISerializer _serialize;
public RedisCachePublisher(ISerializer serialize)
{
this._serialize = serialize;
}
public void Notify(NotifyType notifyType, string cacheKey, object cacheValue, TimeSpan expiration)
{
//省略部分。。。
RedisPubSubConfig.Connection.GetSubscriber().Publish(channelName, _serialize.Serialize(args));
}
}
這里是直接調用StackExchange.Redis里面的Publish方法來向本地緩存發起通知。這里通知的內容是經過序列化后的值。
基於Redis實現的ICacheSubscriber接口:
public class RedisCacheSubscriber : ICacheSubscriber
{
private readonly ILocalCacheProvider _localCache;
private readonly ISerializer _serialize;
public RedisCacheSubscriber(ILocalCacheProvider localCache , ISerializer serialize)
{
this._localCache = localCache;
this._serialize = serialize;
}
public void Subscribe(string channel, NotifyType notifyType)
{
switch (notifyType)
{
case NotifyType.Add:
RedisPubSubConfig.Connection.GetSubscriber().Subscribe(channel, CacheAddAction);
break;
case NotifyType.Update:
RedisPubSubConfig.Connection.GetSubscriber().Subscribe(channel, CacheUpdateAction);
break;
case NotifyType.Delete:
RedisPubSubConfig.Connection.GetSubscriber().Subscribe(channel, CacheUpdateAction);
break;
}
}
private void CacheDeleteAction(RedisChannel channel, RedisValue message)
{
var deleteNotification = _serialize.Deserialize<CacheNotificationObject>(message);
_localCache.Remove(deleteNotification.CacheKey);
}
//省略部分...
}
在使用Redis的訂閱之后,需要進行一個Action的處理,這里處理的就是上面Publish后的內容!
所以Action處理的第一步就是反序列化拿到變更信息,然后調用本地緩存的接口進行相應的操作。
由於在平台上面操作的時候,在更新Redis緩存的同時會發一個通知給本地緩存。
這就意味着調用方會進行兩個操作,一是更新Redis緩存,二是通知本地緩存。
為了簡化平台調用時的操作,這里也對其進行了整合。
定義一個平台更新緩存操作時用的接口。
public interface IPublishCacheProvider
{
void Add(string cacheKey, object cacheValue, TimeSpan expiration, bool isNeedToNotify = false);
void Update(string cacheKey, object cacheValue, TimeSpan expiration);
void Delete(string cacheKey);
}
要注意的是,我們在平台進行添加操作的時候,不一定要通知本地緩存的!
因為添加的商品,必然是新產品,這個時候本地緩存是不會存在的相應數據的,可以讓其去Redis緩存中取,這就和上面的情形一是一樣的。
加上是否需要通知這個選項是為了預防有一些活動熱門或特殊要求之類的產品要上新,這個時候可以考慮直接在本地緩存中也加上去。
但是,修改和刪除就沒得商量了,必須要去通知,不然造成數據不一致,那可就不好玩了。
下面是具體的實現:
public class PublishCacheProvider : IPublishCacheProvider
{
private readonly IRemoteCacheProvider _remoteCache;
private readonly ICachePublisher _cachePublisher;
public PublishCacheProvider(IRemoteCacheProvider remoteCache, ICachePublisher publisher)
{
this._remoteCache = remoteCache;
this._cachePublisher = publisher;
}
public void Add(string cacheKey, object cacheValue, TimeSpan expiration, bool isNeedToNotify = false)
{
_remoteCache.Set(cacheKey, cacheValue, expiration);
if (isNeedToNotify)
_cachePublisher.Notify(NotifyType.Add, cacheKey, cacheValue, expiration);
}
//省略部分。。。
}
到這里,我們對緩存的操作已經處理好了。下面就是單品頁和平台兩個地方的處理了。
先來看看平台這一塊。
這里直接用一個API站點來模擬平台的操作。
[Route("api/[controller]")]
public class ValuesController : Controller
{
private readonly IPublishCacheProvider _cache;
public ValuesController(IPublishCacheProvider cache)
{
_cache = cache;
}
[HttpGet]
public string Get(int type)
{
if(type == 1)
{
_cache.Update("Test", DateTime.Now.ToString(), TimeSpan.FromMinutes(5));
}
else if (type == 2)
{
_cache.Add("Test", DateTime.Now.ToString(), TimeSpan.FromMinutes(5),true);
}
else
{
_cache.Delete("Test");
}
return "Update Redis Cache And Notify Succeed!";
}
}
這里固定了一個緩存的key,比較簡單粗暴。
接下來是單品頁。
單品頁的操作會相對麻煩一點。
我們先來搞定比較棘手的一個東西,訂閱。
如果是在MVC或Web Forms時代,我們會把訂閱的代碼寫在Globle.acsx中。
但是在ASP.NET Core中就沒有這個東西了,我們要轉向Startup上面去了。
public class Startup
{
//省略部分。。。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddTransient<ICacheSubscriber, RedisCacheSubscriber>();
services.AddTransient<ILocalCacheProvider, MemoryCacheProvider>();
services.AddTransient<IRemoteCacheProvider, RedisCacheProvider>();
services.AddTransient<IDemoService,DemoService>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
var subscriber = app.ApplicationServices.GetRequiredService<ICacheSubscriber>();
//channel name should read from database or settings
subscriber.Subscribe("CacheAdd", NotifyType.Add);
subscriber.Subscribe("CacheUpdate", NotifyType.Update);
subscriber.Subscribe("CacheDelete", NotifyType.Delete);
}
}
我們是在Configure方法里面進行訂閱操作的。這里需要注意的是ConfigureServices是在Configure方法之前執行的。
所以我們可以在Configure方法中拿到ICacheSubscriber的實現類,從而去完成訂閱的操作。
另外這里的Channel和前面一樣是硬編碼的,這里應該要從配置中心讀取才是比較好的。
這里還有一個模擬從數據庫中拿數據的操作。
public interface IDemoService
{
object Get();
}
public class DemoService : IDemoService
{
public object Get()
{
return "Demo";
}
}
最后就是單品頁的使用了,這里用一個MVC項目來展示。
public class HomeController : Controller
{
private readonly ILocalCacheProvider _localCache;
private readonly IRemoteCacheProvider _remoteCache;
private readonly IDemoService _service;
public HomeController(ILocalCacheProvider localCache, IRemoteCacheProvider remoteCache, IDemoService service)
{
this._localCache = localCache;
this._remoteCache = remoteCache;
this._service = service;
}
public IActionResult Index()
{
TimeSpan ts = TimeSpan.FromMinutes(5);
//ViewBag.Cache = _localCache.Get("Test", () => "123", ts).ToString();
ViewBag.Cache = _localCache.Get("Test", () =>
{
return _remoteCache.Get("Test", () => _service.Get(), ts);
}, ts).ToString();
return View();
}
}
在單品頁中,我們只做了讀的操作,代碼比較簡單就不一一解釋了。
下面來看看效果如何。
上述效果圖中,一開始是直接從IDemoService中取的值:Demo,因為本地緩存和分布式緩存中都沒有相應的數據。
然后在平台中進行了操作,寫入了新的數據,單品頁中的本地緩存就更新了。最后是在平台執行了刪除操作,單品頁的緩存數據自然也就被刪除了。
最后看一下平台更新后,單品頁訂閱那里的斷點調試:
總結
二級緩存的同步問題處理起來還是比較簡單的。如果使用了二級(多級)緩存,我們還是應該要考慮到這個問題的,不然到時踩雷了就不好了。
但是文中的Demo還是很粗糙,也並不優雅。后面還會對這個進行一些改造。
最后附上文中Demo的地址