緩存子系統如何設計(Cachable tag, Memcache/redis support, xml config support, LRU/LFU/本地緩存命中率)


大家對這段代碼肯定很熟悉吧:

public List<UserInfo> SearchUsers(string userName)
        {
            string cacheKey=string.Format("SearchUsers_{0}", userName);
            List<UserInfo>  users = cache.Find(cacheKey) as List<UserInfo>;
            if (users == null)
            {
                users = repository.GetUsersByUserName(userName);
                cache.Set(cacheKey, users);
            }
            return users;
        }

class HttpRuntimeCache
    {
        public object Find(string key)
        {
            return HttpRuntime.Cache[key];
        }
        public void Set(string key, object value)
        {
            HttpRuntime.Cache[key] = value;
        }
    }

導致了如下這些問題:

  1. 業務邏輯函數中引入了很多無關的緩存代碼,導致DDD模型不夠純
  2. 更換緩存Provider不方便
  3. 加入緩存冗余機制不方便
  4. 沒辦法同時使用多個緩存系統
  5. 緩存大對象出現異常,比如Memcache有1M的value限制

有諸多問題,因此我們需要引入緩存子系統來解決上述問題,帶來的好處:

  1. DDD模型更加純
  2. 具體的Cache實現機制可以很靈活,比如HttpRuntimeCache, Memcache, Redis可以同時使用
  3. 加入了Cache冗余機制,不會由於某一台Memcache或者Redis down機導致系統速度很慢,實際上,系統還是會保持飛快(除非backup也down了的情況)
  4. 開發人員更加致力於核心業務,不會分散注意力
  5. 緩存位置透明化,都會在xml配置文件中進行配置

解決方案,要用到這2篇文章的技術:C# 代理應用 - Cachable 和 聊聊Memcached的應用。 

主要的思路分2個:

模型端:通過代理來嵌入AOP方法,來判斷是否需要緩存,有緩存value則直接返回value;緩存value的寫入是通過AOP的后置方法寫入的,因此不需要在業務函數中寫代碼,當然也支持代碼調用。

Cache核心對象:這個對象要解決一致性hash算法、cache value大對象分解功能、冗余機制

代理嵌入AOP的方法,已經在這篇文章中說明了 C# 代理應用 - Cachable,有興趣的看看,這里就不說了,我們來主要看看CacheCoordinator對象的實現

結構圖如下:

先來看看UML圖:

CacheCore代碼(算法核心):

public class CacheCore
    {
        private ICacheCoordinator cacheProvider = null;
        public CacheCore(ICacheCoordinator cacheProvider)
        {
            this.cacheProvider = cacheProvider;
        }

        public void Set(string location, string key, object value)
        {
            AssureSerializable(value);
            string xml = Serializer2XMLConvert(value);
            CacheParsedObject parsedObj = new CacheParsedObject();

            string classType = string.Format("{0}", value.GetType().FullName);
            if (xml.Length > CacheConfig.CacheConfiguration.MaxCacheEntitySize)
            {
                /*
                    key:1@3@ConcreteType
                    key_1:subvalue1
                    key_2:subvalue2
                    key_3:subvalue3
                */
                //拆分成更小的單元
                int splitCount = xml.Length / CacheConfig.CacheConfiguration.MaxCacheEntitySize;
                if (CacheConfig.CacheConfiguration.MaxCacheEntitySize * splitCount < xml.Length)
                    splitCount++;
                parsedObj.MainObject = new KeyValuePair<string, string>(key, string.Format("1@{0}@{1}", splitCount, classType));
                for (int i = 0; i < splitCount;i++ )
                {
                    if (i == splitCount - 1)  //最后一段,直接截取到最后,不用給出長度
                        parsedObj.SplittedElements.Add(xml.Substring(i * CacheConfig.CacheConfiguration.MaxCacheEntitySize));
                    else                      //其他,要給出長度
                        parsedObj.SplittedElements.Add(xml.Substring(i * CacheConfig.CacheConfiguration.MaxCacheEntitySize, CacheConfig.CacheConfiguration.MaxCacheEntitySize));
                }
            }
            else
            {
                /*
                    key:1@1@ConcreteType
                    key_1:value
                */
                parsedObj.MainObject = new KeyValuePair<string, string>(key, string.Format("1@1@{0}", classType));
                parsedObj.SplittedElements.Add(xml);
            }

            //針對CacheParsedObject進行逐項保存
            this.cacheProvider.Put(parsedObj.MainObject.Key, parsedObj.MainObject.Value);
            int curIndex = 0;
            foreach(string xmlValue in parsedObj.SplittedElements)
            {
                curIndex++;
                string tkey=string.Format("{0}_{1}", parsedObj.MainObject.Key, curIndex);
                this.cacheProvider.Put(tkey, xmlValue);
            }
        }

        public object Get(string location, string key)
        {
            string mainObjKeySetting = (string)cacheProvider.Get(key);
            if (mainObjKeySetting == null || mainObjKeySetting.Length == 0)
                return null;

            string classType;
            CacheParsedObject parsedObj;
            GetParsedObject(key, mainObjKeySetting, out classType, out parsedObj);

            string xmlValue=string.Empty;
            parsedObj.SplittedElements.ForEach(t=>xmlValue+=t);

            using (StringReader rdr = new StringReader(xmlValue))
            {
                //Assembly.Load("Core");
                Type t = Type.GetType(classType);
                XmlSerializer serializer = new XmlSerializer(t);
                return serializer.Deserialize(rdr);
            }
        }

        public void Remove(string location, string key)
        {
            string mainObjKeySetting = (string)cacheProvider.Get(key);
            if (mainObjKeySetting == null || mainObjKeySetting.Length == 0)
                return;

            string classType;
            CacheParsedObject parsedObj;
            GetParsedObject(key, mainObjKeySetting, out classType, out parsedObj);

            int i = 1;
            parsedObj.SplittedElements.ForEach(t => this.cacheProvider.Remove(string.Format("{0}_{1}", parsedObj.MainObject.Key, i++)));
            this.cacheProvider.Remove(parsedObj.MainObject.Key);
        }
        private void GetParsedObject(string key, string mainObjKeySetting, out string classType, out CacheParsedObject parsedObj)
        {
            int from = 1, end = 1;
            classType = string.Empty;
            if (mainObjKeySetting.IndexOf('@') > 0)
            {
                end = int.Parse(mainObjKeySetting.Split('@')[1]);
                classType = mainObjKeySetting.Split('@')[2];
            }

            parsedObj = new CacheParsedObject();
            parsedObj.MainObject = new KeyValuePair<string, string>(key, string.Format("1@{0}@{1}", end, classType));
            for (int i = from; i <= end; i++)
                parsedObj.SplittedElements.Add((string)this.cacheProvider.Get(string.Format("{0}_{1}", parsedObj.MainObject.Key, i)));
        }
        private string Serializer2XMLConvert(object value)
        {
            using (StringWriter sw = new StringWriter())
            {
                XmlSerializer xz = new XmlSerializer(value.GetType());
                xz.Serialize(sw, value);
                return sw.ToString();
            } 
        }
        private void AssureSerializable(object value)
        {
            if (value == null)
                throw new Exception("cache object must be Serializable");
            if (value.GetType().GetCustomAttributes(typeof(SerializableAttribute), true).Count()<=0)
                throw new Exception("cache object must be Serializable");
        }
    }

 

下面是CacheCoordinator的代碼,這個類的加入目的是要加入緩存的冗余機制:

class CacheCoordinator : ICacheCoordinator
    {
        CacheServerWrapper backupCacheServer = new CacheServerWrapper(CacheConfig.CacheConfiguration.BackupCacheServer);
        CacheServersWrapper peerCacheServer = new CacheServersWrapper(CacheConfig.CacheConfiguration.PeerCacheServers);

        public void Put(string key, object value)
        {
            peerCacheServer.Put(key, value); 
            backupCacheServer.Put(key, value); //緩存冗余
        }

        public object Get(string key)
        {
            object o=peerCacheServer.Get(key);
            if (o != null)
                return o;
            return backupCacheServer.Get(key);
        }

        public void Remove(string key)
        {
            peerCacheServer.Remove(key);
            backupCacheServer.Remove(key);
        }
    }

 

剩下的就是具體的CacheProvider和CacheProviderWrapper類了:

public class CacheServerWrapper : ICacheExecutor
    {
        ICacheExecutor executor = null;
        private CacheServerInfo configInfo;
        public CacheServerWrapper(CacheServerInfo configInfo)
        {
            this.configInfo = configInfo;
            ICacheExecutor tmpExecutor = null;
            switch(this.configInfo.ServerType)
            {
                case CacheServerType.HttpRuntime:
                    tmpExecutor = new CacheProvider.HttpRuntimeCacheProvider(configInfo);
                    break;
                case CacheServerType.InMemory:
                    tmpExecutor = new CacheProvider.InMemoryCacheProvider(configInfo);
                    break;
                case CacheServerType.Memcached:
                    tmpExecutor = new CacheProvider.MemcachedCacheProvider(configInfo);
                    break;
                case CacheServerType.Redis:
                    tmpExecutor = new CacheProvider.RedisCacheProvider(configInfo);
                    break;
                default:
                    tmpExecutor = new CacheProvider.HttpRuntimeCacheProvider(configInfo);
                    break;
            }
            executor = tmpExecutor;
        }

        public string FullServerAddress
        {
            get
            {
                return this.configInfo.FullServerAddress;
            }
        }

        public void Put(string key, object value)
        {
            executor.Put(key, value);
        }

        public object Get(string key)
        {
            return executor.Get(key);
        }

        public void Remove(string key)
        {
            executor.Remove(key);
        }
    }

 

只貼出Memcache的操作類

class MemcachedCacheProvider : ICacheExecutor
    {
        private MemcachedClient mc = new MemcachedClient();
        private CacheServerInfo configInfo;
        public MemcachedCacheProvider(CacheServerInfo configInfo)
        {
            this.configInfo = configInfo;

            //初始化池  
            SockIOPool pool = SockIOPool.GetInstance();
            pool.SetServers(new string[] { string.Format("{0}:{1}", configInfo.ServerAddress, configInfo.ServerPort) });//設置連接池可用的cache服務器列表,server的構成形式是IP:PORT(如:127.0.0.1:11211)  
            pool.InitConnections = 3;//初始連接數  
            pool.MinConnections = 3;//最小連接數  
            pool.MaxConnections = 5;//最大連接數  
            pool.SocketConnectTimeout = 1000;//設置連接的套接字超時  
            pool.SocketTimeout = 3000;//設置套接字超時讀取  
            pool.MaintenanceSleep = 30;//設置維護線程運行的睡眠時間。如果設置為0,那么維護線程將不會啟動,30就是每隔30秒醒來一次  

            //獲取或設置池的故障標志。  
            //如果這個標志被設置為true則socket連接失敗,將試圖從另一台服務器返回一個套接字如果存在的話。  
            //如果設置為false,則得到一個套接字如果存在的話。否則返回NULL,如果它無法連接到請求的服務器。  
            pool.Failover = true;

            pool.Nagle = false;//如果為false,對所有創建的套接字關閉Nagle的算法  
            pool.Initialize();
        }
        public void Put(string key, object value)
        {
            mc.Set(key, value);
        }

        public object Get(string key)
        {
            return mc.Get(key);
        }

        public void Remove(string key)
        {
            mc.Delete(key);
        }
    }

 

 

不能忘了可配置性,xml定義及代碼如下:

<?xml version="1.0" encoding="utf-8" ?>
<CacheConfig>
  <MaxCacheEntitySize>1048576</MaxCacheEntitySize><!--1*1024*1024-->
  <PeerCacheServers>
    <CacheServer>
      <ServerType>InMemory</ServerType>
      <ServerAddress>127.0.0.1</ServerAddress>
      <ServerPort>11211</ServerPort>
    </CacheServer>
    <CacheServer>
      <ServerType>InMemory</ServerType>
      <ServerAddress>127.0.0.1</ServerAddress>
      <ServerPort>11212</ServerPort>
    </CacheServer>
  </PeerCacheServers>
  <BackupCacheServer>
    <CacheServer>
      <ServerType>InMemory</ServerType>
      <ServerAddress>127.0.0.1</ServerAddress>
      <ServerPort>11213</ServerPort>
    </CacheServer>
  </BackupCacheServer>
</CacheConfig>

 

讀取配置信息的代碼:

public static class CacheConfiguration
    {
        static CacheConfiguration()
        {
            Load();
        }

        private static void Load()
        {
            PeerCacheServers = new List<CacheServerInfo>();
            BackupCacheServer = null;

            XElement root = XElement.Load(System.IO.Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "CacheConfig.xml"));

            MaxCacheEntitySize = int.Parse(root.Element("MaxCacheEntitySize").Value);
            foreach (var elm in root.Element("PeerCacheServers").Elements("CacheServer"))
            {
                CacheServerInfo srv = new CacheServerInfo();
                srv.ServerAddress = elm.Element("ServerAddress").Value;
                srv.ServerPort = int.Parse(elm.Element("ServerPort").Value);
                srv.ServerType = (CacheServerType)Enum.Parse(typeof(CacheServerType), elm.Element("ServerType").Value);
                PeerCacheServers.Add(srv);
            }
            foreach (var elm in root.Element("BackupCacheServer").Elements("CacheServer"))
            {
                CacheServerInfo srv = new CacheServerInfo();
                srv.ServerAddress = elm.Element("ServerAddress").Value;
                srv.ServerPort = int.Parse(elm.Element("ServerPort").Value);
                srv.ServerType = (CacheServerType)Enum.Parse(typeof(CacheServerType), elm.Element("ServerType").Value);
                BackupCacheServer = srv;
                break;
            }
            if (PeerCacheServers.Count <= 0)
                throw new Exception("Peer cache servers not found.");
            if (BackupCacheServer == null)
                throw new Exception("Backup cache server not found.");
            AssureDistinctFullServerAddress(PeerCacheServers);
        }

        private static void AssureDistinctFullServerAddress(List<CacheServerInfo> css)
        {
            Dictionary<string, int> map = new Dictionary<string, int>();
            foreach(CacheServerInfo csInfo in css)
            {
                if (map.ContainsKey(csInfo.FullServerAddress))
                    throw new Exception(string.Format("Duplicated server address found [{0}].", csInfo.FullServerAddress));
                else
                    map[csInfo.FullServerAddress] = 1;
            }
        }
        public static int MaxCacheEntitySize { get; set; }
        public static List<CacheServerInfo> PeerCacheServers { get; set; }
        public static CacheServerInfo BackupCacheServer { get; set; }
    }

 

 

代碼下載

 

Append New

其實,我們忽略了一些重要的東西:

  1. 如果Memcached, Redis服務器超過了5台以上,通信量上升很快,怎么辦?
  2. 由於取數據牽涉到網絡I/O操作,因此速度依然比較慢,怎么辦?

讓我們來解決吧。

把新的UML圖貼上(下圖中左邊紅框中的是新增的):

本地緩存替換策略:LFU/LRU,其他的有很多。

EventBus是分布式的,下面有講為什么要分布式的。

當Domain層需要獲取數據時的邏輯:

  1. 先查看本地緩存中是否存在數據副本,存在則立刻返回(也沒有網絡I/O了)
  2. 沒有則去redis/memcached獲取,有則返回;並且把數據放入本地cache中
  3. 最后,實在沒有數據,就db里取

當Domain層需要更新數據時的邏輯:

  1. 在本地cache中進行更新操作
  2. 更新分布式緩存
  3. 發布分布式事件,通知其他app server的cache manager去主動拉數據到他們本地緩存

看得出來,加入這個新的角色后,能對下面2項有改善作用:

  1. 降低網絡間的通信流量
  2. 增大本地緩存的命中率

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM