Java進階專題(二十五) 分布式鎖實現業務冪等


前言

​ 現如今很多系統都會基於分布式或微服務思想完成對系統的架構設計。那么在這一個系統中,就會存在若干個微服務,而且服務間也會產生相互通信調用。那么既然產生了服務調用,就必然會存在服務調用延遲或失敗的問題。當出現這種問題,服務端會進行重試等操作或客戶端有可能會進行多次點擊提交。如果這樣請求多次的話,那最終處理的數據結果就一定要保證統一,如支付場景。此時就需要通過保證業務冪等性方案來完成。

什么是冪等

​ 冪等本身是一個數學概念。即 f(n) = 1^n ,無論n為多少,f(n)的值永遠為1。在編程開發中,對於冪等的定義為:無論對某一個資源操作了多少次,其影響都應是相同的。 換句話說就是:在接口重復調用的情況下,對系統產生的影響是一樣的,但是返回值允許不同,如查詢。
​ 冪等性不僅僅只是一次或多次操作對資源沒有產生影響,還包括第一次操作產生影響后,以后多次操作不會再產生影響。並且冪等關注的是是否對資源產生影響,而不關注結果。

以SQL為例:

select * from table where id=1 。此SQL無論執行多少次,雖然結果有可能出現不同,都不會對數據產生
改變,具備冪等性。
insert into table(id,name) values(1,'heima') 。此SQL如果id或name有唯一性約束,多次操作只允許插
入一條記錄,則具備冪等性。如果不是,則不具備冪等性,多次操作會產生多條數據。
update table set score=100 where id = 1 。此SQL無論執行多少次,對數據產生的影響都是相同的。具備
冪等性。
update table set score=50+score where id = 1 。此SQL涉及到了計算,每次操作對數據都會產生影響。
不具備冪等性。
delete from table where id = 1 。此SQL多次操作,產生的結果相同,具備冪等性。

冪等性設計主要從兩個維度進行考慮:空間、時間。
空間:定義了冪等的范圍,如生成訂單的話,不允許出現重復下單。
時間:定義冪等的有效期。有些業務需要永久性保證冪等,如下單、支付等。而部分業務只要保證一段時間
冪等即可。
同時對於冪等的使用一般都會伴隨着出現鎖的概念,用於解決並發安全問題。

接口冪等

​ 對於冪等的考慮,主要解決兩點前后端交互與服務間交互。這兩點有時都要考慮冪等性的實現。從前端的思路解決
的話,主要有三種:前端防重、PRG模式、Token機制
2.1)前端防重
​ 通過前端防重保證冪等是最簡單的實現方式,前端相關屬性和JS代碼即可完成設置。可靠性並不好,有經驗的人員可以通過工具跳過頁面仍能重復提交。主要適用於表單重復提交或按鈕重復點擊。
2.2)PRG模式
​ PRG模式即POST-REDIRECT-GET。當用戶進行表單提交時,會重定向到另外一個提交成功頁面,而不是停留在原先的表單頁面。這樣就避免了用戶刷新導致重復提交。同時防止了通過瀏覽器按鈕前進/后退導致表單重復提交。
是一種比較常見的前端防重策略。
2.3)token機制
​ 2.3.1)方案介紹
​ 通過token機制來保證冪等是一種非常常見的解決方案,同時也適合絕大部分場景。該方案需要前后端進行一定程度的交互來完成。

1)服務端提供獲取token接口,供客戶端進行使用。服務端生成token后,如果當前為分布式架構,將token存放於redis中,如果是單體架構,可以保存在jvm緩存中。

2)當客戶端獲取到token后,會攜帶着token發起請求。

3)服務端接收到客戶端請求后,首先會判斷該token在redis中是否存在。如果存在,則完成進行業務處理,業務處理完成后,再刪除token。如果不存在,代表當前請求是重復請求,直接向客戶端返回對應標識。

​ 但是現在有一個問題,當前是先執行業務再刪除token。在高並發下,很有可能出現第一次訪問時token存在,完成具體業務操作。但在還沒有刪除token時,客戶端又攜帶token發起請求,此時,因為token還存在,第二次請求也會驗證通過,執行具體業務操作。

對於這個問題的解決方案的思想就是並行變串行。會造成一定性能損耗與吞吐量降低。
第一種方案:對於業務代碼執行和刪除token整體加線程鎖。當后續線程再來訪問時,則阻塞排隊。
第二種方案:借助redis單線程和incr是原子性的特點。當第一次獲取token時,以token作為key,對其進行自增。
然后將token進行返回,當客戶端攜帶token訪問執行業務代碼時,對於判斷token是否存在不用刪除,而是對其繼續incr。如果incr后的返回值為2。則是一個合法請求允許執行,如果是其他值,則代表是非法請求,直接返回。

​ 那如果先刪除token再執行業務呢?其實也會存在問題,假設具體業務代碼執行超時或失敗,沒有向客戶端返回明確結果,那客戶端就很有可能會進行重試,但此時之前的token已經被刪除了,則會被認為是重復請求,不再進行業務處理。

​ 這種方案無需進行額外處理,一個token只能代表一次請求。一旦業務執行出現異常,則讓客戶端重新獲取令牌,重新發起一次訪問即可。推薦使用先刪除token方案
​ 但是無論先刪token還是后刪token,都會有一個相同的問題。每次業務請求都回產生一個額外的請求去獲取token。但是,業務失敗或超時,在生產環境下,一萬個里最多也就十個左右會失敗,那為了這十來個請求,讓其他九千九百多個請求都產生額外請求,就有一些得不償失了。雖然redis性能好,但是這也是一種資源的浪費。

服務冪等

防重表

對於防止數據重復提交,還有一種解決方案就是通過防重表實現。防重表的實現思路也非常簡單。首先創建一張表
作為防重表,同時在該表中建立一個或多個字段的唯一索引作為防重字段,用於保證並發情況下,數據只有一條。
在向業務表中插入數據之前先向防重表插入,如果插入失敗則表示是重復數據。

對於防重表的解決方案,可能有人會說為什么不使用悲觀鎖。悲觀鎖在使用的過程中也是會發生死鎖的。悲觀鎖是
通過鎖表的方式實現的。 假設現在一個用戶A訪問表A(鎖住了表A),然后試圖訪問表B; 另一個用戶B訪問表
B(鎖住了表B),然后試圖訪問表A。 這時對於用戶A來說,由於表B已經被用戶B鎖住了,所以用戶A必須等到用
戶B釋放表B才能訪問。 同時對於用戶B來說,由於表A已經被用戶A鎖住了,所以用戶B必須等到用戶A釋放表A才
能訪問。此時死鎖就已經產生了。

Mysql樂觀鎖保證冪等

MySQL樂觀鎖是基於數據庫完成分布式鎖的一種實現,實現的方式有兩種:基於版本號、基於條件。但是實現思
想都是基於MySQL的行鎖思想來實現的。

通過版本號控制是一種非常常見的方式,適合於大多數場景。但現在庫存扣減的場景來說,通過版本號控制就是多
人並發訪問購買時,查詢時顯示可以購買,但最終只有一個人能成功,這也是不可以的。其實最終只要商品庫存不
發生超賣就可以。那此時就可以通過條件來進行控制。

mysql樂觀鎖更適用於一些需要計數的表上,而且在競爭不激烈,出現並發沖突幾率較小時,推薦使用樂觀鎖。雖
然通過MySQL樂觀鎖可以完成並發控制,但鎖的操作是直接作用於數據庫上,這樣就會在一定程度上對數據庫性能產生影響。並且mysql的連接數量是有限的,如果出現大量鎖操作占用連接時,也會造成MySQL的性能瓶頸。

zookeeper分布式鎖

實現思想

對於分布式鎖的實現,zookeeper天然攜帶的一些特性能夠很完美的實現分布式鎖。其內部主要是利用znode節點
特性和watch機制完成。

在zookeeper中節點會分為四類,分別是:
持久節點:一旦創建,則永久存在於zookeeper中,除非手動刪除。
持久有序節點:一旦創建,則永久存在於zookeeper中,除非手動刪除。同時每個節點都會默認存在節點序號,每個節點的序號都是有序遞增的。如demo000001、demo000002.....demo00000N。
臨時節點:當節點創建后,一旦服務器重啟或宕機,則被自動刪除。
臨時有序節點:當節點創建后,一旦服務器重啟或宕機,則被自動刪除。同時每個節點都會默認存在節點序號,每個節點的序號都是有序遞增的。如demo000001、demo000002.....demo00000N。

watch監聽機制

watch監聽機制主要用於監聽節點狀態變更,用於后續事件觸發,假設當B節點監聽A節點時,一旦A節點發生修
改、刪除、子節點列表發生變更等事件,B節點則會收到A節點改變的通知,接着完成其他額外事情。

實現原理

其實現思想是當某個線程要對方法加鎖時,首先會在zookeeper中創建一個與當前方法對應的父節點,接着每個要
獲取當前方法的鎖的線程,都會在父節點下創建一個臨時有序節點,因為節點序號是遞增的,所以后續要獲取鎖的
線程在zookeeper中的序號也是逐次遞增的。根據這個特性,當前序號最小的節點一定是首先要獲取鎖的線程,因
此可以規定序號最小的節點獲得鎖。所以,每個線程再要獲取鎖時,可以判斷自己的節點序號是否是最小的,如果
是則獲取到鎖。當釋放鎖時,只需將自己的臨時有序節點刪除即可。

在並發下,每個線程都會在對應方法節點下創建屬於自己的臨時節點,且每個節點都是臨時且有序的。
那么zookeeper又是如何有序的將鎖分配給不同線程呢? 這里就應用到了watch監聽機制。每當添加一個新的臨時
節點時,其都會基於watcher機制監聽着它本身的前一個節點等待前一個節點的通知,當前一個節點刪除時,就輪
到它來持有鎖了。然后依次類推。

優缺點

1)zookeeper是基於cp模式,能夠保證數據強一致性。

2)基於watch機制實現鎖釋放的自動監聽,鎖操作性能較好。

3)頻繁創建節點,對於zk服務器壓力較大,吞吐量沒有redis強。

原理剖析

低效鎖思想

在通過zookeeper實現分布式鎖時,有另外一種實現的寫法,這種也是非常常見的,但是它的效率並不高,此處可
以先對這種實現方式進行探討。

此種實現方式,只會存在一個鎖節點。當創建鎖節點時,如果鎖節點不存在,則創建成功,代表當前線程獲取到
鎖,如果創建鎖節點失敗,代表已經有其他線程獲取到鎖,則該線程會監聽鎖節點的釋放。當鎖節點釋放后,則繼
續嘗試創建鎖節點加鎖。

這種方案的低效點就在於,只有一個鎖節點,其他線程都會監聽同一個鎖節點,一旦鎖節點釋放后,其他線程都會
收到通知,然后競爭獲取鎖節點。這種大量的通知操作會嚴重降低zookeeper性能,對於這種由於一個被watch的
znode節點的變化,而造成大量的通知操作,叫做羊群效應

高效鎖思想

為了避免羊群效應的出現,業界內普遍的解決方案就是,讓獲取鎖的線程產生排隊,后一個監聽前一個,依次排
序。推薦使用這種方式實現分布式鎖

按照上述流程會在根節點下為每一個等待獲取鎖的線程創建一個對應的臨時有序節點,序號最小的節點會持有鎖,
並且后一個節點只監聽其前面的一個節點,從而可以讓獲取鎖的過程有序且高效。

代碼實現

public abstract class AbstractLock {
 
    //zookeeper服務器地址
    public static final String ZK_SERVER_ADDR="192.168.200.131:2181";
 
    //zookeeper超時時間
    public static final int CONNECTION_TIME_OUT=30000;
    public static final int SESSION_TIME_OUT=30000;
 
    //創建zk客戶端
    protected ZkClient zkClient = new 
ZkClient(ZK_SERVER_ADDR,SESSION_TIME_OUT,CONNECTION_TIME_OUT);
 
    /**
     * 獲取鎖
     * @return
     */
    public abstract boolean tryLock();
 
    /**
     * 等待加鎖
     */
    public abstract void waitLock();
 
    /**
     * 釋放鎖
     */
    public abstract void releaseLock();
 
    public void getLock() {
 
        String threadName = Thread.currentThread().getName();
 
        if (tryLock()) {
            System.out.println(threadName+":   獲取鎖成功");
        }else {
            System.out.println(threadName+":   獲取鎖失敗,等待中");
            //等待鎖
            waitLock();
            getLock();
        }
    }
}
  public class HighLock extends AbstractLock{
 
    private static final String PARENT_NODE_PATH="/high_lock";
 
    //當前節點路徑
    private String currentNodePath;
 
    //前一個節點的路徑
    private String preNodePath;
 
    private CountDownLatch countDownLatch;
 
    @Override
    public boolean tryLock() {
 
        //判斷父節點是否存在
        if (!zkClient.exists(PARENT_NODE_PATH)){
            //不存在
            zkClient.createPersistent(PARENT_NODE_PATH);
        }
 
        //創建第一個臨時有序子節點
        if (currentNodePath == null || "".equals(currentNodePath)){
 
            //根節點下沒有節點信息,將當前節點作為第一個子節點,類型:臨時有序
            currentNodePath = zkClient.createEphemeralSequential(PARENT_NODE_PATH+"/","lock");
        }
 
        //不是第一個子節點,獲取父節點下所有子節點
        List<String> childrenNodeList = zkClient.getChildren(PARENT_NODE_PATH);
 
        //子節點升序排序
        Collections.sort(childrenNodeList);
 
        //判斷是否加鎖成功
        if (currentNodePath.equals(PARENT_NODE_PATH+"/"+childrenNodeList.get(0))){
            //當前節點是序號最小的節點
            return true;
        }else {
            //當前節點不是序號最小的節點,獲取其前面的節點名稱,並賦值
            int length = PARENT_NODE_PATH.length();
            int currentNodeNumber = Collections.binarySearch(childrenNodeList, 
currentNodePath.substring(length + 1));
            preNodePath = PARENT_NODE_PATH+"/"+childrenNodeList.get(currentNodeNumber‐1);
        }
        return false;
    }
 
    @Override
    public void waitLock() {
 
        IZkDataListener zkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
 
            }
 
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
 
                if (countDownLatch != null){
                    countDownLatch.countDown();
                }
            }
        };
 
        //監聽前一個節點的改變
        zkClient.subscribeDataChanges(preNodePath,zkDataListener);
 
        if (zkClient.exists(preNodePath)){
            countDownLatch = new CountDownLatch(1);
 
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
 
            }
        }
 
        zkClient.unsubscribeDataChanges(preNodePath,zkDataListener);
    }
 
    @Override
    public void releaseLock() {
        zkClient.delete(currentNodePath);
        zkClient.close();
    }
}

Redis分布式鎖

原理&實現

分布式鎖的一個很重要的特性就是互斥性,同一時間內多個調用方加鎖競爭,只能有一個調用方加鎖成功。而redis是基於單線程模型的,可以利用這個特性讓調用方的請求排隊,對於並發請求,只會有一個請求能獲取到鎖。
redis實現分布式鎖也很簡單,基於客戶端的幾個API就可以完成,主要涉及三個核心API:
setNx():向redis中存key-value,只有當key不存在時才會設置成功,否則返回0。用於體現互斥性。
expire():設置key的過期時間,用於避免死鎖出現。
delete():刪除key,用於釋放鎖。
1)編寫工具類實現加鎖
通過jedis.set進行加鎖,如果返回值是OK,代表加鎖成功
如果加鎖失敗,則自旋不斷嘗試獲取鎖,同時在一定時間內如果仍沒有獲取到鎖,則退出自旋,不再嘗試獲取鎖。
requestId:用於標識當前每個線程自己持有的鎖標記

public class SingleRedisLock {
 
    JedisPool jedisPool = new JedisPool("192.168.200.128",6379);
 
    //鎖過期時間
    protected long internalLockLeaseTime = 30000;
 
    //獲取鎖的超時時間
    private long timeout = 999999;
 
    /**
     * 加鎖
     * @param lockKey 鎖鍵
     * @param requestId 請求唯一標識
     * @return
     */
    SetParams setParams = SetParams.setParams().nx().px(internalLockLeaseTime);
 
    public boolean tryLock(String lockKey, String requestId){
 
        String threadName = Thread.currentThread().getName();
 
        Jedis jedis = this.jedisPool.getResource();
 
        Long start = System.currentTimeMillis();
 
        try{
            for (;;){
                String lockResult = jedis.set(lockKey, requestId, setParams);
                if ("OK".equals(lockResult)){
                    System.out.println(threadName+":   獲取鎖成功");
                    return true;
                }
                //否則循環等待,在timeout時間內仍未獲取到鎖,則獲取失敗
                System.out.println(threadName+":   獲取鎖失敗,等待中");
                long l = System.currentTimeMillis() ‐ start;
                if (l>=timeout) {
                    return false;
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            jedis.close();
        }
 
    }
}

解鎖時,要避免當前線程將別人的鎖釋放掉。假設線程A加鎖成功,當過了一段時間線程A來解鎖,但線程A的鎖已
經過期了,在這個時間節點,線程B也來加鎖,因為線程A的鎖已經過期,所以線程B時可以加鎖成功的。此時,就
會出現問題,線程A將線程B的鎖給釋放了。
對於這個問題,就需要使用到加鎖時的requestId。當解鎖時要判斷當前鎖鍵的value與傳入的value是否相同,相
同的話,則代表是同一個人,可以解鎖。否則不能解鎖。
但是對於這個操作,有非常多的人,會先查詢做對比,接着相同則刪除。雖然思路是對的,但是忽略了一個問題,
原子性。判斷與刪除分成兩步執行,則無法保證原子性,一樣會出現問題。所以解鎖時不僅要保證加鎖和解鎖是同
一個人還要保證解鎖的原子性。因此結合lua腳本完成查詢&刪除操作。

/**
     * 解鎖
     * @param lockKey 鎖鍵
     * @param requestId 請求唯一標識
     * @return
     */
public boolean releaseLock(String lockKey,String requestId){
 
    String threadName = Thread.currentThread().getName();
    System.out.println(threadName+":釋放鎖");
    Jedis jedis = this.jedisPool.getResource();
 
    String lua =
        "if redis.call('get',KEYS[1]) == ARGV[1] then" +
        "   return redis.call('del',KEYS[1]) " +
        "else" +
        "   return 0 " +
        "end";
 
    try {
        Object result = jedis.eval(lua, Collections.singletonList(lockKey),
                                   Collections.singletonList(requestId));
        if("1".equals(result.toString())){
            return true;
        }
        return false;
    }finally {
        jedis.close();
    }
 
}

測試類

public class LoclTest {
 
    public static void main(String[] args) {
 
        //模擬多個5個客戶端
        for (int i=0;i<5;i++) {
            Thread thread = new Thread(new LockRunnable());
            thread.start();
        }
    }
 
    private static class LockRunnable implements Runnable {
        @Override
        public void run() {
 
            SingleRedisLock singleRedisLock = new SingleRedisLock();
 
            String requestId = UUID.randomUUID().toString();
            boolean lockResult = singleRedisLock.tryLock("lock", requestId);
            if (lockResult){
 
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
 
            singleRedisLock.releaseLock("lock",requestId);
        }
    }
}

此時可以發現,多線程會競爭同一把鎖,且沒有獲取獲取到鎖的線程會自旋不斷嘗試去獲取鎖。每當一個線程將鎖
釋放后,則會有另外一個線程持有鎖。依次類推。

存在的問題

鎖續期

當對業務進行加鎖時,鎖的過期時間,絕對不能想當然的設置一個值。假設線程A在執行某個業務時加鎖成功
並設置鎖過期時間。但該業務執行時間過長,業務的執行時間超過了鎖過期時間,那么在業務還沒執行完
時,鎖就自動釋放了。接着后續線程就可以獲取到鎖,又來執行該業務。就會造成線程A還沒執行完,后續線
程又來執行,導致同一個業務邏輯被重復執行。因此對於鎖的超時時間,需要結合着業務執行時間來判斷,
讓鎖的過期時間大於業務執行時間。
上面的方案是一個基礎解決方案,但是仍然是有問題的。
業務執行時間的影響因素太多了,無法確定一個准確值,只能是一個估值。無法百分百保證業務執行期間,
鎖只能被一個線程占有。
如想保證的話,可以在創建鎖的同時創建一個守護線程,同時定義一個定時任務每隔一段時間去為未釋放的
鎖增加過期時間。當業務執行完,釋放鎖后,再關閉守護線程。 這種實現思想可以用來解決鎖續期。

服務單點&集群問題

在單點redis雖然可以完成鎖操作,可一旦redis服務節點掛掉了,則無法提供鎖操作。
在生產環境下,為了保證redis高可用,會采用異步復制方法進行主從部署。當主節點寫入數據成功,會異步的將
數據復制給從節點,並且當主節點宕機,從節點會被提升為主節點繼續工作。假設主節點寫入數據成功,在沒有將
數據復制給從節點時,主節點宕機。則會造成提升為主節點的從節點中是沒有鎖信息的,其他線程則又可以繼續加
鎖,導致互斥失效。

Redisson分布式鎖

redisson是redis官網推薦實現分布式鎖的一個第三方類庫。其內部完成的功能非常強大,對各種鎖都有實現,同
時對於使用者來說非常簡單,讓使用者能夠將更多的關注點放在業務邏輯上。此處重點利用Redisson解決單機
Redis鎖產生的兩個問題。

單機Redisson實現

依賴

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons‐pool2</artifactId>
</dependency>
<!‐‐Redis分布式鎖‐‐>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson‐spring‐boot‐starter</artifactId>
    <version>3.13.1</version>
</dependency>

配置文件

server:
  redis:
    host: 192.168.200.150
    port: 6379
    database: 0
    jedis:
      pool:
        max‐active: 500
        max‐idle: 1000
        min‐idle: 4

啟動類

@Value("${spring.redis.host}")
private String host;
 
@Value("${spring.redis.port}")
private String port;
 
@Bean
public RedissonClient redissonClient(){
    RedissonClient redissonClient;
 
    Config config = new Config();
    String url = "redis://" + host + ":" + port;
    config.useSingleServer().setAddress(url);
 
    try {
        redissonClient = Redisson.create(config);
        return redissonClient;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

鎖工具

 @Component
public class RedissonLock {
 
    @Autowired
    private RedissonClient redissonClient;
 
    /**
     * 加鎖
     * @param lockKey
     * @return
     */
    public boolean addLock(String lockKey){
 
        try {
            if (redissonClient == null){
                System.out.println("redisson client is null");
                return false;
            }
 
            RLock lock = redissonClient.getLock(lockKey);
 
            //設置鎖超時時間為5秒,到期自動釋放
            lock.lock(5, TimeUnit.SECONDS);
 
            System.out.println(Thread.currentThread().getName()+":  獲取到鎖");
 
            //加鎖成功
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
 
    public boolean releaseLock(String lockKey){
 
        try{
            if (redissonClient == null){
                System.out.println("redisson client is null");
                return false;
            }
 
            RLock lock = redissonClient.getLock(lockKey);
            lock.unlock();
            System.out.println(Thread.currentThread().getName()+":  釋放鎖");
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }
}

測試類

@SpringBootTest
@RunWith(SpringRunner.class)
public class RedissonLockTest {
 
    @Autowired
    private RedissonLock redissonLock;
 
    @Test
    public void easyLock(){
        //模擬多個10個客戶端
        for (int i=0;i<10;i++) {
            Thread thread = new Thread(new LockRunnable());
            thread.start();
        }
 
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    private class LockRunnable implements Runnable {
        @Override
        public void run() {
            redissonLock.addLock("demo");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            redissonLock.releaseLock("demo");
        }
    }
}

根據執行效果可知,多線程並發獲取所時,當一個線程獲取到鎖,其他線程則獲取不到,並且其內部會不斷嘗試獲
取鎖,當持有鎖的線程將鎖釋放后,其他線程則會繼續去競爭鎖。

源碼分析

lock()源碼分析

當獲取到RLock對象后,調用其內部的lock()執行加鎖操作。根據源碼描述,當線程獲取鎖時,如果沒有獲取到
鎖,則會讓其進入自旋,直到獲取到鎖。 如果獲取到鎖,則會一直保留到調用unLock()手動釋放或根據傳入的
leaseTime時間自動釋放。
當前傳入兩個參數值:鎖超時時間,時間單位。主要用於避免死鎖的出現,假設持有鎖的redis節點宕機,到期后
鎖可以自動釋放。

lock()方法中還會調用lock()的另外一個重載方法,需要傳入三個參數:過期時間、時間單位、是否中斷。

在三個參數的lock()重載方法中,首先會獲取當前線程id,接着調用tryAcquire()方法嘗試獲取鎖,如果返回值為
null,代表獲取到鎖。 如果返回值不是null,則根據當前線程id創建異步任務並放入線程池中,接着進入自旋,在
自旋過程中,嘗試調用tryAcquire()獲取鎖,如果獲取到則退出自旋。否則會不斷的嘗試獲取鎖。

在lock()方法中,最核心的是tryAcquire()。其內部核心實現會調用tryAcquireAsync(),並傳入過期時間、時間單位
和當前線程id,進行鎖的獲取。如果leaseTime不為-1,代表設置了有效時間,接着調用tryAcquireAsync()去獲取
鎖。如果是-1的話,則默認把永不過期改為30秒過期,並且創建異步任務,如果沒有獲取到鎖,則什么都不做。如果獲取到了鎖,則調用scheduleExpirationRenewal()對當前線程id的鎖進行延時。

最終的tryLockInnerAsync()則是獲取鎖的具體實現。可以看到,其內部是基於lua腳本語言完成鎖獲取的。因為獲
取鎖的過程涉及到了多步,為了保證執行過程的原子性,所以使用了lua,最核心的就是要理解這段lua腳本的執行
過程。

對於這款lua腳本來說,KEYS[1]代表需要加鎖的key,ARGV[1]代表鎖的超時時間,ARGV[2]代表鎖的唯一標識。
對於這段lua腳本,簡單來說:
1)檢查鎖key是否被占用了,如果沒有則設置鎖key和唯一標識,初始值為1,並且設置鎖key的過期時間。
2)如果鎖key存在,並且value也匹配,表示是當前線程持有的鎖,那么重入次數加1,並且設置失效時間。
3)返回鎖key的失效時間毫秒數。

unLock()源碼分析

在釋放鎖時,unlock()內部會調用unlockAsync()對當前線程持有的鎖進行釋放。其內部最終會執行unlockInnerAsync()方法完成鎖釋放並返回結果。

在unlockInnerAsync()中仍然是結合lua腳本完成釋放鎖操作。
相關參數:
KEYS[1]:當前鎖key。
KEYS[2]:redis消息的ChannelName,每個鎖對應唯一的一個 channelName。
ARGV[1]:redis消息體,用於標記redis的key已經解鎖,用於通知其他線程申請鎖。
ARGV[2]:鎖超時時間。
ARGV[3]:鎖的唯一標識。

1)判斷鎖key和鎖的唯一標識是否匹配,如果不匹配,表示鎖已經被占用,那么直接返回。

2)如果是當前線程持有鎖,則value值-1,用於重入操作。

3)如果-1后的值大於0,則對鎖設置過期時間。

4)如果-1后的值為0,則刪除鎖key,並發布消息,該鎖已被釋放。用於通知其他線程申請鎖。

鎖續期

對於鎖續期問題,在單點redis實現分布式鎖時已經介紹過了,用於防止業務執行超時或宕機而引起的業務被重復
執行。
根據對lock方法的解析,可以發現,當設置完過期時間后,當前鎖的過期時間就已經被設定了,不會發生改變,鎖
到期后則會被自動釋放,因此在業務執行中,通過lock()方法加鎖會造成隱患。

紅鎖

當在單點redis中實現redis鎖時,一旦redis服務器宕機,則無法進行鎖操作。因此會考慮將redis配置為主從結
構,但在主從結構中,數據復制是異步實現的。假設在主從結構中,master會異步將數據復制到slave中,一旦某
個線程持有了鎖,在還沒有將數據復制到slave時,master宕機。則slave會被提升為master,但被提升為slave的
master中並沒有之前線程的鎖信息,那么其他線程則又可以重新加鎖

redlock算法
redlock是一種基於多節點redis實現分布式鎖的算法,可以有效解決redis單點故障的問題。官方建議搭建五台
redis服務器對redlock算法進行實現。
在redis官網中,對於redlock算法的實現思想也做了詳細的介紹。地址:https://redis.io/topics/distlock。整個實

現過程分為五步:
1)記錄獲取鎖前的當前時間
2)使用相同的key,value獲取所有redis實例中的鎖,並且設置獲取鎖的時間要遠遠小於鎖自動釋放的時間。假設
鎖自動釋放時間是10秒,則獲取時間應在5-50毫秒之間。通過這種方式避免客戶端長時間等待一個已經關閉的實
例,如果一個實例不可用了,則嘗試獲取下一個實例。
3)客戶端通過獲取所有實例的鎖后的時間減去第一步的時間,得到的差值要小於鎖自動釋放時間,避免拿到一個
已經過期的鎖。並且要有超過半數的redis實例成功獲取到鎖,才算最終獲取鎖成功。如果不是超過半數,有可能
出現多個客戶端重復獲取到鎖,導致鎖失效。
4)當已經獲取到鎖,那么它的真正失效時間應該為:過期時間-第三步的差值。
5)如果客戶端獲取鎖失敗,則在所有redis實例中釋放掉鎖。為了保證更高效的獲取鎖,還可以設置重試策略,在
一定時間后重新嘗試獲取鎖,但不能是無休止的,要設置重試次數。

雖然通過redlock能夠更加有效的防止redis單點問題,但是仍然是存在隱患的。假設redis沒有開啟持久化,
clientA獲取鎖后,所有redis故障重啟,則會導致clientA鎖記錄消失,clientB仍然能夠獲取到鎖。這種情況雖然發
生幾率極低,但並不能保證肯定不會發生。

保證的方案就是開始AOF持久化,但是要注意同步的策略,使用每秒同步,如果在一秒內重啟,仍然數據丟失。使
用always又會造成性能急劇下降。

官方推薦使用默認的AOF策略即每秒同步,且在redis停掉后,要在ttl時間后再重啟。 缺點就是ttl時間內redis無法
對外提供服務。

實現

redisson對於紅鎖的實現已經非常完善,通過其內部提供的api既可以完成紅鎖的操作。

@Configuration
public class RedissonRedLockConfig {
 
    public RedissonRedLock initRedissonClient(String lockKey){
 
        Config config1 = new Config();
        config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0);
        RedissonClient redissonClient1 = Redisson.create(config1);
 
        Config config2 = new Config();
        config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0);
        RedissonClient redissonClient2 = Redisson.create(config2);
 
        Config config3 = new Config();
        config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0);
        RedissonClient redissonClient3 = Redisson.create(config3);
 
        Config config4 = new Config();
        config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0);
        RedissonClient redissonClient4 = Redisson.create(config4);
 
        Config config5 = new Config();
        config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0);
        RedissonClient redissonClient5 = Redisson.create(config5);
 
        RLock rLock1 = redissonClient1.getLock(lockKey);
        RLock rLock2 = redissonClient2.getLock(lockKey);
        RLock rLock3 = redissonClient3.getLock(lockKey);
        RLock rLock4 = redissonClient4.getLock(lockKey);
        RLock rLock5 = redissonClient5.getLock(lockKey);
 
        RedissonRedLock redissonRedLock = new 
RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);
 
        return redissonRedLock;
    }
}

測試類

@SpringBootTest
@RunWith(SpringRunner.class)
public class RedLockTest {
 
    @Autowired
    private RedissonRedLockConfig redissonRedLockConfig;
 
    @Test
    public void easyLock(){
        //模擬多個10個客戶端
        for (int i=0;i<10;i++) {
            Thread thread = new Thread(new RedLockTest.RedLockRunnable());
            thread.start();
        }
 
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    private class RedLockRunnable implements Runnable {
        @Override
        public void run() {
            RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");
 
            try {
                boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);
 
                if (lockResult){
                    System.out.println("獲取鎖成功");
                    TimeUnit.SECONDS.sleep(3);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                redissonRedLock.unlock();
                System.out.println("釋放鎖");
            }
        }
    }
}

redissonRedLock加鎖源碼分析

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException 
{
    long newLeaseTime = ‐1;
    if (leaseTime != ‐1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }
 
    long time = System.currentTimeMillis();
    long remainTime = ‐1;
    if (waitTime != ‐1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. 允許加鎖失敗節點個數限制(N‐(N/2+1)),當前假設五個節點,則允許失敗節點數為2
    */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2. 遍歷所有節點執行lua加鎖,用於保證原子性
    */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.對節點嘗試加鎖
        */
        try {
            if (waitTime == ‐1 && leaseTime == ‐1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果拋出這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖所有節點
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 拋出異常表示獲取鎖失敗
            lockAcquired = false;
        }
 
        if (lockAcquired) {
            /**
             *4. 如果獲取到鎖則添加到已獲取鎖集合中
            */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5. 計算已經申請鎖失敗的節點是否已經到達 允許加鎖失敗節點個數限制 (N‐(N/2+1))
             * 如果已經到達, 就認定最終申請鎖失敗,則沒有必要繼續從后面的節點申請了
             * 因為 Redlock 算法要求至少N/2+1 個節點都加鎖成功,才算最終的鎖申請成功
4)消息冪等
             */
            if (locks.size() ‐ acquiredLocks.size() == failedLocksLimit()) {
                break;
            }
 
            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == ‐1 && leaseTime == ‐1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit‐‐;
            }
        }
 
        /**
        * 6.計算從各個節點獲取鎖已經消耗的總時間,如果已經等於最大等待時間,則申請鎖失敗,返回false
        */
        if (remainTime != ‐1) {
            remainTime ‐= System.currentTimeMillis() ‐ time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }
 
    if (leaseTime != ‐1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) 
rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
 
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
 
    /**
     * 7.如果邏輯正常執行完則認為最終申請鎖成功,返回true
    */
    return true;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM