分布式鎖實現的正確打開方式


一、分布式鎖概述

1.1、分布式鎖作用

1)在分布式系統環境下,一個方法在同一時間只能被一個機器的一個線程執行

2)具備高可用、高性能的獲取鎖與釋放鎖

3)具備鎖失效機制,防止死鎖

4)具備非阻塞鎖(沒有獲取到鎖將直接返回獲取鎖失敗)或堵塞鎖特性(根據業務需求考慮)

1.2、分布式鎖應用場景

1)庫存扣減與增加

分布式鎖保證庫存扣減不會超賣,庫存增加不會造成庫存數據不准確

2)積分抵現

防止積分扣減出現溢出的情況

3)會員禮品核銷

防止禮品核銷多次

1.3、實現方式

1)使用Redis,基於setnx命令或其他。

2)使用ZooKeeper,基於臨時有序節點。

3)使用MySQL,基於唯一索引

二、基於Zookeeper實現分布式鎖

2.1、Zookeeper特性介紹

1)有序節點

假如當前有一個父節點為/lock,我們可以在這個父節點下面創建子節點;zookeeper提供了一個可選的有序特性,例如我們可以創建子節點“/lock/node-”並且指明有序,那么zookeeper在生成子節點時會根據當前的子節點數量自動添加整數序號,也就是說如果是第一個創建的子節點,那么生成的子節點為/lock/node-0000000000,下一個節點則為/lock/node-0000000001,依次類推。

2)臨時節點

客戶端可以建立一個臨時節點,在會話結束或者會話超時后,zookeeper會自動刪除該節點。

3)事件監聽

在讀取數據時,我們可以同時對節點設置事件監聽,當節點數據或結構變化時,zookeeper會通知客戶端。當前zookeeper有如下四種事件:節點創建、節點刪除、節點數據修改、子節點變更

2.2、Zookeeper分布式鎖實現(方式一)

2.2.1、實現原理

1)客戶端連接zookeeper,並在父節點(/lock)下創建臨時的且有序的子節點,第一個客戶端對應的子節點為/lock/lock-1,第二個為/lock/lock-2,以此類推。
2)客戶端獲取/lock下的子節點列表,判斷自己創建的子節點是否為當前子節點列表中序號最小的子節點,如果是則認為獲得鎖,否則監聽/lock的子節點變更消息,獲得子節點變更通知后重復此步驟直至獲得鎖
3)執行業務代碼;
4)完成業務流程后,刪除對應的子節點釋放鎖。

2.2.2、實現代碼

1.基於curator的zookeeper分布式鎖實現

public static void main(String[] args) throws Exception {
        //創建zookeeper的客戶端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);

        client.start();

        //創建分布式鎖, 鎖空間的根節點路徑為/curator/lock
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");

        mutex.acquire();

        //獲得了鎖, 進行業務流程
        System.out.println("Enter mutex");

        //完成業務流程, 釋放鎖
        mutex.release();

        //關閉客戶端
        client.close();

    }

2.實現方式二

1)定義變量

/**
    * Zookeeper客戶端
    */
private ZooKeeper zookeeper;

/**
    * 鎖的唯一標識
    */
private String lockId;

/**
    * 與Zookeeper建立會話的信號量
    */
private CountDownLatch connectedLatch;

/**
    * 創建分布式鎖的過程中,開始和等待請求創建分布式鎖的信號標志
    */
private CountDownLatch creatingLatch;

/**
    * 分布式鎖路徑前綴
    */
private String locksRootPath = "/locks";

/**
    * 排在當前節點前面一位的節點的路徑
    */
private String waitNodeLockPath;

/**
    * 為了獲得鎖,本次創建的節點的路徑
    */
private String currentNodeLockPath;

2)構造函數

public ZookeeperTempOrderLock(String lockId) {
    this.lockId = lockId;
    try {
        // 會話超時時間
        int sessionTimeout = 30000;
        //
        zookeeper = new ZooKeeper("192.168.0.93:2181", sessionTimeout, this);
        connectedLatch.await();
    } catch (IOException ioe) {
        log.error("與Zookeeper建立連接時出現異常", ioe);
    } catch (InterruptedException ite) {
        log.error("等待與Zookeeper會話建立完成時出現異常", ite);
    }
}

3)實現Zookeeper的watcher

@Override
public void process(WatchedEvent event) {
    if (Event.KeeperState.SyncConnected == event.getState()) {
        connectedLatch.countDown();
    }

    if (creatingLatch != null) {
        creatingLatch.countDown();
    }
}

4)獲取分布式鎖

/**
    * 獲取鎖
    */
public void acquireDistributedLock() {
    try {
        while(!tryLock()) {
            // 等待前一項服務釋放鎖的等待時間 不能超過一次Zookeeper會話的時間
            long waitForPreviousLockRelease = 30000;
            waitForLock(waitNodeLockPath, waitForPreviousLockRelease);
        }
    } catch (InterruptedException | KeeperException e) {
        log.error("等待上鎖的過程中出現異常", e);
    }
}

public boolean tryLock() {
    try {
        // 創建順序臨時節點
        currentNodeLockPath = zookeeper.create(locksRootPath + "/" + lockId,
                "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 查看剛剛創建的節點是不是最小節點
        // 比如針對於這個同名節點,之前有其它服務曾申請創建過,因此Zookeeper中臨時順序節點形如:
        // /locks/10000000000, /locks/10000000001, /locks/10000000002
        List<String> nodePaths = zookeeper.getChildren(locksRootPath, false);
        Collections.sort(nodePaths);
        if(currentNodeLockPath.equals(locksRootPath + "/" + nodePaths.get(0))) {
            // 如果是最小節點,則代表獲取到鎖
            return true;
        }
        // 如果不是最小節點,則找到比自己小1的節點 (緊挨着自己)
        int previousLockNodeIndex = -1;
        for (int i = 0; i < nodePaths.size(); i++) {
            if(currentNodeLockPath.equals(locksRootPath + "/" + nodePaths.get(i))) {
                previousLockNodeIndex = i-1;
                break;
            }
        }
        this.waitNodeLockPath = nodePaths.get(previousLockNodeIndex);

    } catch (KeeperException | InterruptedException e) {
        log.error("創建臨時順序節點失敗", e);
    }
    return false;
}

6)等待其他服務釋放鎖

/**
    * 等待其他服務釋放鎖
    * 實際上就是在等待前一個臨時節點被刪除
    *
    * @param nodePath 希望被刪除的節點的相對路徑
    * @param waitTime 等待時長 單位:毫秒
    */
private boolean waitForLock(String nodePath, long waitTime) throws KeeperException, InterruptedException {
    Stat stat = zookeeper.exists(locksRootPath + "/" + nodePath, true);
    if (stat != null) {
        this.creatingLatch = new CountDownLatch(1);
        this.creatingLatch.await(waitTime, TimeUnit.MILLISECONDS);
        this.creatingLatch = null;
    }
    return true;
}

7)釋放分布式鎖

/**
    * 釋放鎖
    * 實際上就是刪除當前創建的臨時節點
    */
public void releaseLock() {
    log.info("准備刪除的節點路徑: " + currentNodeLockPath);
    try {
        zookeeper.delete(currentNodeLockPath, -1);
        currentNodeLockPath = null;
        zookeeper.close();
    } catch (Exception e) {
        log.error("刪除節點失敗", e);
    }
}

2.3、Zookeeper分布式鎖實現(方式二)

2.3.1、實現原理

假設有兩個服務A、B希望獲得同一把鎖,執行過程大致如下:

1)服務A向zookeeper申請獲得鎖,該請求將嘗試在zookeeper內創建一個臨時節點(ephemeral znode),如果沒有同名的臨時節點存在,則znode創建成功,標志着服務A成功的獲得了鎖。

2) 服務B向zookeeper申請獲得鎖,同樣嘗試在zookeeper內創建一個臨時節點(名稱必須與服務A的相同),由於同名znode已經存在,因此請求被拒絕。接着,服務B會在zk中注冊一個監聽器,用於監聽臨時節點被刪除的事件

