.netCore如何借助CSRedis實現一個安全高效的分布式鎖
分布式是鎖是分布式中重要且會遇到的問題,分布式鎖的難點在於,是多個進行訪問同一個資源,出現資源競爭的情況(普通的多個線程是在一個進程中 可以加鎖就能解決)
如何借助CSRedis實現一個安全高效的分布式鎖?往下看
一、Redis實現分布式鎖常見的幾個命令
► Setnx
命令:SETNX key value
說明:將 key 的值設為 value ,當且僅當 key 不存在。若給定的 key 已經存在,則 SETNX 不做任何動作。SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
時間復雜度:O(1)
返回值:設置成功,返回1 ; 設置失敗,返回 0
► Getset
命令:GETSET key value
說明:將給定 key 的值設為 value ,並返回 key 的舊值(old value)。當 key 存在但不是字符串類型時,返回一個錯誤。
時間復雜度:O(1)
返回值:返回給定 key 的舊值; 當 key 沒有舊值時,也即是, key 不存在時,返回 nil 。
► Expire
命令:EXPIRE key seconds
說明:為給定 key 設置生存時間,當 key 過期時(生存時間為 0 ),它會被自動刪除。
時間復雜度:O(1)
返回值:設置成功返回 1 ;當 key 不存在或者不能為 key 設置生存時間時(比如在低於 2.1.3 版本的 Redis 中你嘗試更新 key 的生存時間),返回 0 。
► Del
命令:DEL key [key ...]
說明:刪除給定的一個或多個 key 。不存在的 key 會被忽略。
時間復雜度:O(N); N 為被刪除的 key 的數量。
刪除單個字符串類型的 key ,時間復雜度為O(1)。
刪除單個列表、集合、有序集合或哈希表類型的 key ,時間復雜度為O(M), M 為以上數據結構內的元素數量。
鎖按照不同的維度,有多種分類.比如
1.悲觀鎖,樂觀鎖;
2.公平鎖,非公平鎖;
3.獨享鎖,共享鎖;
4.線程鎖,進程鎖;
案例一 (次案例要安裝CsRedis的依賴包)實現方式 redis 本身是個單線程,在某個進程進入redis后會把當前連接的庫 加上鎖防止其他進程進入(redis 可以設置鎖的時常,業務處理完后也可以解除當前鎖,避免死鎖的發生)其他進程在訪問redis 后發現有鎖可以設置 等待 或者重試時間 大致邏輯是這樣
using CSRedis; using System; using System.Threading; using System.Threading.Tasks; namespace Test0002 { internal class Program { static void Main(string[] args) { Console.WriteLine("Hello World!"); CSRedisClient redisClient = new CSRedis.CSRedisClient("120.0.0.0.1:6380,defaultDatabase=1,password=mixdo2018"); var lockKey = "lockKey"; var stockKey = "stock"; redisClient.Set(stockKey, 5);//商品庫存 var releaseLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis腳本 redisClient.Del(lockKey);//測試前,先把鎖刪了. Parallel.For(0, 10, i => { var id = Guid.NewGuid().ToString("N"); //獲取鎖 do { //set : key存在則失敗,不存在才會成功,並且過期時間5秒 var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖 } while (true); Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費"); //扣減庫存 var currentStock = redisClient.IncrBy(stockKey, -1); if (currentStock < 0) { Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!"); redisClient.Eval(releaseLockScript, lockKey, id); return; } //模擬處理業務,這里不考慮失敗的情況 Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3))); Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩余 {currentStock} 個"); //業務處理完后,釋放鎖. redisClient.Eval(releaseLockScript, lockKey, id); }); } } }
///上面這個只是簡單的在單個進程里面模擬
把上面的方式進行處理之后得到下面簡練的方式
///下面這個可以在本地通過 dotnet TimePlan.dll --urls http://localhost:5002 、 dotnet TimePlan.dll --urls http://localhost:5003 、dotnet TimePlan.dll --urls http://localhost:5004 、dotnet TimePlan.dll --urls http://localhost:5005 開多台進程進行測試
using CSRedis; using System; using System.Threading; namespace Test0004 { internal class Program { static void Main(string[] args) { Thread.Sleep(500); Console.WriteLine("開始循環10次"); for (int i = 0; i < 1000; i++) { Thread.Sleep(500); Console.WriteLine("Hello World!"); CSRedisClient redisClient = new CSRedis.CSRedisClient("120.0.0.0.1:6380,defaultDatabase=1,password=mixdo2018"); var lockKey = "lockKey"; var releaseLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis腳本 var id = Guid.NewGuid().ToString("N"); var success = redisClient.Set(lockKey, id, expireSeconds: -1, exists: RedisExistence.Nx); if (success != true) { Console.WriteLine("當前資源以被占用,沒機會了,等10秒鍾再次嘗試"); Thread.Sleep(500);//休息1秒再嘗試獲取鎖 continue; } else { Console.WriteLine("已獲取當前資源鎖的鑰匙,可放心使用"); } ///業務操作 Console.WriteLine("我好開心,我拿到key了:" + id); Thread.Sleep(500);//休息1秒再嘗試獲取鎖 Console.WriteLine("哈哈哈"); Console.WriteLine("救命救命"); redisClient.Eval(releaseLockScript, lockKey, id); redisClient.Del(lockKey); redisClient.Dispose(); } } } }
案例二(此案例要安裝StackExchange.Redis的包) 實現方式(思路大致一樣)
using Microsoft.Extensions.Configuration; using StackExchange.Redis; using System; using System.Threading; namespace Redis { public class RedisHelper { #region Fileds private static string _redisConnection; private static int _db = 0; private static ConnectionMultiplexer connection; #endregion #region Constructors public RedisHelper(IConfiguration configuration) { _redisConnection = configuration["RedisConfigHost.Connection"]?.ToString() ?? ""; } public static ConnectionMultiplexer CacheConnection { get { try { if (connection == null || !connection.IsConnected) { connection = new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(_redisConnection)).Value; } } catch (Exception ex) { return null; } return connection; } } #endregion #region Methons /// <summary> /// 緩存當前數據庫 /// </summary> public static IDatabase CacheRedis => CacheConnection.GetDatabase(_db); /// <summary> /// 新增單條值 /// </summary> /// <param name="values"></param> /// <returns></returns> public static bool StringSet(string key, string values) { if (string.IsNullOrEmpty(key) && string.IsNullOrEmpty(values)) throw new AggregateException("values or is null"); return CacheRedis.StringSet(key, values); } /// <summary> /// 查詢單個key值 /// </summary> /// <param name="key"></param> /// <returns></returns> public static RedisValue GetStringKey(string key) { return CacheRedis.StringGet(key); } /// <summary> /// 判斷key是否存儲 /// </summary> /// <param name="key">redis key</param> /// <returns></returns> public bool KeyExists(string key) { return CacheRedis.KeyExists(key); } /// <summary> /// 刪除單個key /// </summary> /// <param name="key">redis key</param> /// <returns>是否刪除成功</returns> public bool KeyDelete(string key) { return CacheRedis.KeyDelete(key); } /// <summary> /// redis 枷鎖 /// </summary> /// <param name="key">需要加鎖的鎖名</param> /// <param name="expireTimeSeconds">該鎖自動到期時間 如果沒其他要求可設置為最大時常 該方式一定要手動解鎖</param> /// <exception cref="Exception"></exception> #region 分布式鎖 public static bool LockByRedis(string key, string values) { try { //expireTimeSeconds = expireTimeSeconds > 20 ? 10 : expireTimeSeconds; //var data = TimeSpan.FromSeconds(expireTimeSeconds); //var token = Environment.MachineName; //bool lockflag = CacheRedis.LockTake(key, Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(expireTimeSeconds)); bool lockflag = CacheRedis.LockTake(key, values, TimeSpan.MaxValue); if (!lockflag) { return false; } return true; } catch (Exception ex) { throw new Exception($"Redis加鎖異常:原因{ex.Message}"); } } /// <summary> /// 解鎖 /// </summary> /// <param name="key">需要解鎖的鎖名</param>
/// <param name="values">需要解鎖的值</param>
/// <returns></returns> /// <exception cref="Exception"></exception> public static bool UnLockByRedis(string key, string valuse) { try { // Thread.CurrentThread.ManagedThreadId //Environment.MachineName return CacheRedis.LockRelease(key, valuse); } catch (Exception ex) { throw new Exception($"Redis加鎖異常:原因{ex.Message}"); } } #endregion #endregion #region Utilities #endregion } }
下面是調用方式
public static void Alternative() { count++; Console.WriteLine($"進入Alternative進入時間{DateTime.Now}"); var lockKey = "lockKey"; var lockValue = Guid.NewGuid().ToString("N"); bool result = Redis.RedisHelper.LockByRedis(lockKey, lockValue); if (!result) { Console.WriteLine("沒有搶到鎖,這次請求停止"); return; } string data = Redis.RedisHelper.GetStringKey("Sum").ToString(); int sum = int.Parse(string.IsNullOrEmpty(data) ? "0" : data); Console.WriteLine("讀取到的sum是:" + sum.ToString()); if (sum <= 0) { Console.WriteLine("sum 小於0 直接插入"); Redis.RedisHelper.StringSet("Sum", count.ToString()); SendTimed(); } else { Console.WriteLine("sum 大於0 開始判斷;當前的count是:" + count.ToString()); if (sum < count) { Console.WriteLine("count大於sum開始插入"); Redis.RedisHelper.StringSet("Sum", count.ToString()); SendTimed(); } } var unlock = Redis.RedisHelper.UnLockByRedis(lockKey, lockValue); Console.WriteLine($"進入Alternative結束時間{DateTime.Now}"); }
示例2
介紹c#操作redis的文章 http://www.cnblogs.com/axel10/p/8459434.html ,這篇文章中的案例使用了StringIncrement來實現了高並發情況下key值的穩定增加,但如果要用鎖的方式而不是StringIncrement方法,那該怎么做呢?
LockTake涉及到三個參數:key,token和TimeSpan,分別表示redis數據庫中該鎖的名稱、鎖的持有者標識和有效時間。下面將用一個多線程增加key值的案例來演示LockTake/LockRelease的用法。
using StackExchange.Redis; using StackExchange.Redis.Extensions.Core; using StackExchange.Redis.Extensions.Core.Configuration; using StackExchange.Redis.Extensions.Newtonsoft; using System; using System.Threading; namespace RedisTest { class Program { static RedisValue Token = Environment.MachineName; static RedisKey Key = "lock"; static void Ins() { Thread thread = new Thread(() => { for (int i = ; i < ; i++) { if (client.Database.LockTake(Key, Token, TimeSpan.MaxValue)) //key表示的是redis數據庫中該鎖的名稱,不可重復。 Token用來標識誰擁有該鎖並用來釋放鎖。TimeSpan表示該鎖的有效時間。 { try { int key = client.Get<int>("key"); client.Add("key", key + ); } catch (Exception e) { Console.WriteLine(e); throw; } finally { client.Database.LockRelease(Key, Token); } } else { Console.WriteLine("locking"); while (!TryAgain()) { Thread.Sleep(); } } } }); thread.Start(); } private static StackExchangeRedisCacheClient client; static void Main(string[] args) { var redisConfiguration = new RedisConfiguration() //配置 { Hosts = new RedisHost[] { new RedisHost() {Host = "127.0.0.1", Port = } } }; client = new StackExchangeRedisCacheClient(new NewtonsoftSerializer(), redisConfiguration); client.Add("key", ); for (int j = ; j < ; j++) { Ins(); } Thread.Sleep(); int i = client.Get<int>("key"); Console.WriteLine(i); Console.ReadKey(); } static bool TryAgain(int value) { if (client.Database.LockTake(Key, Token, TimeSpan.MaxValue)) { try { int key = client.Get<int>("key"); client.Add("key", key + value); } catch (Exception e) { Console.WriteLine(e); throw; } finally { client.Database.LockRelease(Key, Token); } return true; } else { return false; } } } }
案例三 實現方式
Redis實現分布式鎖(悲觀鎖/樂觀鎖)
Redis連接池
public static PooledRedisClientManager RedisClientPool = CreateManager(); private static PooledRedisClientManager CreateManager() { var redisHosts = System.Configuration.ConfigurationManager.AppSettings["redisHosts"]; if (string.IsNullOrEmpty(redisHosts)) { throw new Exception("AppSetting redisHosts no found"); } string[] redisHostarr = redisHosts.Split(new string[] { ",", "," }, StringSplitOptions.RemoveEmptyEntries); return new PooledRedisClientManager(redisHostarr, redisHostarr, new RedisClientManagerConfig { MaxWritePoolSize = 1000, MaxReadPoolSize = 1000, AutoStart = true, DefaultDb = 0 }); }
使用Redis的SetNX命令實現加鎖,
/// <summary> /// 加鎖 /// </summary> /// <param name="key">鎖key</param> /// <param name="selfMark">自己標記</param> /// <param name="lockExpirySeconds">鎖自動過期時間[默認10](s)</param> /// <param name="waitLockMilliseconds">等待鎖時間(ms)</param> /// <returns></returns> public static bool Lock(string key, out string selfMark, int lockExpirySeconds = 10, long waitLockMilliseconds = long.MaxValue) { DateTime begin = DateTime.Now; selfMark = Guid.NewGuid().ToString("N");//自己標記,釋放鎖時會用到,自己加的鎖除非過期否則只能自己打開 using (RedisClient redisClient = (RedisClient)RedisClientPool.GetClient()) { string lockKey = "Lock:" + key; while (true) { string script = string.Format("if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then redis.call('PEXPIRE',KEYS[1],{0}) return 1 else return 0 end", lockExpirySeconds * 1000); //循環獲取取鎖 if (redisClient.ExecLuaAsInt(script, new[] { lockKey }, new[] { selfMark }) == 1) { return true; } //不等待鎖則返回 if (waitLockMilliseconds == 0) { break; } //超過等待時間,則不再等待 if ((DateTime.Now - begin).TotalMilliseconds >= waitLockMilliseconds) { break; } Thread.Sleep(100); } return false; } }
- 參數key:鎖的key
- 參數selfMark:在設置鎖的時候會產生一個自己的標識,在釋放鎖的時候會用到,所謂解鈴還須系鈴人。防止鎖被誤釋放,導致鎖無效.
- 參數lockExpirySeconds:鎖的默認過期時間,防止被永久死鎖.
- 參數waitLockMilliseconds:循環獲取鎖的等待時間.
如果設置為0,為樂觀鎖機制,獲取不到鎖,直接返回未獲取到鎖.
默認值為long最大值,為悲觀鎖機制,約等於很多很多天,可以理解為一直等待
釋放鎖
/// <summary> /// 釋放鎖 /// </summary> /// <param name="key">鎖key</param> /// <param name="selfMark">自己標記</param> public void UnLock(string key, string selfMark) { using (RedisClient redisClient = (RedisClient)RedisClientPool.GetClient()) { string lockKey = "Lock:" + key; var script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; redisClient.ExecLuaAsString(script, new[] { lockKey }, new[] { selfMark }); } }
- 參數key:鎖的key
- 參數selfMark:在設置鎖的時候返回的自己標識,用來解鎖自己加的鎖(此值不能隨意傳,必須是加鎖時返回的值)
調用方式
- 悲觀鎖方式
int num = 10; string lockkey = "xianseng"; //悲觀鎖開啟20個人同時拿寶貝 for (int i = 0; i < 20; i++) { Task.Run(() => { string selfmark = ""; try { if (PublicLockHelper.Lock(lockkey, out selfmark)) { if (num > 0) { num--; Console.WriteLine($"我拿到了寶貝:寶貝剩余{num}個\t\t{selfmark}"); } else { Console.WriteLine("寶貝已經沒有了"); } Thread.Sleep(100); } } finally { PublicLockHelper.UnLock(lockkey, selfmark); } }); }
- 樂觀鎖方式
int num = 10; string lockkey = "xianseng"; //樂觀鎖開啟10個線程,每個線程拿5次 for (int i = 0; i < 10; i++) { Task.Run(() => { for (int j = 0; j < 5; j++) { string selfmark = ""; try { if (PublicLockHelper.Lock(lockkey, out selfmark, 10, 0)) { if (num > 0) { num--; Console.WriteLine($"我拿到了寶貝:寶貝剩余{num}個\t\t{selfmark}"); } else { Console.WriteLine("寶貝已經沒有了"); } Thread.Sleep(1000); } else { Console.WriteLine("沒有拿到,不想等了"); } } finally { PublicLockHelper.UnLock(lockkey, selfmark); } } }); }
單機只能用多線模擬使用分布式鎖了
此鎖已經可以滿足大多數場景了,若有不妥,還請多多指出,以免誤別人!
()
借鑒:https://cloud.tencent.com/developer/article/1742360
借鑒:https://www.cnblogs.com/simoncai/p/11477177.html
借鑒:https://www.cnblogs.com/runningsmallguo/p/10322315.html