分布式鎖
1、鎖:
單進程的系統中,存在多線程同時操作一個公共變量,此時需要加鎖對變量進行同步操作,保證多線程的操作線性執行消除並發修改。解決的是單進程中的多線程並發問題。
2、分布式鎖:
只要的應用場景是在集群模式的多個相同服務,可能會部署在不同機器上,解決進程間安全問題,防止多進程同時操作一個變量或者數據庫。解決的是多進程的並發問題。
一、基於Redis實現
在Redis2.6.12版本之前,使用setnx命令設置key-value、使用expire命令設置key的過期時間獲取分布式鎖,使用del命令釋放分布式鎖,但是這種實現有如下一些問題:
1. setnx命令設置完key-value后,還沒來得及使用expire命令設置過期時間,當前線程掛掉了,會導致當前線程設置的key一直有效,后續線程無法正常通過setnx獲取鎖,造成死鎖;
這個問題很好解決,只因為這兩個不是一個原子操作。2.6.12之后的版本,set命令進行了增強SET key value [EX seconds] [PX milliseconds] [NX|XX]
- EX second :設置鍵的過期時間為 second 秒。 SET key value EX second 效果等同於 SETEX key second value 。
- PX millisecond :設置鍵的過期時間為 millisecond 毫秒。 SET key value PX millisecond 效果等同於 PSETEX key millisecond value 。
- NX :只在鍵不存在時,才對鍵進行設置操作。 SET key value NX 效果等同於 SETNX key value 。
- XX :只在鍵已經存在時,才對鍵進行設置操作。
2. 在分布式環境下,進程A通過這種實現方式獲取到了鎖,但是在獲取到鎖之后,執行被阻塞,阻塞時間大於key超時時間導致該鎖失效;之后進程B獲取到該鎖,之后進程A恢復執行,執行完成后釋放該鎖,將會把進程B的鎖也釋放掉。也就是把他人的鎖釋放掉的問題,實際上還有另一個問題就是任務完成之前key失效的問題;
這個問題也很好解決,只需要在value中存放一個唯一標識符,釋放的時候判斷是不是自己的標識符即可,如果是自己的,就可以釋放
3. 為了實現高可用,將會選擇主從復制機制,但是主從復制機制是異步的,會出現數據不同步的問題,可能導致多個機器的多個線程獲取到同一個鎖。
解決方案是不采用主從復制,使用RedLock算法(官方推薦)
1.獲取當前Unix時間,以毫秒為單位
2.依次嘗試從N個實例,使用相同的key和隨機值獲取鎖。當向Redis設置鎖時,客戶端應該設置一個網絡連接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免服務器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試另外一個Redis實例
3.客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(這里是3個節點)的Redis節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功
4.如果取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)
5.如果因為某些原因,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功)。
這里我提供兩種實現(測試的話新建一個SpringBoot工程,引入spring-boot-starter-data-redis依賴)
利用set
package com.example.demo.utils; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.BoundValueOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.util.StringUtils; import java.util.Random; import java.util.concurrent.TimeUnit; /** * 要求: * 1. 在分布式系統環境下,一個方法或者變量同一時間只能被一個線程操作 * 2. 具備鎖失效機制,網絡中斷或宕機無法釋放鎖時,鎖必須被刪除,防止死鎖 * 3. 具備阻塞鎖特性,即沒有獲取到鎖,則繼續等待獲取鎖 * 4. 具備非阻塞鎖特性,即沒有獲取到鎖,則直接返回獲取鎖失敗 * 5. 具備可重入特性,一個線程中可以多次獲取同一把鎖 * * 問題: * 1. setnx后發生了異常而沒有設置過期時間,導致key一直有效,造成死鎖(SEX命令代替SEXNX+EXPIRE) * 2. 分布式環境下,進程A獲取到了鎖,但是執行時間過長,鎖已經自動失效了,此時進程B獲取到了鎖,然后進程A執行完畢,執行del命令,把鎖釋放掉,結果把進程B的也釋放了,導致不可預知的問題(可以把value設置成自己唯一標識符,這樣只能刪除自己的鎖) * 3. Redis集群中主從數據同步是從庫異步讀取主庫,會出現數據不同步的問題,這樣導致多個進程獲取同一把鎖(不采用主從復制,使用RedLock算法) * 4. 在任務執行完之前,key失效了怎么辦?(鎖獲取成功后,注冊一個定時任務,每隔一定時間(this.internalLockLeaseTime / 3L)就去續約) * * 擴展:Redis 集群沒有使用一致性hash, 而是引入了哈希槽的概念。 * Reds 集群有16384個哈希槽,每個key通過CRC16校驗后對16384取模來決定放置哪個槽.集群的每個節點負責一部分hash槽。這種結構很容易添加或者刪除節點,並且無論是添加刪除或者修改某一個節點,都不會造成集群不可用的狀態。 * 使用哈希槽的好處就在於可以方便的添加或移除節點。 * 當需要增加節點時,只需要把其他節點的某些哈希槽挪到新節點就可以了; * 當需要移除節點時,只需要把移除節點上的哈希槽挪到其他節點就行了; * * 兩種實現: * 1. 利用set操作,設置失效時間。(問題:不好控制鎖的失效時間,可能在key失效之前,同步任務沒有完成) * 2. 利用setnx、get、getset操作,不設置失效時間,失效時間作為value存儲 * * * SET key value [EX seconds] [PX milliseconds] [NX|XX] * EX-過期時間秒;PX-過期時間毫秒;NX-key不存在則設置成功;XX-key存在則設置成功 * * GETSET key value(原子操作) * 將給定 key 的值設為 value ,並返回 key 的舊值(old value)。 */ @Slf4j public class RedisLockUtils { // key失效時間:60000毫秒 private static final Long EXPIRE_TIME = 60L * 1000; // 線程等待時間:10000毫秒 private static final Long WAIT_MAX_TIMEOUT = 10L * 1000; /** * 獲取鎖 * @param redis redis服務 * @param key key * @param ownerFlag 鎖擁有者標識,防止把其他人的釋放了 * @param operation 業務邏輯 * @param <T> * @return */ public static <T> T executeOperation(StringRedisTemplate redis, String key, String ownerFlag, RedisLockUtils.Operation<T> operation) { validParam(redis, key, ownerFlag); long start = System.currentTimeMillis(); boolean locked = false; T resultObj; try { Random rand = new Random(); for (;;) { BoundValueOperations<String, String> ops = redis.boundValueOps(key); // 設置key if (ops.setIfAbsent(ownerFlag, EXPIRE_TIME, TimeUnit.SECONDS)) { locked = true; break; } // 超時 if ((System.currentTimeMillis() - start) >= WAIT_MAX_TIMEOUT){ break; } // 獲取不到鎖,睡眠隨機時間 int sleepTime = rand.nextInt(1001); // 取值:0-1000毫秒 Thread.sleep(sleepTime); } // 執行業務邏輯 resultObj = operation.invoke(locked); if (log.isDebugEnabled()) { log.debug("executeOperation success"); } } catch (Exception e) { throw new RuntimeException(e); } finally { // 獲取鎖持有者的標識 String v = redis.boundValueOps(key).get(); log.info("release lock,value:{},ownerFlag:{}", v, ownerFlag); // 只能釋放自己的 if (ownerFlag.equals(v)){ redis.boundValueOps(key).expire(0L, TimeUnit.SECONDS); } } return resultObj; } private static void validParam(StringRedisTemplate redis, String key, String ownerFlag){ if (redis == null){ throw new RuntimeException("redisService can't be null"); } if (StringUtils.isEmpty(key)){ throw new RuntimeException("key can't be null"); } if (StringUtils.isEmpty(ownerFlag)){ throw new RuntimeException("ownerFlag can't be null"); } } @FunctionalInterface public interface Operation<T>{ T invoke(boolean locked); } }
測試
package com.example.demo; import com.example.demo.utils.RedisDistributeLock; import com.example.demo.utils.RedisLockUtils; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.*; @SpringBootTest class DemoApplicationTests { @Autowired private StringRedisTemplate stringRedisTemplate; // 共享變量 private int share = 0; @Test void testOne() throws InterruptedException { // 線程數量 int threadCount = 100; CountDownLatch countDownLatch = new CountDownLatch(threadCount); CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount); ExecutorService executorService = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < threadCount; i++){ executorService.execute(() -> { try { // 讓所有線程同時開始執行 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } Integer result = RedisLockUtils.executeOperation(stringRedisTemplate, "redis_lock", Thread.currentThread().getName(), locked -> { if (locked) { return share++; } return -1; }); System.out.println(result); countDownLatch.countDown(); }); } // 為了等線程全部執行完畢才離開測試代碼塊 countDownLatch.await(); System.out.println("=================================="); } }
輸出:
利用setnx + get + getset
package com.example.demo.utils; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.Random; import java.util.concurrent.TimeUnit; @Slf4j public class RedisDistributeLock { /** * 鎖超時時間,防止線程在入鎖以后,無限的執行等待 */ private static int EXPIRE_TIMEOUT = 60 * 1000; /** * 鎖等待時間,防止線程飢餓 */ private static int WAIT_TIMEOUT = 10 * 1000; public static <T> T executeOperation(StringRedisTemplate redisService, String lockKey, RedisDistributeLock.Operation<T> operation) { T resultObj; long start = System.currentTimeMillis(); Random rand = new Random(); boolean locked = false; try { for (;;) { long expireTime = System.currentTimeMillis() + EXPIRE_TIMEOUT + 1; String expireValue = String.valueOf(expireTime); // 鎖到期時間 // 1. setnx:嘗試獲取鎖 if (redisService.boundValueOps(lockKey).setIfAbsent(expireValue)) { // 設置成功,獲取到鎖 locked = true; break; } // 2. get:獲取已占用的鎖設置的value【走到這,說明上一步中鎖被占用】 String currentExpireValue = redisService.boundValueOps(lockKey).get(); // redis里的時間 // redis里的時間小於當前時間【說明已經超時,其它線程可以搶占了】 if (currentExpireValue != null && Long.parseLong(currentExpireValue) < System.currentTimeMillis()) { // 3. getset:設置新時間,返回舊時間 String oldExpireValue = redisService.boundValueOps(lockKey).getAndSet(expireValue); // 只有一個線程能成功getset,當get的返回值和getset的返回值一樣,才能獲取到鎖,如果不一致,則證明被其它線程獲取了 if (oldExpireValue != null && oldExpireValue.equals(currentExpireValue)) { // getset成功,獲取到鎖 locked = true; break; } } // 4. 獲取鎖超時 if ((System.currentTimeMillis() - start) >= WAIT_TIMEOUT){ break; } // 獲取不到鎖,睡眠隨機時間 int sleepTime = rand.nextInt(1001); // 取值:0-1000毫秒 Thread.sleep(sleepTime); } // 執行業務邏輯 resultObj = operation.invoke(locked); } catch (Exception e) { throw new RuntimeException(e); } finally { // 5. 最后,如果未超時,則刪除鎖 if (System.currentTimeMillis() < Long.parseLong(redisService.boundValueOps(lockKey).get())) { redisService.boundValueOps(lockKey).expire(0L, TimeUnit.SECONDS); } } return resultObj; } @FunctionalInterface public interface Operation<T>{ T invoke(boolean locked); } }
測試
package com.example.demo; import com.example.demo.utils.RedisDistributeLock; import com.example.demo.utils.RedisLockUtils; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.*; @SpringBootTest class DemoApplicationTests { @Autowired private StringRedisTemplate stringRedisTemplate; // 共享變量 private int share = 0; @Test void testTwo() throws InterruptedException { // 線程數量 int threadCount = 100; CountDownLatch countDownLatch = new CountDownLatch(threadCount); CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount); ExecutorService executorService = Executors.newFixedThreadPool(threadCount); for (int i = 0; i < threadCount; i++){ executorService.execute(() -> { try { // 讓所有線程同時開始執行 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } Integer result = RedisDistributeLock.executeOperation(stringRedisTemplate, "redis_lock_time", locked -> { if (locked) { return share++; } return -1; }); System.out.println(result); countDownLatch.countDown(); }); } // 為了等線程全部執行完畢才離開測試代碼塊 countDownLatch.await(); System.out.println("=================================="); } }
輸出:
二、基於ZooKeeper的實現方式
利用zookeeper規定同一個目錄下只能有一個唯一文件名的特性。
1. 創建一個目錄mylock;
2. 線程A想獲取鎖就在mylock目錄下創建臨時順序節點;
3. 獲取mylock目錄下所有的子節點,然后獲取比自己小的兄弟節點,如果不存在,則說明當前線程順序號最小,獲得鎖;
4. 線程B獲取所有節點,判斷自己不是最小節點,設置監聽比自己次小的節點;
5. 線程A處理完,刪除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。
------------------------------------------------------------------------------------------------
另附:Apache的開源ZooKeeper客戶端Curator,提供了分布式鎖的實現 InterProcessMutex,acquire方法用於獲取鎖,release方法用於釋放鎖。
優點:具備高可用、可重入、阻塞鎖特性,可解決失效死鎖問題。
缺點:因為需要頻繁的創建和刪除節點,性能上不如Redis方式。
分布式事務
1、事務
解決一個會話過程中,上下文的修改對所有數據庫表的操作要么全部成功,要不全部失敗。所以應用在service層。解決的是一個會話中的操作的數據一致性。
2、分布式事務
比如一個轉賬操作:
(1)招行賬戶減少100
(2)建行賬戶增加100
這時候需要保證對兩個服務的操作全部成功或者全部回退。解決的是組合服務的數據操作的一致性問題。
ACID
數據庫事務的四大特性(ACID):原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)、持久性(Durability)。
⑴ 原子性(Atomicity)
一個事務內所有操作共同組成一個原子包,要么全部成功,要么全部失敗。
⑵ 一致性(Consistency)
數據庫事務的一致性就規定了事物提交前后,永遠只可能存在事物提交前的狀態和事物提交后的狀態,從一個一致性的狀態到另一個一致性狀態,而不可能出現中間的過程態。也就是說事物的執行結果是量子化狀態,而不是線性狀態。
⑶ 隔離性(Isolation)
事務的隔離性,基於原子性和一致性,因為事務是原子化,量子化的,所以,事務可以有多個原子包的形式並發執行,但是,每個事務互不干擾。
⑷ 持久性(Durability)
持久性,當一個事物提交之后,數據庫狀態永遠的發生了改變。
CAP
一個分布式系統不可能同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多滿足其中兩項。
一致性:指的是在分布式環境下,數據在多個副本之間是否能夠保持一致的特性。比如系統的數據副本分布在不同節點上,對第一個節點的數據進行更新后,其它的節點也應該得到相應的更新。
可用性:指的是系統提供的服務必須處於一直可用的狀態。
分區容錯性:分布式系統在遇到任何網絡分區故障的時候,仍然需要保證對外提供滿足一致性和可用性的服務,除非是整個網絡環境都發生了故障。
BASE
BASE理論:是Basically Available(基本可用),Soft state(軟狀態),Eventually consistent(最終一致性)三個短語的簡寫,其核心思想是,即使無法做到強一致性,每個應用也應該采用適當的方式來達到最終一致性。
基本可用:出現不可預知的故障,允許損失部分可用性。比如發生故障的時候比正常慢了1秒。
軟狀態:允許系統在不同的節點進行數據同步的過程中存在延時。
最終一致性:系統中所有副本,在經過一段時間的同步后,最終能達到一致性的狀態。
一致性解決方案
2PC(二階段提交)
- 第一階段是表決階段,所有參與者都將本事務能否成功的信息反饋發給協調者;
- 第二階段是執行階段,協調者根據所有參與者的反饋,通知所有參與者,步調一致地在所有分支上提交或者回滾;
3PC(三階段提交,2PC的改進版本)
- CanCommit:
- 事務詢問:協調者向參與者發送一個包含事務內容的 canCommit 請求,詢問是否可以執行事務提交操作,並等待響應。
- 各參與者向協調者反饋事務詢問的響應,如果參與者認為自己可以順利執行事務,就返回 Yes,否則反饋 No 響應。
- PreCommit
協調者在得到所有參與者的響應之后,會根據結果執行2種操作:執行事務預提交,或者中斷事務。
1. 執行事務預提交:
首先協調者向所有參與者節點發出 preCommit 的請求,並等待響應。然后參與者受到 preCommit 請求后,會執行事務操作,並將結果返回。最后協調者得到了Ack響應,確定下一階段是否為提交或者是終止操作。
2. 中斷事務也分為2個步驟:
首先協調者向所有參與者節點發出 abort 請求 。然后參與者如果收到 abort 請求或者超時了,都會中斷事務。
- do Commit
1. 執行提交:
首先協調者發送提交請求,並等待Ack 響應。然后參與者收到 doCommit 請求后,執行事務並反饋事務提交結果,向協調者發送 Ack 消息。最后協調者接收 Ack 消息后,完成事務。
2. 中斷事務:
中斷事務是因為出現了異常,比如協調者一方出現了問題,或者是協調者與參與者之間出現了故障。
首先協調者向所有的參與者發送中斷請求。然后參與者接收到中斷請求后,會利用其在二階段記錄的 undo 信息來執行事務回滾操作,並釋放資源。接下來參與者在完成事務回滾之后,向協調者發送 Ack 消息。最后協調者接收到所有的 Ack 消息后,中斷事務。
TCC
TCC 其實就是采用的補償機制,其核心思想是:針對每個操作,都要注冊一個與其對應的確認和補償(撤銷)操作。
- Try 階段主要是對業務系統做檢測及資源預留
- Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,默認 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
- Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。
可靠消息最終一致性
可靠消息最終一致性事務適合執行周期長且實時性要求不高的場景,引入消息機制后,同步的事務操作變為基於消息執行的異步操作,避免分布式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。
比如:上游應用業務執行成功后發送消息,消息系統保證消息的傳遞,下游應用監聽消息並執行相關業務。