為什么要用鎖?
大型站點在高並發的情況下,為了保持數據最終一致性就需要用到技術方案來支持。比如:分布式鎖、分布式事務。有時候我們在為了保證某一個方法每次只能被一個調用者使用的時候,這時候我們也可以鎖來實現。
基於本地緩存實現鎖
為什么還要寫基於本地緩存實現的鎖呢,因為有些項目項目可能還是單機部署的,當隨着業務量增長的時候就會變成多機部署,從單機到多機的切換過程中,我們也需要把原先業務相關的鎖改成分布式鎖,來保持數據的最終一致性。當然項目是使用ioc的那就更好了,切換注冊時的實現類就完成了切換,非常方便。
實現思路:
用戶需要用一個key和一個唯一的值(知道當前這個key的使用者是誰)來獲取一個鎖,獲取到鎖之后,執行完對應的操作然后釋放掉。在釋放鎖的時候 4我們需要判斷下當前這個鎖的使用者對應的值與想要釋放傳遞過來的值是不是相等,如果相等則可以釋放,不相等則不釋放。
實現代碼:
public sealed class LocalLock : ILock { private static ConcurrentDictionary<string, object> _LockCache = new ConcurrentDictionary<string, object>(); private static ConcurrentDictionary<string, string> _LockUserCache = new ConcurrentDictionary<string, string>(); /// <summary> /// 獲取一個鎖(需要自己釋放) /// </summary> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <param name="span">耗時時間</param> /// <returns>成功返回true</returns> public bool LockTake(string key, string value, TimeSpan span) { EnsureUtil.NotNullAndNotEmpty(key, "Lockkey"); EnsureUtil.NotNullAndNotEmpty(value, "Lockvalue"); var obj = _LockCache.GetValue(key, () => { return new object(); }); if (Monitor.TryEnter(obj, span)) { _LockUserCache[key] = value; return true; } return false; } /// <summary> /// 異步獲取一個鎖(需要自己釋放) /// </summary> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <param name="span">耗時時間</param> /// <returns>成功返回true</returns> public Task<bool> LockTakeAsync(string key, string value, TimeSpan span) { return Task.FromResult(LockTake(key, value, span)); } /// <summary> /// 釋放一個鎖 /// </summary> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <returns>成功返回true</returns> public bool LockRelease(string key, string value) { EnsureUtil.NotNullAndNotEmpty(key, "Lockkey"); EnsureUtil.NotNullAndNotEmpty(value, "Lockvalue"); _LockCache.TryGetValue(key, out object obj); if (obj != null) { if (_LockUserCache[key] == value) { Monitor.Exit(obj); return true; } return false; } return true; } /// <summary> /// 異步釋放一個鎖 /// </summary> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <returns>成功返回true</returns> public Task<bool> LockReleaseAsync(string key, string value) { return Task.FromResult(LockRelease(key, value)); } /// <summary> /// 使用鎖執行一個方法 /// </summary> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <param name="span">耗時時間</param> /// <param name="executeAction">要執行的方法</param> public void ExecuteWithLock(string key, string value, TimeSpan span, Action executeAction) { if (executeAction == null) return; if (LockTake(key, value, span)) { try { executeAction(); } finally { LockRelease(key, value); } } } /// <summary> /// 使用鎖執行一個方法 /// </summary> /// <typeparam name="T">返回值類型</typeparam> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <param name="span">耗時時間</param> /// <param name="executeAction">要執行的方法</param> /// <param name="defaultValue">默認返回</param> /// <returns></returns> public T ExecuteWithLock<T>(string key, string value, TimeSpan span, Func<T> executeAction, T defaultValue = default(T)) { if (executeAction == null) return defaultValue; if (LockTake(key, value, span)) { try { return executeAction(); } finally { LockRelease(key, value); } } return defaultValue; } /// <summary> /// 使用鎖執行一個異步方法 /// </summary> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <param name="span">耗時時間</param> /// <param name="executeAction">要執行的方法</param> public async Task ExecuteWithLockAsync(string key, string value, TimeSpan span, Func<Task> executeAction) { if (executeAction == null) return; if (await LockTakeAsync(key, value, span)) { try { await executeAction(); } catch { throw; } finally { LockRelease(key, value); } } } /// <summary> /// 使用鎖執行一個異步方法 /// </summary> /// <typeparam name="T">返回值類型</typeparam> /// <param name="key">鎖的鍵</param> /// <param name="value">當前占用值</param> /// <param name="span">耗時時間</param> /// <param name="executeAction">要執行的方法</param> /// <param name="defaultValue">默認返回</param> /// <returns></returns> public async Task<T> ExecuteWithLockAsync<T>(string key, string value, TimeSpan span, Func<Task<T>> executeAction, T defaultValue = default(T)) { if (executeAction == null) return defaultValue; if (await LockTakeAsync(key, value, span)) { try { return await executeAction(); } catch { throw; } finally { LockRelease(key, value); } } return defaultValue; } }
class Program { static void Main(string[] args) { ILock localLock = new LocalLock(); int excuteCount = 0; Parallel.For(0, 10000, i => { localLock.ExecuteWithLock("test", Guid.NewGuid().ToString(), TimeSpan.FromSeconds(5), () => { Console.WriteLine("獲取鎖成功"); Interlocked.Increment(ref excuteCount); }); }); Console.WriteLine("成功次數:" + excuteCount.ToString()); Console.WriteLine("執行完成"); Console.ReadLine(); } }

基於zk實現的分布式鎖
實現思路:
在獲取鎖的時候在固定節點下創建一個自增的臨時節點,然后獲取節點列表,按照增量排序,假如當前創建的節點是排在第一個的,那就表明這個節點是得到了執行的權限,假如在它前面還有其它節點,那么就對它的上一個節點進行監聽,等到上一個節點被刪除了,那么該節點就得到了執行的權限了。
由於代碼片段太多,待會再github自行查看實現過程。zk獲取鎖的速度比較慢,導致有幾個可能是失敗的。

基於redis的實現
實現思路:
利用redis的setnx(key, value):“set if not exits”,若該key-value不存在,則成功加入緩存並且返回1,否則返回0。在有效時間內如果設置成功則獲取執行限權,沒有那就獲取權限失敗。

對比下會發現redis的執行效率會比zk的快一點。
項目下載地址:https://github.com/ProjectSharing/Lock
