.net 使用redis 實現分布式鎖


.netCore如何借助CSRedis實現一個安全高效的分布式鎖

分布式是鎖是分布式中重要且會遇到的問題,分布式鎖的難點在於,是多個進行訪問同一個資源,出現資源競爭的情況(普通的多個線程是在一個進程中 可以加鎖就能解決)

如何借助CSRedis實現一個安全高效的分布式鎖?往下看

一、Redis實現分布式鎖常見的幾個命令

► Setnx

命令:SETNX key value
說明:將 key 的值設為 value ,當且僅當 key 不存在。若給定的 key 已經存在,則 SETNX 不做任何動作。SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
時間復雜度:O(1)
返回值:設置成功,返回1 ; 設置失敗,返回 0

► Getset

命令:GETSET key value
說明:將給定 key 的值設為 value ,並返回 key 的舊值(old value)。當 key 存在但不是字符串類型時,返回一個錯誤。
時間復雜度:O(1)
返回值:返回給定 key 的舊值; 當 key 沒有舊值時,也即是, key 不存在時,返回 nil 。


► Expire

命令:EXPIRE key seconds
說明:為給定 key 設置生存時間,當 key 過期時(生存時間為 0 ),它會被自動刪除。
時間復雜度:O(1)
返回值:設置成功返回 1 ;當 key 不存在或者不能為 key 設置生存時間時(比如在低於 2.1.3 版本的 Redis 中你嘗試更新 key 的生存時間),返回 0 。

► Del

命令:DEL key [key ...]
說明:刪除給定的一個或多個 key 。不存在的 key 會被忽略。
時間復雜度:O(N); N 為被刪除的 key 的數量。
刪除單個字符串類型的 key ,時間復雜度為O(1)。
刪除單個列表、集合、有序集合或哈希表類型的 key ,時間復雜度為O(M), M 為以上數據結構內的元素數量。


 

 

 

鎖按照不同的維度,有多種分類.比如

1.悲觀鎖,樂觀鎖;

2.公平鎖,非公平鎖;

3.獨享鎖,共享鎖;

4.線程鎖,進程鎖;

 

案例一 (次案例要安裝CsRedis的依賴包)實現方式  redis 本身是個單線程,在某個進程進入redis后會把當前連接的庫 加上鎖防止其他進程進入(redis 可以設置鎖的時常,業務處理完后也可以解除當前鎖,避免死鎖的發生)其他進程在訪問redis 后發現有鎖可以設置 等待 或者重試時間  大致邏輯是這樣

using CSRedis;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Test0002
{
    internal class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            CSRedisClient redisClient = new CSRedis.CSRedisClient("120.0.0.0.1:6380,defaultDatabase=1,password=mixdo2018");
            var lockKey = "lockKey";
            var stockKey = "stock";
            redisClient.Set(stockKey, 5);//商品庫存
            var releaseLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis腳本
            redisClient.Del(lockKey);//測試前,先把鎖刪了.
            Parallel.For(0, 10, i =>
            {
                var id = Guid.NewGuid().ToString("N");
                //獲取鎖
                do
                {
                    //set : key存在則失敗,不存在才會成功,並且過期時間5秒
                    var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx);
                    if (success == true)
                    {
                        break;
                    }
                    Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖
                } while (true);
                Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費");
                //扣減庫存
                var currentStock = redisClient.IncrBy(stockKey, -1);
                if (currentStock < 0)
                {
                    Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!");
                    redisClient.Eval(releaseLockScript, lockKey, id);
                    return;
                }
                //模擬處理業務,這里不考慮失敗的情況
                Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3)));
                Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩余 {currentStock} 個");
                //業務處理完后,釋放鎖.
                redisClient.Eval(releaseLockScript, lockKey, id);
            });

        }
    }
}

///上面這個只是簡單的在單個進程里面模擬

把上面的方式進行處理之后得到下面簡練的方式

///下面這個可以在本地通過 dotnet TimePlan.dll  --urls  http://localhost:5002   、 dotnet TimePlan.dll  --urls  http://localhost:5003 、dotnet TimePlan.dll  --urls  http://localhost:5004 、dotnet TimePlan.dll  --urls  http://localhost:5005 開多台進程進行測試

using CSRedis; 
using System;
using System.Threading;

namespace Test0004
{
    internal class Program
    {
        static void Main(string[] args)
        {
            Thread.Sleep(500);

            Console.WriteLine("開始循環10次");
            for (int i = 0; i < 1000; i++)
            {
                Thread.Sleep(500);

                Console.WriteLine("Hello World!");
                CSRedisClient redisClient = new CSRedis.CSRedisClient("120.0.0.0.1:6380,defaultDatabase=1,password=mixdo2018");
                var lockKey = "lockKey";
                var releaseLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis腳本
                var id = Guid.NewGuid().ToString("N");

                var success = redisClient.Set(lockKey, id, expireSeconds: -1, exists: RedisExistence.Nx);
                if (success != true)
                {
                    Console.WriteLine("當前資源以被占用,沒機會了,等10秒鍾再次嘗試");
                    Thread.Sleep(500);//休息1秒再嘗試獲取鎖
                    continue;
                }
                else
                {
                    Console.WriteLine("已獲取當前資源鎖的鑰匙,可放心使用");
                }

                ///業務操作
                Console.WriteLine("我好開心,我拿到key了:" + id);
                Thread.Sleep(500);//休息1秒再嘗試獲取鎖
                Console.WriteLine("哈哈哈");
                Console.WriteLine("救命救命");

                redisClient.Eval(releaseLockScript, lockKey, id);
                redisClient.Del(lockKey);
                redisClient.Dispose();
            }

        }

    }

}

 

案例二(此案例要安裝StackExchange.Redis的包)  實現方式(思路大致一樣)

using Microsoft.Extensions.Configuration;
using StackExchange.Redis;
using System;
using System.Threading;

namespace Redis
{
    public class RedisHelper
    {
        #region Fileds
        private static string _redisConnection; 
        private static int _db = 0;
        private static ConnectionMultiplexer connection;
        #endregion

        #region Constructors
        public RedisHelper(IConfiguration configuration)
        {
            _redisConnection = configuration["RedisConfigHost.Connection"]?.ToString() ?? "";
        }

        public static ConnectionMultiplexer CacheConnection
        {
            get
            {
                try
                {
                    if (connection == null || !connection.IsConnected)
                    {
                        connection = new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(_redisConnection)).Value;
                    }
                }
                catch (Exception ex)
                {
                    return null;
                }
                return connection;
            }
        }
        #endregion


        #region Methons

        /// <summary>
        /// 緩存當前數據庫
        /// </summary>
        public static IDatabase CacheRedis => CacheConnection.GetDatabase(_db);

        /// <summary>
        /// 新增單條值
        /// </summary>
        /// <param name="values"></param>
        /// <returns></returns>
        public static bool StringSet(string key, string values)
        {
            if (string.IsNullOrEmpty(key) && string.IsNullOrEmpty(values))
                throw new AggregateException("values or is null");
            return CacheRedis.StringSet(key, values);
        }

        /// <summary>
        /// 查詢單個key值
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static RedisValue GetStringKey(string key)
        {
            return CacheRedis.StringGet(key);
        }

        /// <summary>
        /// 判斷key是否存儲
        /// </summary>
        /// <param name="key">redis key</param>
        /// <returns></returns>
        public bool KeyExists(string key)
        {
            return CacheRedis.KeyExists(key);
        }

        /// <summary>
        /// 刪除單個key
        /// </summary>
        /// <param name="key">redis key</param>
        /// <returns>是否刪除成功</returns>
        public bool KeyDelete(string key)
        {
            return CacheRedis.KeyDelete(key);
        }


        /// <summary>
        /// redis 枷鎖
        /// </summary>
        /// <param name="key">需要加鎖的鎖名</param>
        /// <param name="expireTimeSeconds">該鎖自動到期時間  如果沒其他要求可設置為最大時常   該方式一定要手動解鎖</param>
        /// <exception cref="Exception"></exception>
        #region 分布式鎖
        public static bool LockByRedis(string key, string values)
        {
            try
            {
                //expireTimeSeconds = expireTimeSeconds > 20 ? 10 : expireTimeSeconds;
                //var data = TimeSpan.FromSeconds(expireTimeSeconds);
                //var token = Environment.MachineName;              
                //bool lockflag = CacheRedis.LockTake(key, Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(expireTimeSeconds));

                bool lockflag = CacheRedis.LockTake(key, values, TimeSpan.MaxValue);
                if (!lockflag)
                {
                    return false;
                }
                return true;
            }
            catch (Exception ex)
            {
                throw new Exception($"Redis加鎖異常:原因{ex.Message}");
            }
        }

        /// <summary>
        /// 解鎖
        /// </summary>
        /// <param name="key">需要解鎖的鎖名</param>
        /// <param name="values">需要解鎖的值</param>
        /// <returns></returns>
        /// <exception cref="Exception"></exception>
        public static bool UnLockByRedis(string key, string valuse)
        {
            try
            {
                // Thread.CurrentThread.ManagedThreadId
                //Environment.MachineName
                return CacheRedis.LockRelease(key, valuse);
            }
            catch (Exception ex)
            {
                throw new Exception($"Redis加鎖異常:原因{ex.Message}");
            }
        }

        #endregion
        #endregion


        #region Utilities
        #endregion
    }
}

下面是調用方式

public static void Alternative()
        {
            count++;
            Console.WriteLine($"進入Alternative進入時間{DateTime.Now}");

            var lockKey = "lockKey";
            var lockValue = Guid.NewGuid().ToString("N");

            bool result = Redis.RedisHelper.LockByRedis(lockKey, lockValue);
            if (!result)
            {
                Console.WriteLine("沒有搶到鎖,這次請求停止");
                return;
            }

            string data = Redis.RedisHelper.GetStringKey("Sum").ToString();
            int sum = int.Parse(string.IsNullOrEmpty(data) ? "0" : data);

            Console.WriteLine("讀取到的sum是:" + sum.ToString());
            if (sum <= 0)
            {
                Console.WriteLine("sum 小於0 直接插入");
                Redis.RedisHelper.StringSet("Sum", count.ToString());
                SendTimed();
            }
            else
            {
                Console.WriteLine("sum 大於0 開始判斷;當前的count是:" + count.ToString());
                if (sum < count)
                {
                    Console.WriteLine("count大於sum開始插入");
                    Redis.RedisHelper.StringSet("Sum", count.ToString());
                    SendTimed();
                }
            }

            var unlock = Redis.RedisHelper.UnLockByRedis(lockKey, lockValue);

            Console.WriteLine($"進入Alternative結束時間{DateTime.Now}");


        }

 

示例2 

介紹c#操作redis的文章 http://www.cnblogs.com/axel10/p/8459434.html ,這篇文章中的案例使用了StringIncrement來實現了高並發情況下key值的穩定增加,但如果要用鎖的方式而不是StringIncrement方法,那該怎么做呢?

LockTake涉及到三個參數:key,token和TimeSpan,分別表示redis數據庫中該鎖的名稱、鎖的持有者標識和有效時間。下面將用一個多線程增加key值的案例來演示LockTake/LockRelease的用法。

using StackExchange.Redis;
using StackExchange.Redis.Extensions.Core;
using StackExchange.Redis.Extensions.Core.Configuration;
using StackExchange.Redis.Extensions.Newtonsoft;
using System;
using System.Threading;
 
namespace RedisTest
{
    class Program
    {
        static RedisValue Token = Environment.MachineName;
        static RedisKey Key = "lock";
 
        static void Ins()
        {
            Thread thread = new Thread(() =>
            {
                for (int i = ; i < ; i++)
                {
 
                    if (client.Database.LockTake(Key, Token, TimeSpan.MaxValue))   //key表示的是redis數據庫中該鎖的名稱,不可重復。 Token用來標識誰擁有該鎖並用來釋放鎖。TimeSpan表示該鎖的有效時間。
                    {
                        try
                        {
                            int key = client.Get<int>("key");
 
                            client.Add("key", key + );
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e);
                            throw;
                        }
                        finally
                        {
                            client.Database.LockRelease(Key, Token);
                        }
                    }
                    else
                    {
                        Console.WriteLine("locking");
                        while (!TryAgain())
                        {
                            Thread.Sleep();
                        }
                    }
                }
            });
            thread.Start();
        }
 
        private static StackExchangeRedisCacheClient client;
 
        static void Main(string[] args)
        {
            var redisConfiguration = new RedisConfiguration() //配置
            {
                Hosts = new RedisHost[]
                {
                    new RedisHost() {Host = "127.0.0.1", Port = }
                }
            };
 
            client = new StackExchangeRedisCacheClient(new NewtonsoftSerializer(), redisConfiguration);
            client.Add("key", );
 
            for (int j = ; j < ; j++)
            {
                Ins();
            }
 
            Thread.Sleep();
 
            int i = client.Get<int>("key");
            Console.WriteLine(i);
            Console.ReadKey();
        }
 
        static bool TryAgain(int value)
        {
            if (client.Database.LockTake(Key, Token, TimeSpan.MaxValue))
            {
                try
                {
                    int key = client.Get<int>("key");
 
                    client.Add("key", key + value);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    throw;
                }
                finally
                {
                    client.Database.LockRelease(Key, Token);
                }
 
                return true;
            }
            else
            {
                return false;
            }
        }
    }
}

 

 

案例三  實現方式

Redis實現分布式鎖(悲觀鎖/樂觀鎖)

Redis連接池

public static PooledRedisClientManager RedisClientPool = CreateManager();

        private static PooledRedisClientManager CreateManager()
        {
            var redisHosts = System.Configuration.ConfigurationManager.AppSettings["redisHosts"];
            if (string.IsNullOrEmpty(redisHosts))
            {
                throw new Exception("AppSetting redisHosts no found");
            }

            string[] redisHostarr = redisHosts.Split(new string[] { ",", "" }, StringSplitOptions.RemoveEmptyEntries);

            return new PooledRedisClientManager(redisHostarr, redisHostarr, new RedisClientManagerConfig
            {
                MaxWritePoolSize = 1000,
                MaxReadPoolSize = 1000,
                AutoStart = true,
                DefaultDb = 0
            });
        }

使用Redis的SetNX命令實現加鎖,

        /// <summary>
        /// 加鎖
        /// </summary>
        /// <param name="key">鎖key</param>
        /// <param name="selfMark">自己標記</param>
        /// <param name="lockExpirySeconds">鎖自動過期時間[默認10](s)</param>
        /// <param name="waitLockMilliseconds">等待鎖時間(ms)</param>
        /// <returns></returns>
        public static bool Lock(string key, out string selfMark, int lockExpirySeconds = 10, long waitLockMilliseconds = long.MaxValue)
        {
            DateTime begin = DateTime.Now;
            selfMark = Guid.NewGuid().ToString("N");//自己標記,釋放鎖時會用到,自己加的鎖除非過期否則只能自己打開
            using (RedisClient redisClient = (RedisClient)RedisClientPool.GetClient())
            {
                string lockKey = "Lock:" + key;
                while (true)
                {
                    string script = string.Format("if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then redis.call('PEXPIRE',KEYS[1],{0}) return 1 else return 0 end", lockExpirySeconds * 1000);
                    //循環獲取取鎖
                    if (redisClient.ExecLuaAsInt(script, new[] { lockKey }, new[] { selfMark }) == 1)
                    {
                        return true;
                    }

                    //不等待鎖則返回
                    if (waitLockMilliseconds == 0)
                    {
                        break;
                    }

                    //超過等待時間,則不再等待
                    if ((DateTime.Now - begin).TotalMilliseconds >= waitLockMilliseconds)
                    {
                        break;
                    }
                    Thread.Sleep(100);
                }

                return false;
            }
        }

  • 參數key:鎖的key
  • 參數selfMark:在設置鎖的時候會產生一個自己的標識,在釋放鎖的時候會用到,所謂解鈴還須系鈴人。防止鎖被誤釋放,導致鎖無效.
  • 參數lockExpirySeconds:鎖的默認過期時間,防止被永久死鎖.
  • 參數waitLockMilliseconds:循環獲取鎖的等待時間.

如果設置為0,為樂觀鎖機制,獲取不到鎖,直接返回未獲取到鎖.
默認值為long最大值,為悲觀鎖機制,約等於很多很多天,可以理解為一直等待

釋放鎖

/// <summary>
        /// 釋放鎖
        /// </summary>
        /// <param name="key">鎖key</param>
        /// <param name="selfMark">自己標記</param>
        public void UnLock(string key, string selfMark)
        {
            using (RedisClient redisClient = (RedisClient)RedisClientPool.GetClient())
            {
                string lockKey = "Lock:" + key;
                var script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                redisClient.ExecLuaAsString(script, new[] { lockKey }, new[] { selfMark });
            }
        }
  • 參數key:鎖的key
  • 參數selfMark:在設置鎖的時候返回的自己標識,用來解鎖自己加的鎖(此值不能隨意傳,必須是加鎖時返回的值

調用方式

  • 悲觀鎖方式
 int num = 10;
            string lockkey = "xianseng";

            //悲觀鎖開啟20個人同時拿寶貝
            for (int i = 0; i < 20; i++)
            {
                Task.Run(() =>
                {
                    string selfmark = "";
                    try
                    {
                        if (PublicLockHelper.Lock(lockkey, out selfmark))
                        {
                            if (num > 0)
                            {
                                num--;
                                Console.WriteLine($"我拿到了寶貝:寶貝剩余{num}個\t\t{selfmark}");
                            }
                            else
                            {
                                Console.WriteLine("寶貝已經沒有了");
                            }
                            Thread.Sleep(100);
                        }
                    }
                    finally
                    {
                        PublicLockHelper.UnLock(lockkey, selfmark);
                    }
                });
            }
  • 樂觀鎖方式
  int num = 10;
            string lockkey = "xianseng";

            //樂觀鎖開啟10個線程,每個線程拿5次
            for (int i = 0; i < 10; i++)
            {
                Task.Run(() =>
                {
                    for (int j = 0; j < 5; j++)
                    {
                        string selfmark = "";
                        try
                        {
                            if (PublicLockHelper.Lock(lockkey, out selfmark, 10, 0))
                            {
                                if (num > 0)
                                {
                                    num--;
                                    Console.WriteLine($"我拿到了寶貝:寶貝剩余{num}個\t\t{selfmark}");
                                }
                                else
                                {
                                    Console.WriteLine("寶貝已經沒有了");
                                }

                                Thread.Sleep(1000);
                            }
                            else
                            {
                                Console.WriteLine("沒有拿到,不想等了");
                            }
                        }
                        finally
                        {
                            PublicLockHelper.UnLock(lockkey, selfmark);
                        }
                    }
                });
            }

單機只能用多線模擬使用分布式鎖了

此鎖已經可以滿足大多數場景了,若有不妥,還請多多指出,以免誤別人!



借鑒:https://cloud.tencent.com/developer/article/1742360
借鑒:https://www.cnblogs.com/simoncai/p/11477177.html
借鑒:https://www.cnblogs.com/runningsmallguo/p/10322315.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM