使用場景
首先,我們看這樣一個場景:客戶下單的時候,我們調用庫存中心進行減庫存,那我們一般的操作都是:
update store set num = $num where id = $id
這種通過設置庫存的修改方式,我們知道在並發量高的時候會存在數據庫的丟失更新,比如 a, b 當前兩個事務,查詢出來的庫存都是 5,a 買了 3 個單子要把庫存設置為 2,而 b 買了 1 個單子要把庫存設置為 4,那這個時候就會出現 a 會覆蓋 b 的更新,所以我們更多的都是會加個條件:
update store set num = $num where id = $id and num = $query_num
即樂觀鎖的方式來處理,當然也可以通過版本號來處理樂觀鎖,都是一樣的,但是這是更新一個表,如果我們牽扯到多個表呢,我們希望和這個單子關聯的所有的表同一時間只能被一個線程來處理更新,多個線程按照不同的順序去更新同一個單子關聯的不同數據,出現死鎖的概率比較大。對於非敏感的數據,我們也沒有必要去都加樂觀鎖處理,我們的服務都是多機器部署的,要保證多進程多線程同時只能有一個進程的一個線程去處理,這個時候我們就需要用到分布式鎖。分布式鎖的實現方式有很多,我們今天分別通過數據庫,Zookeeper, Redis 以及 Tair 的實現邏輯。
(一)數據庫實現
加 xx 鎖
更新一個單子關聯的所有的數據,先查詢出這個單子,並加上排他鎖,在進行一系列的更新操作
begin transaction; select ...for update; doSomething(); commit();
這種處理主要依靠排他鎖來阻塞其他線程,不過這個需要注意幾點:
- 查詢的數據一定要在數據庫里存在,如果不存在的話,數據庫會加 gap 鎖,而 gap 鎖之間是兼容的,這種如果兩個線程都加了gap 鎖,另一個再更新的話會出現死鎖。不過一般能更新的數據都是存在的
- 后續的處理流程需要盡可能的時間短,即在更新的時候提前准備好數據,保證事務處理的時間足夠的短,流程足夠的短,因為開啟事務是一直占着連接的,如果流程比較長會消耗過多的數據庫連接的
唯一鍵
通過在一張表里創建唯一鍵來獲取鎖,比如執行 saveStore 這個方法
insert table lock_store ('method_name') values($method_name)
其中 method_name
是個唯一鍵,通過這種方式也可以做到,解鎖的時候直接刪除改行記錄就行。不過這種方式,鎖就不會是阻塞式的,因為插入數據是立馬可以得到返回結果的。
那針對以上數據庫實現的兩種分布式鎖,存在什么樣的優缺點呢?
優點
簡單,方便,快速實現
缺點
- 基於數據庫,開銷比較大,性能可能會存在影響
- 基於數據庫的當前讀來實現,數據庫會在底層做優化,可能用到索引,可能不用到索引,這個依賴於查詢計划的分析
(二)Zookeeper 實現
獲取鎖
- 先有一個鎖跟節點,lockRootNode,這可以是一個永久的節點
- 客戶端獲取鎖,先在 lockRootNode 下創建一個順序的瞬時節點,保證客戶端斷開連接,節點也自動刪除
- 調用 lockRootNode 父節點的 getChildren() 方法,獲取所有的節點,並從小到大排序,如果創建的最小的節點是當前節點,則返回 true,獲取鎖成功,否則,關注比自己序號小的節點的釋放動作(exist watch),這樣可以保證每一個客戶端只需要關注一個節點,不需要關注所有的節點,避免羊群效應。
- 如果有節點釋放操作,重復步驟 3
釋放鎖
只需要刪除步驟 2 中創建的節點即可
使用 Zookeeper 的分布式鎖存在什么樣的優缺點呢?
優點
- 客戶端如果出現宕機故障的話,鎖可以馬上釋放
- 可以實現阻塞式鎖,通過 watcher 監聽,實現起來也比較簡單
- 集群模式,穩定性比較高
缺點
- 一旦網絡有任何的抖動,Zookeeper 就會認為客戶端已經宕機,就會斷掉連接,其他客戶端就可以獲取到鎖。當然 Zookeeper 有重試機制,這個就比較依賴於其重試機制的策略了
- 性能上不如緩存
(三)Redis 實現
我們先舉個例子,比如現在我要更新產品的信息,產品的唯一鍵就是 productId
#簡單實現 1
public boolean lock(String key, V v, int expireTime){ int retry = 0; //獲取鎖失敗最多嘗試10次 while (retry < failRetryTimes){ //獲取鎖 Boolean result = redis.setNx(key, v, expireTime); if (result){ return true; } try { //獲取鎖失敗間隔一段時間重試 TimeUnit.MILLISECONDS.sleep(sleepInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } return false; } public boolean unlock(String key){ return redis.delete(key); } public static void main(String[] args) { Integer productId = 324324; RedisLock<Integer> redisLock = new RedisLock<Integer>(); redisLock.lock(productId+"", productId, 1000); } }
這是一個簡單的實現,存在的問題:
- 可能會導致當前線程的鎖誤被其他線程釋放,比如 a 線程獲取到了鎖正在執行,但是由於內部流程處理超時或者 gc 導致鎖過期,這個時候b線程獲取到了鎖,a 和 b 線程處理的是同一個 productId,b還在處理的過程中,這個時候 a 處理完了,a 去釋放鎖,可能就會導致 a 把 b 獲取的鎖釋放了。
- 不能實現可重入
- 客戶端如果第一次已經設置成功,但是由於超時返回失敗,此后客戶端嘗試會一直失敗
針對以上問題我們改進下:
- v 傳 requestId,然后我們在釋放鎖的時候判斷一下,如果是當前 requestId,那就可以釋放,否則不允許釋放
- 加入 count 的鎖計數,在獲取鎖的時候查詢一次,如果是當前線程已經持有的鎖,那鎖技術加 1,直接返回 true
#簡單實現 2
private static volatile int count = 0; public boolean lock(String key, V v, int expireTime){ int retry = 0; //獲取鎖失敗最多嘗試10次 while (retry < failRetryTimes){ //1.先獲取鎖,如果是當前線程已經持有,則直接返回 //2.防止后面設置鎖超時,其實是設置成功,而網絡超時導致客戶端返回失敗,所以獲取鎖之前需要查詢一下 V value = redis.get(key); //如果當前鎖存在,並且屬於當前線程持有,則鎖計數+1,直接返回 if (null != value && value.equals(v)){ count ++; return true; } //如果鎖已經被持有了,那需要等待鎖的釋放 if (value == null || count <= 0){ //獲取鎖 Boolean result = redis.setNx(key, v, expireTime); if (result){ count = 1; return true; } } try { //獲取鎖失敗間隔一段時間重試 TimeUnit.MILLISECONDS.sleep(sleepInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } return false; } public boolean unlock(String key, String requestId){ String value = redis.get(key); if (Strings.isNullOrEmpty(value)){ count = 0; return true; } //判斷當前鎖的持有者是否是當前線程,如果是的話釋放鎖,不是的話返回false if (value.equals(requestId)){ if (count > 1){ count -- ; return true; } boolean delete = redis.delete(key); if (delete){ count = 0; } return delete; } return false; } public static void main(String[] args) { Integer productId = 324324; RedisLock<String> redisLock = new RedisLock<String>(); String requestId = UUID.randomUUID().toString(); redisLock.lock(productId+"", requestId, 1000)