大家對這段代碼肯定很熟悉吧:
public List<UserInfo> SearchUsers(string userName) { string cacheKey=string.Format("SearchUsers_{0}", userName); List<UserInfo> users = cache.Find(cacheKey) as List<UserInfo>; if (users == null) { users = repository.GetUsersByUserName(userName); cache.Set(cacheKey, users); } return users; } class HttpRuntimeCache { public object Find(string key) { return HttpRuntime.Cache[key]; } public void Set(string key, object value) { HttpRuntime.Cache[key] = value; } }
導致了如下這些問題:
- 業務邏輯函數中引入了很多無關的緩存代碼,導致DDD模型不夠純
- 更換緩存Provider不方便
- 加入緩存冗余機制不方便
- 沒辦法同時使用多個緩存系統
- 緩存大對象出現異常,比如Memcache有1M的value限制
有諸多問題,因此我們需要引入緩存子系統來解決上述問題,帶來的好處:
- DDD模型更加純
- 具體的Cache實現機制可以很靈活,比如HttpRuntimeCache, Memcache, Redis可以同時使用
- 加入了Cache冗余機制,不會由於某一台Memcache或者Redis down機導致系統速度很慢,實際上,系統還是會保持飛快(除非backup也down了的情況)
- 開發人員更加致力於核心業務,不會分散注意力
- 緩存位置透明化,都會在xml配置文件中進行配置
解決方案,要用到這2篇文章的技術:C# 代理應用 - Cachable 和 聊聊Memcached的應用。
主要的思路分2個:
模型端:通過代理來嵌入AOP方法,來判斷是否需要緩存,有緩存value則直接返回value;緩存value的寫入是通過AOP的后置方法寫入的,因此不需要在業務函數中寫代碼,當然也支持代碼調用。
Cache核心對象:這個對象要解決一致性hash算法、cache value大對象分解功能、冗余機制
代理嵌入AOP的方法,已經在這篇文章中說明了 C# 代理應用 - Cachable,有興趣的看看,這里就不說了,我們來主要看看CacheCoordinator對象的實現
結構圖如下:
先來看看UML圖:
CacheCore代碼(算法核心):
public class CacheCore { private ICacheCoordinator cacheProvider = null; public CacheCore(ICacheCoordinator cacheProvider) { this.cacheProvider = cacheProvider; } public void Set(string location, string key, object value) { AssureSerializable(value); string xml = Serializer2XMLConvert(value); CacheParsedObject parsedObj = new CacheParsedObject(); string classType = string.Format("{0}", value.GetType().FullName); if (xml.Length > CacheConfig.CacheConfiguration.MaxCacheEntitySize) { /* key:1@3@ConcreteType key_1:subvalue1 key_2:subvalue2 key_3:subvalue3 */ //拆分成更小的單元 int splitCount = xml.Length / CacheConfig.CacheConfiguration.MaxCacheEntitySize; if (CacheConfig.CacheConfiguration.MaxCacheEntitySize * splitCount < xml.Length) splitCount++; parsedObj.MainObject = new KeyValuePair<string, string>(key, string.Format("1@{0}@{1}", splitCount, classType)); for (int i = 0; i < splitCount;i++ ) { if (i == splitCount - 1) //最后一段,直接截取到最后,不用給出長度 parsedObj.SplittedElements.Add(xml.Substring(i * CacheConfig.CacheConfiguration.MaxCacheEntitySize)); else //其他,要給出長度 parsedObj.SplittedElements.Add(xml.Substring(i * CacheConfig.CacheConfiguration.MaxCacheEntitySize, CacheConfig.CacheConfiguration.MaxCacheEntitySize)); } } else { /* key:1@1@ConcreteType key_1:value */ parsedObj.MainObject = new KeyValuePair<string, string>(key, string.Format("1@1@{0}", classType)); parsedObj.SplittedElements.Add(xml); } //針對CacheParsedObject進行逐項保存 this.cacheProvider.Put(parsedObj.MainObject.Key, parsedObj.MainObject.Value); int curIndex = 0; foreach(string xmlValue in parsedObj.SplittedElements) { curIndex++; string tkey=string.Format("{0}_{1}", parsedObj.MainObject.Key, curIndex); this.cacheProvider.Put(tkey, xmlValue); } } public object Get(string location, string key) { string mainObjKeySetting = (string)cacheProvider.Get(key); if (mainObjKeySetting == null || mainObjKeySetting.Length == 0) return null; string classType; CacheParsedObject parsedObj; GetParsedObject(key, mainObjKeySetting, out classType, out parsedObj); string xmlValue=string.Empty; parsedObj.SplittedElements.ForEach(t=>xmlValue+=t); using (StringReader rdr = new StringReader(xmlValue)) { //Assembly.Load("Core"); Type t = Type.GetType(classType); XmlSerializer serializer = new XmlSerializer(t); return serializer.Deserialize(rdr); } } public void Remove(string location, string key) { string mainObjKeySetting = (string)cacheProvider.Get(key); if (mainObjKeySetting == null || mainObjKeySetting.Length == 0) return; string classType; CacheParsedObject parsedObj; GetParsedObject(key, mainObjKeySetting, out classType, out parsedObj); int i = 1; parsedObj.SplittedElements.ForEach(t => this.cacheProvider.Remove(string.Format("{0}_{1}", parsedObj.MainObject.Key, i++))); this.cacheProvider.Remove(parsedObj.MainObject.Key); } private void GetParsedObject(string key, string mainObjKeySetting, out string classType, out CacheParsedObject parsedObj) { int from = 1, end = 1; classType = string.Empty; if (mainObjKeySetting.IndexOf('@') > 0) { end = int.Parse(mainObjKeySetting.Split('@')[1]); classType = mainObjKeySetting.Split('@')[2]; } parsedObj = new CacheParsedObject(); parsedObj.MainObject = new KeyValuePair<string, string>(key, string.Format("1@{0}@{1}", end, classType)); for (int i = from; i <= end; i++) parsedObj.SplittedElements.Add((string)this.cacheProvider.Get(string.Format("{0}_{1}", parsedObj.MainObject.Key, i))); } private string Serializer2XMLConvert(object value) { using (StringWriter sw = new StringWriter()) { XmlSerializer xz = new XmlSerializer(value.GetType()); xz.Serialize(sw, value); return sw.ToString(); } } private void AssureSerializable(object value) { if (value == null) throw new Exception("cache object must be Serializable"); if (value.GetType().GetCustomAttributes(typeof(SerializableAttribute), true).Count()<=0) throw new Exception("cache object must be Serializable"); } }
下面是CacheCoordinator的代碼,這個類的加入目的是要加入緩存的冗余機制:
class CacheCoordinator : ICacheCoordinator { CacheServerWrapper backupCacheServer = new CacheServerWrapper(CacheConfig.CacheConfiguration.BackupCacheServer); CacheServersWrapper peerCacheServer = new CacheServersWrapper(CacheConfig.CacheConfiguration.PeerCacheServers); public void Put(string key, object value) { peerCacheServer.Put(key, value); backupCacheServer.Put(key, value); //緩存冗余 } public object Get(string key) { object o=peerCacheServer.Get(key); if (o != null) return o; return backupCacheServer.Get(key); } public void Remove(string key) { peerCacheServer.Remove(key); backupCacheServer.Remove(key); } }
剩下的就是具體的CacheProvider和CacheProviderWrapper類了:
public class CacheServerWrapper : ICacheExecutor { ICacheExecutor executor = null; private CacheServerInfo configInfo; public CacheServerWrapper(CacheServerInfo configInfo) { this.configInfo = configInfo; ICacheExecutor tmpExecutor = null; switch(this.configInfo.ServerType) { case CacheServerType.HttpRuntime: tmpExecutor = new CacheProvider.HttpRuntimeCacheProvider(configInfo); break; case CacheServerType.InMemory: tmpExecutor = new CacheProvider.InMemoryCacheProvider(configInfo); break; case CacheServerType.Memcached: tmpExecutor = new CacheProvider.MemcachedCacheProvider(configInfo); break; case CacheServerType.Redis: tmpExecutor = new CacheProvider.RedisCacheProvider(configInfo); break; default: tmpExecutor = new CacheProvider.HttpRuntimeCacheProvider(configInfo); break; } executor = tmpExecutor; } public string FullServerAddress { get { return this.configInfo.FullServerAddress; } } public void Put(string key, object value) { executor.Put(key, value); } public object Get(string key) { return executor.Get(key); } public void Remove(string key) { executor.Remove(key); } }
只貼出Memcache的操作類
class MemcachedCacheProvider : ICacheExecutor { private MemcachedClient mc = new MemcachedClient(); private CacheServerInfo configInfo; public MemcachedCacheProvider(CacheServerInfo configInfo) { this.configInfo = configInfo; //初始化池 SockIOPool pool = SockIOPool.GetInstance(); pool.SetServers(new string[] { string.Format("{0}:{1}", configInfo.ServerAddress, configInfo.ServerPort) });//設置連接池可用的cache服務器列表,server的構成形式是IP:PORT(如:127.0.0.1:11211) pool.InitConnections = 3;//初始連接數 pool.MinConnections = 3;//最小連接數 pool.MaxConnections = 5;//最大連接數 pool.SocketConnectTimeout = 1000;//設置連接的套接字超時 pool.SocketTimeout = 3000;//設置套接字超時讀取 pool.MaintenanceSleep = 30;//設置維護線程運行的睡眠時間。如果設置為0,那么維護線程將不會啟動,30就是每隔30秒醒來一次 //獲取或設置池的故障標志。 //如果這個標志被設置為true則socket連接失敗,將試圖從另一台服務器返回一個套接字如果存在的話。 //如果設置為false,則得到一個套接字如果存在的話。否則返回NULL,如果它無法連接到請求的服務器。 pool.Failover = true; pool.Nagle = false;//如果為false,對所有創建的套接字關閉Nagle的算法 pool.Initialize(); } public void Put(string key, object value) { mc.Set(key, value); } public object Get(string key) { return mc.Get(key); } public void Remove(string key) { mc.Delete(key); } }
不能忘了可配置性,xml定義及代碼如下:
<?xml version="1.0" encoding="utf-8" ?> <CacheConfig> <MaxCacheEntitySize>1048576</MaxCacheEntitySize><!--1*1024*1024--> <PeerCacheServers> <CacheServer> <ServerType>InMemory</ServerType> <ServerAddress>127.0.0.1</ServerAddress> <ServerPort>11211</ServerPort> </CacheServer> <CacheServer> <ServerType>InMemory</ServerType> <ServerAddress>127.0.0.1</ServerAddress> <ServerPort>11212</ServerPort> </CacheServer> </PeerCacheServers> <BackupCacheServer> <CacheServer> <ServerType>InMemory</ServerType> <ServerAddress>127.0.0.1</ServerAddress> <ServerPort>11213</ServerPort> </CacheServer> </BackupCacheServer> </CacheConfig>
讀取配置信息的代碼:
public static class CacheConfiguration { static CacheConfiguration() { Load(); } private static void Load() { PeerCacheServers = new List<CacheServerInfo>(); BackupCacheServer = null; XElement root = XElement.Load(System.IO.Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "CacheConfig.xml")); MaxCacheEntitySize = int.Parse(root.Element("MaxCacheEntitySize").Value); foreach (var elm in root.Element("PeerCacheServers").Elements("CacheServer")) { CacheServerInfo srv = new CacheServerInfo(); srv.ServerAddress = elm.Element("ServerAddress").Value; srv.ServerPort = int.Parse(elm.Element("ServerPort").Value); srv.ServerType = (CacheServerType)Enum.Parse(typeof(CacheServerType), elm.Element("ServerType").Value); PeerCacheServers.Add(srv); } foreach (var elm in root.Element("BackupCacheServer").Elements("CacheServer")) { CacheServerInfo srv = new CacheServerInfo(); srv.ServerAddress = elm.Element("ServerAddress").Value; srv.ServerPort = int.Parse(elm.Element("ServerPort").Value); srv.ServerType = (CacheServerType)Enum.Parse(typeof(CacheServerType), elm.Element("ServerType").Value); BackupCacheServer = srv; break; } if (PeerCacheServers.Count <= 0) throw new Exception("Peer cache servers not found."); if (BackupCacheServer == null) throw new Exception("Backup cache server not found."); AssureDistinctFullServerAddress(PeerCacheServers); } private static void AssureDistinctFullServerAddress(List<CacheServerInfo> css) { Dictionary<string, int> map = new Dictionary<string, int>(); foreach(CacheServerInfo csInfo in css) { if (map.ContainsKey(csInfo.FullServerAddress)) throw new Exception(string.Format("Duplicated server address found [{0}].", csInfo.FullServerAddress)); else map[csInfo.FullServerAddress] = 1; } } public static int MaxCacheEntitySize { get; set; } public static List<CacheServerInfo> PeerCacheServers { get; set; } public static CacheServerInfo BackupCacheServer { get; set; } }
Append New
其實,我們忽略了一些重要的東西:
- 如果Memcached, Redis服務器超過了5台以上,通信量上升很快,怎么辦?
- 由於取數據牽涉到網絡I/O操作,因此速度依然比較慢,怎么辦?
讓我們來解決吧。
把新的UML圖貼上(下圖中左邊紅框中的是新增的):
本地緩存替換策略:LFU/LRU,其他的有很多。
EventBus是分布式的,下面有講為什么要分布式的。
當Domain層需要獲取數據時的邏輯:
- 先查看本地緩存中是否存在數據副本,存在則立刻返回(也沒有網絡I/O了)
- 沒有則去redis/memcached獲取,有則返回;並且把數據放入本地cache中
- 最后,實在沒有數據,就db里取
當Domain層需要更新數據時的邏輯:
- 在本地cache中進行更新操作
- 更新分布式緩存
- 發布分布式事件,通知其他app server的cache manager去主動拉數據到他們本地緩存
看得出來,加入這個新的角色后,能對下面2項有改善作用:
- 降低網絡間的通信流量
- 增大本地緩存的命中率