Redis到底該如何利用(三)?


上兩篇受益匪淺,秉着趁熱打鐵,不挖到最深不罷休的精神,我決定追加這篇。上一篇里最后我有提到實現分級緩存管理應該是個可行的方案,因此今天特別實踐了一下。不過緩存分級之后也發現了一些問題,例如下圖:

當appServerA修改了數據,並同步到Redis/DB之后,如何讓appServerB也能更新本地緩存呢?雖然Redis的出現是為了解決這個問題的,但是分級方案里,MemoryCache還是需要保留。那么如何保存呢?我嘗試了下面的幾種方式,現在我們逐一來看。

 全數據增量同步

所謂全數據校驗,即所有的緩存數據首先都同步至Redis,然后根據數據的時間戳來進行同步。分解步驟如下:

  1. 首先將緩存的數據初始化,同步至Redis和MemoryCache,保持初始數據的同步
  2. 第二步,每當操作了數據之后,給記錄一個時間戳標識最近的更新。
  3. MemoryCache定時或者每次取數據的時候,以最近的一個同步的時間戳開始同步到現在的時間戳

上面的方案咱們落地到.NET+Redis又該怎么處理呢?

第一步很簡單直接跳過,第二步就有點問題了,這個時間戳要便於redis的排序和獲取,考慮到這些問題,我覺得取Linux的數字型時間戳比較靠譜,再配合SortSet來建立索引,進行同步,看起來的確不錯。那么來看下代碼:

本篇還是以User為例,可能場景不適合,大家將就理解

public void UpdateToRedis(User user)
{
            var redis = ConnectionMultiplexer.Connect("localhost");
            var db = redis.GetDatabase();
            
            TimeSpan ts = DateTime.Now.ToUniversalTime() - new DateTime(1970, 1, 1, 0, 0, 0, 0);
            double time = Convert.ToInt64(ts.TotalSeconds);//計算Unix時間戳
            db.SortedSetAdd("capqueen:user:index", user.Id, time);//更新記錄
            var json = JsonConvert.SerializeObject(user);
            db.StringSet("capqueen:user" + user.Id, json);//user記錄
}

 同步:

public void Sync(double timespan)
{
            var redis = ConnectionMultiplexer.Connect("localhost");
            var db = redis.GetDatabase();

            TimeSpan ts = DateTime.Now.ToUniversalTime() - new DateTime(1970, 1, 1, 0, 0, 0, 0);
            double time = Convert.ToInt64(ts.TotalSeconds);//計算Unix時間戳
            
            //同步時間范圍內的記錄,這里增加時間范圍,以防止一些數據不准確的問題
            var members = db.SortedSetRangeByScore("capqueen:user:index", timespan, time);

            var keys = members.ToList().Select<RedisValue, RedisKey>(i => i.ToString());
            var values = db.StringGet(keys.ToArray());

            //構造List Json以加速解析
            var portsJson = new StringBuilder("[");

            values.ToList().ForEach(item =>
            {
                if (!string.IsNullOrWhiteSpace(item))
                {
                    portsJson.Append(item).Append(",");
                }
            });

            portsJson.Append("]");

            var users = JsonConvert.DeserializeObject<List<User>>(portsJson.ToString());

            //和內存的List<User>做同步
            ...
            //END
}

上面的代碼只是實例,實際運行的時候感覺還不是特別靠譜。

消息通知

增量同步是好,但是同時時機也是個問題,時機不對,就會影響用戶體驗。而且感覺我們還是無法很好的掌控數據。那么有沒有一種方式可以及時的通知到appServer更新緩存呢?這里我腦子里冒出來的是.NET Event機制。

我覺得.NET因為有了優秀的Event機制,才讓我覺得它使用起來很方便

但是這里是服務器之間的通信,我想非Scoket莫屬了,之間做過Scoket雙向通信,印象特別深刻,這樣的場景讓我自然而然的想到了這個方案。可是特別的增加一個socket,本身復雜的架構要更復雜了,還是尋求一個靠譜的中間件比較適合。看過Redis的功能列表的同學,一定沒忘記Redis還有一個訂閱發布,pub/sub功能。於是我就利用了這個功能,設計了如下的方案:

  1. 為app增加一個sub線程,用於接收redis訂閱消息
  2. 在每一個緩存對象更新的同時,增加異步pub消息到redis

先來看下Redis的,pub/sub功能,請看文檔

redis提供指定channel的訂閱發布功能,每一個Client可以訂閱指定的channel消息,也可以向指定的channel發送消息。

利用ServiceStack.Redis的pub/sub功能實現如下:

using (var redisConsumer = new RedisClient(TestConfig.SingleHost))
using (var subscription = redisConsumer.CreateSubscription())
{
    subscription.OnSubscribe = channel =>
    {
        //訂閱事件
    };
    subscription.OnUnSubscribe = channel =>
    {
       //退訂事件
    };
    subscription.OnMessage = (channel, msg) =>
    {
        // 這里的msg,我為了測試定義成了userid
        var user= redisConsumer.As<User>().GetById(msg);//從Redis獲取記錄
        UpdateLocalCache(user);//更新本地
    };

  
    subscription.SubscribeToChannels("capqueen:redis:events"); //blocking
}

 

發送:

using (var redisPublisher = new RedisClient("localhost"))
{
    redisPublisher.PublishMessage("capqueen:redis:events", userId);//發送消息
}

 

這里我覺得message應該定義成一個消息體,接收處理應該根據具體的消息定義來具體處理,因為訂閱發布的通道顯然應該是合理利用,如果一個業務一個通道,有點太多了。

當然這個實現起來又要講一大篇了,這里不多說明。

總結

本篇文章,根據上一篇提的方案做了一些實現,不足之處太多了。例如事件丟了怎么辦?如何正確的處理消息缺失,記錄少了之類的特殊情況應該是很重要的。我暫時還沒想到方案,不知道前輩們如何看待這樣的問題。可能我還是濫用了Redis,為了技術而技術了,不過不嘗試碰下壁,怎么能得到成長。所謂兼聽則明 偏信則暗,我感覺自己有點不見棺材不落淚了。請前輩們不吝指教!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM