一、需求
移動端系統里有用戶和文章,文章可設置權限對部分用戶開放。現要實現的功能是,用戶瀏覽自己能看的最新文章,並可以上滑分頁查看。
二、數據庫表設計
涉及到的數據庫表有:用戶表TbUser、文章表TbArticle、用戶可見文章表TbUserArticle。其中,TbUserArticle的結構和數據如下圖,字段有:自增長主鍵id、用戶編號uid、文章編號aid。
自增長主鍵和分布式增長主鍵如何選(題外討論):
TbUserArticle的主鍵是自增id,它有個缺陷是,當你的數據庫有主從復制時,主從庫的自增可能因死鎖等原因導致不同步。不過,我們可以知道,這里的TbUserArticle的主鍵id不會用在其它表里,所以可以是自增id。不像用戶表的主鍵,它就不能用自增id,因為用戶表主鍵(uid)會經常出現在其它表中,當主從庫自增不一致時,很多有uid字段的表數據在從庫中就不正確了。用戶表主鍵最好是用分布式增長主鍵算法生成的id(比如Snowflake雪花算法)。
那么你可能就要說了,TbUserArticle的主鍵為什么不直接用雪花算法產生,不管有沒有用,先讓主從庫主鍵值一致總是有恃無恐。要知道,雪花算法產生的id一般是18位,而redis的zset的score是double類型,只能表達到16位"整數"部分(精確的說是9007199254740992=2的53次方)。因此,TbUserArticle的主鍵選擇自增id。那么能不能產生一個16位(具體是53bit)的分布式增長id用於支持zset的score呢,當然也是可以的,因為目前的雪花算法是可以根據實際系統環境壓縮bit位的,怎么壓縮bit位呢,有許多方案,以后有需要我可以把它寫出來。
建議:主鍵一般都要選自增id或分布式增長id,這種主鍵好處多多,它符合自增長(物理存儲時都是在末尾追加數據,減少數據移動)、唯一性、長度小、查詢快的特性,是聚集索引的很好選擇。
三、redis緩存設計-zset
zset的作法及其優點說明:
1.zset的score倒序取數可以很好的滿足取最新數據的需求。
2.用TbUserArticle的文章編號當value,用自增長id當score。自增id的唯一性可很方便的取下一頁數據,直接取小於上次最后一筆的score即可(用lastScore表示)。而如果用文章的時間做score,則要考慮兩筆文章的時間是同分同秒問題,當lastScore落在同分同秒的兩篇文章之間時,就尷尬了,雖然有解,但麻煩了一點。有時的場景你用不了自增id當score,只能用文章時間,那怎么解決呢,方案就是當是同分同秒時,再根據文章編號做比較就好了,zset的score相同時,也是再根據value排序的,這塊的代碼實現請看下文第五點,只需稍微改點代碼即可。
3.當新增或重新添加一項時,zset也會保持score排序。而如果用的是redis的list,一般就得從db重載緩存,新增進來的數據項就算是最新的,也不敢直接添加到list第一筆,因為並發情況下,保證不了最新就是在第一筆;至於重新添加進非最新項,那更是要從db取數重新裝載緩存(一般是直接刪除緩存,要用的時候才裝載)。
4.第一次從db加載數據到zset時,可只取前N筆到zset。因為我們移動端的數據瀏覽,一般是只看最新N筆,當看到昨天瀏覽過的數據一般就不會再往下瀏覽。
5.控制zset為固定長度,防止一直增長,一是減少緩存開銷,二是隊列長度越短操作性能越高。而且redis服務端有兩個參數:zset-max-ziplist-entries(zset隊列長度,默認值128)和 zset-max-ziplist-value(zset每項大小,默認值64字節),它們的作用是,當zset長度小於128,且每個元素的大小小於64字節時,會啟用ziplist(壓縮雙向鏈表),它的內存空間可以減少8倍左右,而且操作性能也更快。如果不滿足這兩個條件則是普通的skiplist(跳躍表)。另,數據結構hash和list默認長度是512。如果系統有100萬個用戶,每個用戶都有自己的隊列緩存,那么使用ziplist將節省非常大的內存空間,並提升很大的性能。
注意,當從zset移除一項數據,則看場景是否需要清空隊列。否則有可能添加進來了一項很舊的數據,它會跑到緩存隊列最底部,如果此舊數據比db中未進隊列的數據還舊,那么隊列中的數據就不正確了。(此時,用戶滑到緩存最后一頁時,就有可能瀏覽到這項不正確的數據,為什么是“有可能”,因為當取到zset最后一筆,很可能不夠一頁(一頁10筆計算的話,90%會取不夠一頁),而不夠一頁就會從db直接取一頁,從db直接取就不會有這項不正確的數據。而當zset又添加進一項新數據,末端那筆舊數據就會被T出隊列(因為隊列保持固定長度),zset數據又恢復正確了。不管怎樣,這種問題幾率雖不高,也是有解決方案,可搞個臨界點處理此問題,不細說,否則又是長篇大論,最好的方案就是根據實際場景設計,比如從zset隊列移除數據的情況多不多)。而如果添加到zset的數據都是最新數據,則不會有此問題。
當用唯一主鍵id做score時,這可是非常有用,你可以直接根據id定位到項了,至於如何大用它,我會再出篇博客。
四、代碼實現
從redis緩存按頁取數一般要考慮的點:
1.當根據cacheKey未取到數據時(可能是緩存過期了導致redis無此cacheKey數據),則觸發重載數據(reload):從db取limit N筆數據,裝載到redis zset隊列中,並直接取N筆的第一頁數據返回;
2.如果db本身也無對應數據,則添加"no_db_mark"標識到cacheKey隊列中,下次請求則不會再觸發db重載數據;
3.當取到緩存末尾時,從db取一頁數據直接返回。這種情況是很少的,要根據業務場景合理規划緩存長度。
上代碼:
代碼注釋比較詳細和有用,請直接看代碼。
其中,批量添加數據到zset的函數AddItemsToZset很有用,它使用lua一次性添加多筆數據到zset(注意,使用lua時,要保證lua執行快,否則它會阻塞其它命令的執行),經測試:AddItemsToZset添加1w筆數據,只需要39ms;10w筆需要448ms。因為我們只取前N筆數據到緩存,因此一般不會添加超過1w筆。
另一個通用有用的函數是GetPageDataByLastScoreFromRedis,它支持從指定的score開始取pageSize筆數據,即支持了zset分頁。它是第二頁(及之后)的取數,而如果取第一頁取數,則直接用redis原生函數即可redis.GetRangeWithScoresFromSortedSetDesc(cacheKey, 0, pageSize - 1);。
/// <summary> /// 分頁取數幫助類 /// </summary> public class PageDataHelper { public readonly static string NoDbDataMark = "no_db_data";//在zset中標識db也無數據 public static RedisHandle RedisClient = new RedisHandle();//redis操作對象示例 public static DbHandleBase DbHandle = new SqlServerHandle("Data Source=.;Initial Catalog=Test;User Id=sa;Password=123ewq;");//db操作對象示例 /// <summary> /// 按頁取數。返回文章編號列表。 /// </summary> /// <param name="lastInfo">上一頁最后一筆的score,如果為空,則說明是取第一頁。</param> /// <param name="getPast">true,用戶上滑瀏覽下一頁數據;false,用戶上滑瀏覽最新一頁數據</param> /// <returns>返回key-value列表,key就是文章編號,value就是自增id(可用於lastScore)</returns> public static IDictionary<string, double> GetUserPageData(string uid, int pageSize, string lastInfo, bool getPast) { long lastScore = 0; //1.解析lastInfo信息。->getPast為false,則固定取最新第一頁數據,不用解析。lastInfo為空,則也不用解析,默認第一頁 if (getPast && !string.IsNullOrWhiteSpace(lastInfo)) { lastScore = long.Parse(lastInfo);//外層有try..catch.. } string cacheKey = $"usr:art:{uid}"; bool isFirstPage = lastScore <= 0; using (IRedisClient redis = RedisClient.GetRedisClient()) { if (isFirstPage) { //2.第一頁取數 var items = redis.GetRangeWithScoresFromSortedSetDesc(cacheKey, 0, pageSize - 1); if (items.Count == 0) { //2.1 無數據時,則從db reload數據 items = ReloadDataToRedis(redis, cacheKey, uid, pageSize); if (items.Count == 0 && pageSize > 0) { //如果db中也無數據,則向zset中添加一筆NoDbDataMark標識 redis.AddItemToSortedSet(cacheKey, NoDbDataMark, double.MaxValue); } } else if (items.Count == 1 && items.ContainsKey(NoDbDataMark)) { //2.2如果取到的是NoDbDataMark標識,則說明是空數據,則要Clear,返回空列表 items.Clear(); } //設置緩存有效期,要根據業務場景合理設置緩存有效期,這邊以7天為例。 redis.ExpireEntryIn(cacheKey, new TimeSpan(7, 0, 0, 0)); //2.3 第一頁,有多少就返回多少數據。數據如果不夠一頁,說明本身數據不夠。 return items; } else { //3.第二頁(及之后)取數 var items = GetPageDataByLastScoreFromRedis(redis, cacheKey, pageSize, lastScore); if (items.Count < pageSize) { //3.1 如果取不夠數據時,就到db取。如果db也不能取到一頁數據,前端會顯示無更多數據,不會一直db取。 return GetPageDataByLastScoreFromDb(uid, pageSize, lastScore); } //3.2 如果緩存數據足夠,則返回緩存的數據。 return items; } } } public static Dictionary<string, double> ReloadDataToRedis(IRedisClient redis, string cacheKey, string uid, int pageSize, string bizId = "") { //1.db取數 取top 1000筆數據。不需要全取到緩存。 IEnumerable<dynamic> models; using (var conn = DbHandle.CreateConnectionAndOpen()) { var sql = $"select top 1000 id,aid from TbUserArticle where uid=@uid order by id desc;";// limit 1000;"; models = conn.Query<dynamic>(sql, new { uid = uid }); } if (models.Count() <= 0) return new Dictionary<string, double>(); //2.數據加載到redis緩存。 var itemsParam = new Dictionary<string, double>(); foreach (dynamic model in models) { itemsParam.Add((string)model.aid, (double)model.id); } //使用lua一次性添加數據到緩存。lua語句要執行快,經測試添加1w筆數據,只需要39ms;10w筆需要448ms。因為sql中有limit,因此一般不會添加超過1w筆。 //因為是原子性操作、並且是zset結構,這邊不需要加鎖。db取到數據應第一時間加載到redis。 AddItemsToZset(redis, cacheKey, itemsParam, true, true); if (pageSize <= 0) return null; //3.直接由models返回第一頁數據。 return models.Take(pageSize).ToDictionary(x => (string)x.aid, y => (double)y.id); } public static Dictionary<string, double> GetPageDataByLastScoreFromDb(string uid, int pageSize, double lastScore) { //db取一頁數據。 var sql = $"select top {pageSize} id,aid from TbUserArticle where uid=@uid and id<{lastScore}order by id desc;";// limit {pageSize};"; using (var conn = DbHandle.CreateConnectionAndOpen()) { return conn.Query<dynamic>(sql, new { uid = uid }).ToDictionary(x => (string)x.aid, y => (double)y.id); } } #region 通用函數 /// <summary> /// ZSet第一頁之后的取數,從lastScore開始取pageSize筆數據(第一頁之后才有lastScore)。 /// 使用lua,保證原子性操作。 /// </summary> public static Dictionary<string, double> GetPageDataByLastScoreFromRedis(IRedisClient redis, string zsetKey, int pageSize, double lastScore) { //ZREVRANGEBYSCORE: from lastScore to '-inf'. var luaBody = @"local sets = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], '-inf', 'WITHSCORES'); local result = {}; local index=0; local pageSize=ARGV[2]*1; local lastScore=ARGV[1]*1; for i = 1, #sets, 2 do if index>=pageSize then break; end if (lastScore>sets[i+1]*1) then table.insert(result, sets[i]); table.insert(result, sets[i+1]); index=index+1; end end return result"; //ARGV[1]:lastScore ARGV[2]:pageSize var list = redis.ExecLuaAsList(luaBody, new string[] { zsetKey }, new string[] { lastScore.ToString(), pageSize.ToString() }); var result = new Dictionary<string, double>(); for (var i = 0; i < list.Count; i += 2) { result.Add(list[i], Convert.ToDouble(list[i + 1])); } return result; } /// <summary> /// 添加一項到zset緩存中。 /// </summary> /// <param name="item">要添加到zset的數據項</param> /// <param name="maxCount">控制zset最大長度,如果為0,則不控制。</param> /// <returns></returns> public static string AddItemToZset(IRedisClient redis, string zsetKey, KeyValuePair<string, double> item, int maxCount = 0) { var items = new Dictionary<string, double>() { { item.Key, item.Value } }; return AddItemsToZset(redis, zsetKey, items); } /// <summary> /// 添加多項到zset緩存中。 /// </summary> /// <param name="items">要添加到zset的數據列表</param> /// <param name="hasCacheExpire">緩存zsetKey是否有設置緩存有效期。如果有設置緩存有效期,則當緩存中無數據時,可能是緩存過期;而如果緩存無有效期,緩存中無數據,就是db和緩存都無數據</param> /// <param name="isReload">是否是reload情況,true重載情況;false追加</param> /// <param name="maxCount">控制zset最大長度,如果為0,則不控制。</param> /// <returns></returns> public static string AddItemsToZset(IRedisClient redis, string zsetKey, Dictionary<string, double> items, bool hasCacheExpire = true , bool isReload = false, int maxCount = 0) { //!isReload,是因為如果isReload=true情況無數據,則也要進來重載隊列為無數據(即,如果之前有數據要重載為無數據) if (!isReload && items.Count <= 0) return null; var argArr = new List<string>(items.Count * 2 + 2);//lua參數數組 //var hasCacheExpire = cacheValidTime != null; //第一個lua參數是hasCacheExpire argArr.Add(hasCacheExpire ? "1" : "0"); //第二個lua參數是maxCount argArr.Add(maxCount.ToString()); //組合lua其它參數列表:ZADD的參數 foreach (var item in items) { //Add score。 //ZADD KEY_NAME SCORE1 VALUE1 argArr.Add(item.Value.ToString()); argArr.Add(item.Key); } #region lua /* * 以下lua命令說明。 * 1.ZREVRANGE從大到小取第一筆數據firstMark; * 2.緩存有設置有效期時(hasCacheExpire=1),如果第一筆數據firstMark為nil,則說明列表是空(失效key、未生成key),則不做任何處理,直接返回字符串not_exist_key。因為可能是用戶失效數據,用戶長期未訪問,則不添加,后繼來訪問時重載數據。 * 3.如果firstMark標識為no_db_data,則是被api標識為db沒數據,而此時因要ZADD數據進來,因此要把此標識刪除。其中,ZREMRANGEBYRANK從小到大刪除,-1是倒數第一筆。 * 4.ZADD數據進來 * 5.KeepLength保持隊列長度操作。如果隊列長度(由ZCARD獲取)超過指定的maxCount,則從隊列第一筆開始刪除多余元素,即score最小開始刪除。 * 6.maxCount為>0才KeepLength。返回數值:curCount - maxCount。(可以用返回值簡單算出隊列當前長度curCount)。如果返回值小於等於0則說明沒有觸發刪除操作。 * 7.maxCount為<=0時,直接返回'no_remove'。 */ //清空原來,重新加載數據的情況 const string reloadLua = "redis.call('DEL', KEYS[1]) "; //追加數據到zset的情況 const string addToLua = @"local firstMark = redis.call('ZREVRANGE',KEYS[1],0,0); local hasCacheExpire=ARGV[1]*1; if hasCacheExpire==1 and firstMark and firstMark[1]==nil then return 'not_exist_key'; end if firstMark and firstMark[1]=='{0}' then redis.call('ZREMRANGEBYRANK', KEYS[1], -1,-1); end"; const string constAllLua = @"{0} for i=3, #ARGV, 2 do redis.call('ZADD', KEYS[1], ARGV[i], ARGV[i+1]); end local maxCount=ARGV[2]*1; if maxCount>0 then local curCount= redis.call('ZCARD', KEYS[1]); local removeCount=curCount - maxCount; if removeCount>0 then redis.call('ZREMRANGEBYRANK', KEYS[1], 0,removeCount-1); end return removeCount; end return 'no_remove';"; #endregion var luaBody = string.Format(constAllLua, isReload ? reloadLua : string.Format(addToLua, NoDbDataMark)); var luaResult = redis.ExecLuaAsString(luaBody, new string[] { zsetKey }, argArr.ToArray()); return luaResult; } #endregion }
五、用時間做score,同分同秒問題解決
如果是用時間做score,會有同分同秒問題,比如在TbUserArticle里增加了“時間”欄位。解決方法代碼只需稍作微改,參數除了lastScore(此時是“時間”),還需要傳lastAid(文章編號)。
1. 緩存處理修改,只動了以下紅色粗體字。(注:當zset的兩筆數據score相同時,是再根據value排序的):
public static Dictionary<string, double> GetPageDataByLastScoreFromRedis(IRedisClient redis, string zsetKey, int pageSize, double lastScore,string lastAid) { //ZREVRANGEBYSCORE: from lastScore to '-inf'. var luaBody = @"local sets = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], '-inf', 'WITHSCORES'); local result = {}; local index=0; local pageSize=ARGV[2]*1; local lastScore=ARGV[1]*1; local lastAid=ARGV[3]; for i = 1, #sets, 2 do if index>=pageSize then break; end if (lastScore>sets[i+1]*1) or (lastScore==sets[i+1]*1 and lastAid>sets[i]) then table.insert(result, sets[i]); table.insert(result, sets[i+1]); index=index+1; end end return result"; //ARGV[1]:lastScore ARGV[2]:pageSize var list = redis.ExecLuaAsList(luaBody, new string[] { zsetKey }, new string[] { lastScore.ToString(), pageSize.ToString(), lastAid }); var result = new Dictionary<string, double>(); for (var i = 0; i < list.Count; i += 2) { result.Add(list[i], Convert.ToDouble(list[i + 1])); } return result; }
2.db取數修改
reload sql
$"select top 1000 時間,aid from TbUserArticle where uid=@uid order by 時間 desc,aid desc;";
db中取一頁的sql
$"select top {pageSize} 時間,aid from TbUserArticle where uid=@uid and (時間<{lastScore} or (時間={lastScore} and aid<'{lastAid}')) order by 時間 desc,aid desc;";
這樣就可以了,中心思想就是:當“時間={lastScore} ”,那么就增加文章編號比較條件。