3) 若服務A主動向zk發起請求釋放鎖,或者服務A宕機、斷開與zk的網絡連接,zk會將服務A(創建者)創建的臨時節點刪除。而刪除事件也將立刻被監聽器捕獲到,並反饋給服務B。最后,服務B再次向zookeeper申請獲得鎖。

2.3.2、實現代碼

基於臨時節點實現Zookeeper分布式鎖

多個服務如果想競爭同一把鎖,那就向Zookeeper發起創建臨時節點的請求,若能成功創建則獲得鎖,否則借助監聽器,當監聽到鎖被其它服務釋放(臨時節點被刪除),則自己再請求創建臨時節點,反復這幾個步驟直到成功創建臨時節點或者與zookeeper建立的會話超時。

步驟:

1)定義變量

    /**
     * 與Zookeeper成功建立連接的信號標志
     */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 創建分布式鎖的過程中,開始和等待請求創建分布式鎖的信號標志
     */
    private CountDownLatch creatingSemaphore;

    /**
     * Zookeeper客戶端
     */
    private ZooKeeper zookeeper;

    /**
     * 分布式鎖的過期時間 單位:毫秒
     */
    private static final Long DISTRIBUTED_KEY_OVERDUE_TIME = 30000L;

2)構造函數

public ZookeeperLock() {
    try {
        this.zookeeper = new ZooKeeper("192.168.0.93:2181", 5000, new ZookeeperWatcher());
        try {
            connectedSemaphore.await();
        } catch (InterruptedException ite) {
            log.error("等待Zookeeper成功建立連接的過程中,線程拋出異常", ite);
        }
        log.info("與Zookeeper成功建立連接");
    } catch (Exception e) {
        log.error("與Zookeeper建立連接時出現異常", e);
    }
}

3)獲取分布式鎖

實際上就是在嘗試創建臨時節點znode
create(final String path, byte data[], List acl,CreateMode createMod)
path: 從根節點"/"到當前節點的全路徑
data: 當前節點存儲的數據 (由於這里只是借助臨時節點的創建來實現分布式鎖,因此無需存儲數據)
acl: Access Control list 訪問控制列表 主要涵蓋權限模式(Scheme)、授權對象(ID)、授予的權限(Permission)這三個方面 OPEN_ACL_UNSAFE 完全開放的訪問控制 對當前節點進行操作時,無需考慮ACL權限控制
createMode: 節點創建的模式
EPHEMERAL(臨時節點) 當創建節點的客戶端與zk斷開連接后,臨時節點將被刪除
EPHEMERAL_SEQUENTIAL(臨時順序節點)
PERSISTENT(持久節點)
PERSISTENT_SEQUENTIAL(持久順序節點)

public boolean acquireDistributeLock(Long lockId) {
    String path = "/product-lock-" + lockId;

    try {
        zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        log.info("ThreadId=" + Thread.currentThread().getId() + "創建臨時節點成功");
        return true;
    } catch (Exception e) {
        // 若臨時節點已存在,則會拋出異常: NodeExistsException
        while (true) {
            // 相當於給znode注冊了一個監聽器,查看監聽器是否存在
            try {
                Stat stat = zookeeper.exists(path, true);
                if (stat != null) {
                    this.creatingSemaphore = new CountDownLatch(1);
                    this.creatingSemaphore.await(DISTRIBUTED_KEY_OVERDUE_TIME, TimeUnit.MILLISECONDS);
                    this.creatingSemaphore = null;
                }
                zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return true;
            } catch (Exception ex) {
                log.error("ThreadId=" + Thread.currentThread().getId() + ",查看臨時節點時出現異常", ex);
            }
        }
    }
}

4)釋放分布式鎖

public void releaseDistributedLock(Long lockId) {
    String path = "/product-lock-" + lockId;
    try {
        // 第二個參數version是數據版本 每次znode內數據發生變化,都會使version自增,但由於分布式鎖創建的臨時znode沒有存數據,因此version=-1
        zookeeper.delete(path, -1);
        log.info("成功釋放分布式鎖, lockId=" + lockId + ", ThreadId=" + Thread.currentThread().getId());
    } catch (Exception e) {
        log.error("釋放分布式鎖失敗,lockId=" + lockId, e);
    }
}

5)建立Zookeeper的watcher

不論是zk客戶端與服務器連接成功,還是刪除節點,watcher監聽到的事件都是SyncConnected

private class ZookeeperWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        log.info("接收到事件: " + event.getState() + ", ThreadId=" + Thread.currentThread().getId());

        if (Event.KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }

        if (creatingSemaphore != null) {
            creatingSemaphore.countDown();
        }
    }
}

6)main方式運用

創建了兩個線程,其中第一個線程先執行,且持有鎖5秒鍾才釋放鎖,第二個線程后執行,當且僅當第一個線程釋放鎖(刪除臨時節點)后,第二個線程才能成功獲取鎖。

public static void main(String[] args) throws InterruptedException{
    long lockId = 20200730;

    new Thread(() ->{
        ZookeeperLock zookeeperLock = new ZookeeperLock();
        System.out.println("ThreadId1=" + Thread.currentThread().getId());
        System.out.println("ThreadId=" + Thread.currentThread().getId() + "獲取到分布式鎖: " + zookeeperLock.acquireDistributeLock(lockId));
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            log.error("ThreadId=" + Thread.currentThread().getId() + "暫停時出現異常", e);
        }
        zookeeperLock.releaseDistributedLock(lockId);
    }).start();

    TimeUnit.SECONDS.sleep(1);
    new Thread(() -> {
        ZookeeperLock zookeeperLock = new ZookeeperLock();
        System.out.println("ThreadId2=" + Thread.currentThread().getId());
        System.out.println("ThreadId=" + Thread.currentThread().getId() + "獲取到分布式鎖: " + zookeeperLock.acquireDistributeLock(lockId));
    }).start();
}

三、基於Redis實現分布式鎖

3.1、普通常見實現方式

3.1.1、實現代碼

public String deductStock() {
    String lockKey = "product_001";
    try {
       /*Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa"); //jedis.setnx
        stringRedisTemplate.expire(lockKey, 30, TimeUnit.SECONDS); //設置超時*/
        //為解決原子性問題將設置鎖和設置超時時間合並
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa", 10, TimeUnit.SECONDS);

        //未設置成功,當前key已經存在了,直接返回錯誤
        if (!result) {
            return "error_code";
        }
        //業務邏輯實現,扣減庫存
        ....
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        stringRedisTemplate.delete(lockKey);
    }
    return "end";
}

3.2.2、問題分析

上述代碼可以看到,當前鎖的失效時間為10s,如果當前扣減庫存的業務邏輯執行需要15s時,高並發時會出現問題:

  • 線程1,首先執行到10s后,鎖(product_001)失效
  • 線程2,在第10s后同樣進入當前方法,此時加上鎖(product_001)
  • 當執行到15s時,線程1刪除線程2加的鎖(product_001)
  • 線程3,可以加鎖 .... 如此循環,實際鎖已經沒有意義

3.2.3、解決方案

定義一個子線程,定時去查看是否存在主線程的持有當前鎖,如果存在則為其延長過期時間

3.2、基於Redission實現方式

3.2.1、Redission簡介

Jedis是Redis的Java實現的客戶端,其API提供了比較全面的Redis命令的支持。Redission也是Redis的客戶端,相比於Jedis功能簡單。Jedis簡單使用阻塞的I/O和redis交互,Redission通過Netty支持非阻塞I/O

Redission封裝了鎖的實現,其繼承了java.util.concurrent.locks.Lock的接口,讓我們像操作我們的本地Lock一樣去操作Redission的Lock。

常用API:

RLock redissonLock = redission.getLock();
redissionLock.lock(30,TmieUnit.SECONDS);加鎖並設置鎖的存活時間
redissionLock.unLock();解鎖

3.2.2、實現原理

  • 多個線程去執行lock操作,僅有一個線程能夠加鎖成功,其它線程循環阻塞。
  • 加鎖成功,鎖超時時間默認30s,並開啟后台線程(子線程),加鎖的后台會每隔10秒去檢測線程持有的鎖是否存在,還在的話,就延遲鎖超時時間,重新設置為30s,即鎖延期
  • 對於原子性,Redis分布式鎖底層借助Lua腳本實現鎖的原子性。鎖延期是通過在底層用Lua進行延時,延時檢測時間是對超時時間timeout /3。

1)簡單實現代碼:

public String deductStockRedission() {
    String lockKey = "product_001";
    RLock rlock = redission.getLock(lockKey);
    try {
        rlock.lock();

        //業務邏輯實現,扣減庫存
        ....
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rlock.unlock();
    }
    return "end";
}

2)分析Redission適用原因:

1)redisson所有指令都通過lua腳本執行,redis支持lua腳本原子性執行

2)redisson設置一個key的默認過期時間為30s,如果某個客戶端持有一個鎖超過了30s怎么辦?

redisson中有一個watchdog的概念,翻譯過來就是看門狗,它會在你獲取鎖之后,每隔10秒幫你把key的超時時間設為30s

這樣的話,就算一直持有鎖也不會出現key過期了,其他線程獲取到鎖的問題了。保證了沒有死鎖發生

3)Redisson的可重入鎖

Redis存儲鎖的數據類型是 Hash類型

Hash數據類型的key值包含了當前線程信息

3.2.3、問題分析及對應方案

1)主從同步問題

問題分析:

 當主Redis加鎖了,開始執行線程,若還未將鎖通過異步同步的方式同步到從Redis節點,主節點就掛了,此時會把某一台從節點作為新的主節點,此時別的線程就可以加鎖了,這樣就出錯了,怎么辦?

解決方案:

​ 1)采用zookeeper代替Redis

  由於zk集群的特點,其支持的是CP。而Redis集群支持的則是AP。

​ 2)采用RedLock

假設有3個redis節點,這些節點之間既沒有主從,也沒有集群關系。客戶端用相同的key和隨機值在3個節點上請求鎖,請求鎖的超時時間應小於鎖自動釋放時間。當在2個(超過半數)redis上請求到鎖的時候,才算是真正獲取到了鎖。如果沒有獲取到鎖,則把部分已鎖的redis釋放掉。

public String deductStockRedlock() {
    String lockKey = "product_001";
    //TODO 這里需要自己實例化不同redis實例的redission客戶端連接,這里只是偽代碼用一個redisson客戶端簡化了
    RLock rLock1 = redisson.getLock(lockKey);
    RLock rLock2 = redisson.getLock(lockKey);
    RLock rLock3 = redisson.getLock(lockKey);

    // 向3個redis實例嘗試加鎖
    RedissonRedLock redLock = new RedissionRedLock(rLock1, rLock2, rLock3);
    boolean isLock;
    try {
        // 500ms拿不到鎖, 就認為獲取鎖失敗。10000ms即10s是鎖失效時間。
        isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
        System.out.println("isLock = " + isLock);
        if (isLock) {
            //業務邏輯處理
            ...
        }
    } catch (Exception e) {

    } finally {
        // 無論如何, 最后都要解鎖
        redLock.unlock();
    }
}

不太推薦使用。如果考慮高可用並發推薦使用Redisson,考慮一致性推薦使用zookeeper。

2)提高並發:分段鎖

由於Redission實際上就是將並行的請求,轉化為串行請求。這樣就降低了並發的響應速度,為了解決這一問題,可以將鎖進行分段處理:例如秒殺商品001,原本存在1000個商品,可以將其分為20段,為每段分配50個商品。

比如:

​ 將庫存進行分段,放入redis中,例如1000庫存,可分10段放入Redis

​ key的設計可以為Product:10001:0 | Product:10001:1 ....

​ Redis底層集群,將根據key,計算器槽位,放入不同節點中

參考文章:

https://blog.csdn.net/miaomiao19971215/article/details/107564197

https://www.cnblogs.com/bbgs-xc/p/14412646.html#_label1_0

https://www.cnblogs.com/wei57960/p/14059772.html

https://www.cnblogs.com/jay-huaxiao/p/14503018.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM