高並發時,使用Redis應注意的問題【緩存穿透、緩存擊穿.、緩存雪崩】


十年河東,十年河西,莫欺少年窮

學無止境,精益求精

首先說下,我的 Redis 系列博客如下:

[置頂] 高並發時,使用Redis應注意的問題【緩存穿透、緩存擊穿.、緩存雪崩】

windows環境下配置Redis主從復制-一主二仆,薪火相傳、反客為主、哨兵模式

Redis 持久化技術 ,大名鼎鼎的Rdb和Aof,你會選誰呢?

簡單介紹下Redis消息隊列,實際生產環境中,大數據高並發時,不建議使用Redis做消息隊列中間件

Redis 事務,和傳統的關系型數據庫ACID並不同,別搞混了

Redis常用配置redis.conf介紹,別把默認配置部署到到服務器,否則,會被領導罵的

C# Nuget程序集StackExchange.Redis操作Redis 及 Redis 視頻資源 及 相關入門指令 牛逼不,全都有

Redis 的基礎數據類型

Window環境下安裝Redis 並 自啟動Redis 及 Redis Desktop Manager

進入正文

緩存的出現解決了數據庫壓力的問題,但是當以下情況發生的時候,緩存就不在起到作用了,緩存穿透、緩存擊穿、緩存雪崩這三種情況。

1. 緩存穿透:

我們的程序中用緩存的時候一般采取的是先去緩存中查詢我們想要的緩存數據,如果緩存中不存在我們想要的數據的話,緩存就失去了作用(譬如緩存失效),這時我們就是需要伸手向DB庫要數據,如果這種動作過多數據庫就崩潰了。

這種情況需要我們去預防了,比如說:我們向緩存獲取一個用戶信息,但是故意去輸入一個緩存中不存在的用戶Key,這樣就避過了緩存,把壓力重新轉移到數據上面了。

對於這種問題我們可以采取:

因為緩存查不到用戶信息,數據庫也查詢不到用戶信息,我們就把訪問的數據進行緩存,這時候就可以避免重復訪問,順利把壓力重新轉向緩存中,有人會有疑問了,當訪問的參數有上萬個都是不重復的參數,並且都是可以躲避緩存的怎么辦,我們同樣把數據存起來設置一個較短過期時間清理緩存。

示例代碼如下:

        [HttpGet]
        [Route("RedisGet")]
        public IActionResult RedisGet(string key)
        {
            if (rd.KeyExists(key))
            {
                /*
                 * 如果緩存中存在,則直接返回結果
                 */
                var result = rd.StringGet(key);
                return Ok(result);
            }
            else
            {
                /*
                 * 如果緩存中不存在,則需要結合數據庫進行查詢,但必須采用相應的策略,防止惡意【緩存擊穿】。
                 * 數據庫查詢部分,
                 * 如果數據庫查詢到結果,則對結果進行緩存,並返回結果。
                 * 如果數據庫查詢不到結果,則對請求的數據進行緩存,防止緩存擊穿。
                 * 除了上述比較被動的防御以外,我們還可以采取一段時間內限制請求次數來達到惡意攻擊行為。
                 */
                return Ok();
            }
        }

2. 緩存擊穿:

事情是這樣的,對於一些設置了過期時間的緩存KEY,過期的時候,程序被高並發訪問了(此時緩存已失效),這個時候由於緩存失效,訪問壓力也就轉移到了數據庫身上,高並發情況下,數據庫往往扛不住那么多請求。

針對這種情況,我們可以使用互斥鎖(Mutex)來解決問題,

互斥鎖原理:通俗的描述就是,一萬個用戶訪問了,但是只有一個用戶可以拿到訪問數據庫的權限。

當這個用戶拿到這個權限之后重新創建緩存,這個時候剩下的訪問者因為沒有拿到權限,就原地等待着去訪問緩存。

邏輯上‘永不過期’:有人就會想了,我設置了過期時間,但我的系統中有一個定時的服務一直在跑,這個服務是用於判斷緩存是否即將過期,如果發現即將過期的緩存,通過定時服務來更新緩存,這個時候緩存中的數據在邏輯上就會‘永不過期’了。

 比如,定時服務每10分鍾跑一次,但當我們發現緩存的過期時間小於10分鍾了,我們通過服務來更新緩存,達到‘永不過期’的目的。

互斥鎖解決方案:

using System;
using System.Threading;

namespace ConsoleApp1
{
    class shareRes
    {
        public static int count = 0;
        public static Mutex mutex = new Mutex();
    }

    class IncThread
    {
        int number;
        public Thread thrd;
        public IncThread(string name, int n)
        {
            thrd = new Thread(this.run);
            number = n;
            thrd.Name = name;
            thrd.Start();
        }
        void run()
        {
            Console.WriteLine(thrd.Name + "正在等待 the mutex");
            //申請
            shareRes.mutex.WaitOne();
            Console.WriteLine(thrd.Name + "申請到 the mutex");
            do
            {
                Thread.Sleep(1000);
                shareRes.count++;
                Console.WriteLine("In " + thrd.Name + "ShareRes.count is " + shareRes.count);
                number--;
            } while (number > 0);
            Console.WriteLine(thrd.Name + "釋放 the nmutex");
            //  釋放
            shareRes.mutex.ReleaseMutex();
        }
    }
    class DecThread
    {
        int number;
        public Thread thrd;
        public DecThread(string name, int n)
        {
            thrd = new Thread(this.run);
            number = n;
            thrd.Name = name;
            thrd.Start();
        }
        void run()
        {
            Console.WriteLine(thrd.Name + "正在等待 the mutex");
            //申請
            shareRes.mutex.WaitOne();
            Console.WriteLine(thrd.Name + "申請到 the mutex");
            do
            {
                Thread.Sleep(1000);
                shareRes.count--;
                Console.WriteLine("In " + thrd.Name + "ShareRes.count is " + shareRes.count);
                number--;
            } while (number > 0);
            Console.WriteLine(thrd.Name + "釋放 the nmutex");
            //  釋放
            shareRes.mutex.ReleaseMutex();
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            IncThread mthrd1 = new IncThread("線程1 thread ", 5);
            DecThread mthrd2 = new DecThread("線程2 thread ", 5);
            mthrd1.thrd.Join();
            mthrd2.thrd.Join();
            Console.Read();
        }
    }
}

關於互斥鎖解決方案,我們可以通過了解互斥鎖(Mutex)來進行解決。

邏輯上‘永不過期’解決方案:

需要定義一個定時的服務,具體請參考 .netcore控制台->定時任務Quartz ,總之通過定時檢測緩存過期時間來更新即將過期的緩存。

3. 緩存雪崩:

是指多種緩存設置了同一時間過期,這個時候大批量的數據訪問來了,(緩存失效)數據庫DB的壓力又上來了。

解決方法在設置過期時間的時候,在過期時間的基礎上增加一個隨機數,盡可能的保證緩存不會大面積的同時失效,說白了,就是緩存的過期時間不能大批量相同。

以上便是使用Redis緩存應注意的三個方面及解決方案。

最后,

順便貼出一個NetCore的緩存幫助類,封裝的比較簡單,但也是有用的,如下:

using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace FranchiseeCommon
{
    /// <summary>
    /// Redis操作類
    /// 老版用的是ServiceStack.Redis
    /// .Net Core使用StackExchange.Redis的nuget包
    /// </summary>
    public class RedisHelper
    {
        //redis數據庫連接字符串
        private string _conn = "127.0.0.1:6379";
        private int _db = 0;

        //靜態變量 保證各模塊使用的是不同實例的相同鏈接
        private static ConnectionMultiplexer connection;

        /// <summary>
        /// 構造函數
        /// </summary>
        public RedisHelper() { }
        /// <summary>
        /// 構造函數
        /// </summary>
        /// <param name="db"></param>
        /// <param name="connectStr"></param>
        public RedisHelper(int db, string connectStr)
        {
            _db = db;
            _conn = connectStr;
        }

        /// <summary>
        /// 緩存數據庫,數據庫連接
        /// </summary>
        public ConnectionMultiplexer CacheConnection
        {
            get
            {
                try
                {
                    if (connection == null || !connection.IsConnected)
                    {
                        connection = new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(_conn)).Value;
                    }
                }
                catch (Exception ex)
                {
                    return null;
                }
                return connection;
            }
        }

        /// <summary>
        /// 緩存數據庫
        /// </summary>
        public IDatabase CacheRedis => CacheConnection.GetDatabase(_db);


        #region --KEY/VALUE存取--
        /// <summary>
        /// 單條存值
        /// </summary>
        /// <param name="key">key</param>
        /// <param name="value">The value.</param>
        /// <returns><c>true</c> if XXXX, <c>false</c> otherwise.</returns>
        public bool StringSet(string key, string value)
        {
            return CacheRedis.StringSet(key, value);
        }

        /// <summary>
        /// 保存單個key value
        /// </summary>
        /// <param name="key">Redis Key</param>
        /// <param name="value">保存的值</param>
        /// <param name="expiry">過期時間</param>
        /// <returns></returns>
        public bool StringSet(string key, string value, TimeSpan? expiry = default(TimeSpan?))
        {
            return CacheRedis.StringSet(key, value, expiry);
        }

        /// <summary>
        /// 保存多個key value
        /// </summary>
        /// <param name="arr">key</param>
        /// <returns></returns>
        public bool StringSet(KeyValuePair<RedisKey, RedisValue>[] arr)
        {
            return CacheRedis.StringSet(arr);
        }

        /// <summary>
        /// 批量存值
        /// </summary>
        /// <param name="keysStr">key</param>
        /// <param name="valuesStr">The value.</param>
        /// <returns><c>true</c> if XXXX, <c>false</c> otherwise.</returns>
        public bool StringSetMany(string[] keysStr, string[] valuesStr)
        {
            var count = keysStr.Length;
            var keyValuePair = new KeyValuePair<RedisKey, RedisValue>[count];
            for (int i = 0; i < count; i++)
            {
                keyValuePair[i] = new KeyValuePair<RedisKey, RedisValue>(keysStr[i], valuesStr[i]);
            }

            return CacheRedis.StringSet(keyValuePair);
        }

        /// <summary>
        /// 保存一個對象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="obj"></param>
        /// <returns></returns>
        public bool SetStringKey<T>(string key, T obj, TimeSpan? expiry = default(TimeSpan?))
        {
            string json = JsonConvert.SerializeObject(obj);
            return CacheRedis.StringSet(key, json, expiry);
        }

        /// <summary>
        /// 追加值
        /// </summary>
        /// <param name="key"></param>
        /// <param name="value"></param>
        public void StringAppend(string key, string value)
        {
            ////追加值,返回追加后長度
            long appendlong = CacheRedis.StringAppend(key, value);
        }

        /// <summary>
        /// 獲取單個key的值
        /// </summary>
        /// <param name="key">Redis Key</param>
        /// <returns></returns>
        public RedisValue GetStringKey(string key)
        {
            return CacheRedis.StringGet(key);
        }

        /// <summary>
        /// 根據Key獲取值
        /// </summary>
        /// <param name="key">鍵值</param>
        /// <returns>System.String.</returns>
        public string StringGet(string key)
        {
            try
            {
                return CacheRedis.StringGet(key);
            }
            catch (Exception ex)
            {
                return null;
            }
        }

        /// <summary>
        /// 獲取多個Key
        /// </summary>
        /// <param name="listKey">Redis Key集合</param>
        /// <returns></returns>
        public RedisValue[] GetStringKey(List<RedisKey> listKey)
        {
            return CacheRedis.StringGet(listKey.ToArray());
        }

        /// <summary>
        /// 批量獲取值
        /// </summary>
        public string[] StringGetMany(string[] keyStrs)
        {
            var count = keyStrs.Length;
            var keys = new RedisKey[count];
            var addrs = new string[count];

            for (var i = 0; i < count; i++)
            {
                keys[i] = keyStrs[i];
            }
            try
            {

                var values = CacheRedis.StringGet(keys);
                for (var i = 0; i < values.Length; i++)
                {
                    addrs[i] = values[i];
                }
                return addrs;
            }
            catch (Exception ex)
            {
               
                return null;
            }
        }

        /// <summary>
        /// 獲取一個key的對象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <returns></returns>
        public T GetStringKey<T>(string key)
        {
            return JsonConvert.DeserializeObject<T>(CacheRedis.StringGet(key));
        }
        #endregion


        #region --刪除設置過期--
        /// <summary>
        /// 刪除單個key
        /// </summary>
        /// <param name="key">redis key</param>
        /// <returns>是否刪除成功</returns>
        public bool KeyDelete(string key)
        {
            return CacheRedis.KeyDelete(key);
        }

        /// <summary>
        /// 刪除多個key
        /// </summary>
        /// <param name="keys">rediskey</param>
        /// <returns>成功刪除的個數</returns>
        public long KeyDelete(RedisKey[] keys)
        {
            return CacheRedis.KeyDelete(keys);
        }

        /// <summary>
        /// 判斷key是否存儲
        /// </summary>
        /// <param name="key">redis key</param>
        /// <returns></returns>
        public bool KeyExists(string key)
        {
            return CacheRedis.KeyExists(key);
        }

        /// <summary>
        /// 重新命名key
        /// </summary>
        /// <param name="key">就的redis key</param>
        /// <param name="newKey">新的redis key</param>
        /// <returns></returns>
        public bool KeyRename(string key, string newKey)
        {
            return CacheRedis.KeyRename(key, newKey);
        }

        /// <summary>
        /// 刪除hasekey
        /// </summary>
        /// <param name="key"></param>
        /// <param name="hashField"></param>
        /// <returns></returns>
        public bool HaseDelete(RedisKey key, RedisValue hashField)
        {
            return CacheRedis.HashDelete(key, hashField);
        }

        /// <summary>
        /// 移除hash中的某值
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="dataKey"></param>
        /// <returns></returns>
        public bool HashRemove(string key, string dataKey)
        {
            return CacheRedis.HashDelete(key, dataKey);
        }

        /// <summary>
        /// 設置緩存過期
        /// </summary>
        /// <param name="key"></param>
        /// <param name="datetime"></param>
        public void SetExpire(string key, DateTime datetime)
        {
            CacheRedis.KeyExpire(key, datetime);
        }
        #endregion
    }
}
View Code

NetCore配置文件為:

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "tokenManagement": {
    "secret": "123456123456123456",
    "issuer": "webapi.cn",
    "audience": "WebApi",
    "accessExpiration": 120,
    "refreshExpiration": 60
  },
  "ConnectionStrings": {
    "aixueshi_temp1Context": "我的數據庫連接;",
    "RedisConnectionStrings": "127.0.0.1:6379"
  }
}

控制器端調用的代碼如下:

using System;
using System.Collections.Generic;
using System.IdentityModel.Tokens.Jwt;
using System.Linq;
using System.Threading.Tasks;
using FranchiseeApi.Helper;
using FranchiseeCommon;
using FranchiseeDto;
using FranchiseeDto.Franchisee;
using FranchiseeInterface.Franchisee;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.RazorPages;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace FranchiseeApi
{
    [Route("Api/V1/RedisApi")]
    [ApiExplorerSettings(GroupName = "V1")]
    public class RedisApiController : ControllerBase
    {
        private IConfigurationRoot ConfigRoot;
        private readonly RedisHelper rd;
        public RedisApiController(IConfiguration configRoot)
        {
            ConfigRoot = (IConfigurationRoot)configRoot;
            rd = new RedisHelper(0, ConfigRoot["ConnectionStrings:RedisConnectionStrings"]);
        }

        [HttpGet]
        [Route("RedisSet")]
        public IActionResult RedisSet()
        {
            rd.StringSet("sName", "陳卧龍");
            return Ok();
        }

        [HttpGet]
        [Route("RedisGet")]
        public IActionResult RedisGet(string key)
        {
            if (rd.KeyExists(key))
            {
                /*
                 * 如果緩存中存在,則直接返回結果
                 */
                var result = rd.StringGet(key);
                return Ok(result);
            }
            else
            {
                /*
                 * 如果緩存中不存在,則需要結合數據庫進行查詢,但必須采用相應的策略,防止惡意【緩存擊穿】。
                 * 數據庫查詢部分,
                 * 如果數據庫查詢到結果,則對結果進行緩存,並返回結果。
                 * 如果數據庫查詢不到結果,則對請求的數據進行緩存,防止緩存擊穿。
                 * 除了上述比較被動的防御以外,我們還可以采取一段時間內限制請求次數來達到惡意攻擊行為。
                 */
                return Ok();
            }
        }
    }
}

 

 

Redis的數據結構應用場景


String:

key-value結構中,value不僅可以是String,也可以是數字類型。可以應用在比如博客粉絲數量、評論數量、閱讀數量的緩存。redis也提供了計數器類型的命令(incr、decr等)
Hash:

Hash表中可以儲存多個K-V結構元素,可以用來儲存用戶的信息模塊。

key=User123
value={
“id”: 1,
“name”: “BengHiong”,
“age”: 21,
“location”: “guangdong”
}

List:

鏈表結構,可以應用於顯示某一列信息(如用戶關注列表、粉絲列表、作品列表等),還可以通過lrange命令進行分頁查詢,由於redis速度快特性,實現用戶不斷下拉操作數據仍能快速呈現的效果。
Set:

Set結構自帶了排重功能,可以通過交集命令sinterstore實現多個用戶共同好友、共同關注、共同愛好的功能,也可以用sdiffstore命令實現體現用戶如Q群未加好友的陌生人。
SortedSet:

相比Set多了個socre權重,實現了排序功能。可以應用於例如游戲排行榜、禮物排行榜功能。
Redis為什么能這么快?
完全基於內存,絕大部分請求是純粹的內存操作,非常快速。數據存在內存中。
結構類似於HashMap,HashMap的優勢就是查找和操作的時間復雜度都是O(1);
采用單線程,避免了不必要的上下文切換和競爭條件,也不存在多進程或者多線程導致的切換而消耗 CPU,不用去考慮各種鎖的問題,不存在加鎖釋放鎖操作,沒有因為可能出現死鎖而導致的性能消耗;
為啥是單線程?
redis由於是內存操作,速度非常快,所以使用單線程不必過於擔心某個操作時間占用較長導致其他操作長時間阻塞,而且單線程模式下可以減少線程間的切換和競爭開銷,以及不需要加鎖機制,提高了性能。(由於是單線程,盡量避免那些操作時間較長的命令如 keys *),多線程的話需要加鎖,還需要線程間大量的cache同步。因為每一次命令執行時間短,所以綜合下來,單線程才是最優方案
注意: redis 單線程指的是網絡請求模塊使用了一個線程,即一個線程處理所有網絡請求,其他模塊仍用了多個線程。(比如持久化操作需要fork一個子進程進行數據備份操作)

Redis的內存淘汰機制
expire time:redis中會給每個key設定過期時間,時間到了之后會有redis進行移除工作,這在緩存中是非常必要的,因為緩存空間畢竟是有限的。那當到了過期時間時,redis是怎么將這個key移除的呢?這就要說到redis的兩種刪除方式了。定期刪除和惰性刪除

 

定期刪除:
redis默認是每隔 100ms 就隨機抽取一些設置了過期時間的key,檢查其是否過期,如果過期就刪除。注意這里是隨機抽取的。為什么要隨機呢?你想一想假如 redis 存了幾十萬個 key ,每隔100ms就遍歷所有的設置過期時間的 key 的話,就會給 CPU 帶來很大的負載!

 

惰性刪除:
定期刪除可能會導致很多過期 key 到了時間並沒有被刪除掉。所以就有了惰性刪除。假如你的過期 key,靠定期刪除沒有被刪除掉,還停留在內存里,除非你的系統去查一下那個 key,才會被redis給刪除掉。這就是所謂的惰性刪除,也是夠懶的哈!

 

如何解決 Redis 的並發競爭 Key 問題
所謂 Redis 的並發競爭 Key 的問題也就是多個系統同時對一個 key 進行操作,但是最后執行的順序和我們期望的順序不同,這樣也就導致了結果的不同!(如果是單系統就不需要考慮這個問題了,因為redis本身是單線程的)
推薦一種方案:分布式鎖(zookeeper 和 redis 都可以實現分布式鎖)。(如果不存在 Redis 的並發競爭 Key 問題,不要使用分布式鎖,這樣會影響性能)
基於zookeeper臨時有序節點可以實現的分布式鎖。大致思想為:每個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節點的目錄下,生成一個唯一的瞬時有序節點。 判斷是否獲取鎖的方式很簡單,只需要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個瞬時節點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產生的死鎖問題。完成業務流程后,刪除對應的子節點釋放鎖。

 

@天才卧龍的博客


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM