基於Zookeeper實現分布式鎖


為什么需要分布式鎖

  鎖是多線程代碼中的概念,只有當多任務訪問同一個互斥共享資源時才需要。如下圖:

        

  在我們進行單機應用開發,涉及並發同步的時候,我們往往采用synchronized或者Lock的方式來解決多線程間的代碼同步問題,這時多線程的運行都是在同一個JVM之下。但當我們的應用是分布式集群工作的情況下,屬於多JVM下的工作環境,JVM之間已經無法通過多線程的鎖解決同步問題。那么就需要一種更加高級的鎖機制,來處理種跨機器的進程之間的數據同步問題——這就是分布式鎖。

  如下例:攜程、美團、飛豬、去哪兒四個購票網站實際上都沒有最終售票權,只有12306鐵道總局有火車票,那么四個購票網站都需要購買火車票,那么四個網站必須排隊進行同步,否則不同步會造成多售(類似同一進程中多線程間不同步也會造成多售)。這時他們就需要有一個公共的鎖管理方案,來保證APP間的購票是同步的。要想購票:

  1、首先服務獲取分布式

  2、服務獲取分布式鎖后,才能去調用12306進行購票。

  3、購票成功后,釋放分布式鎖,這樣其余APP才能獲取鎖並進行12306購票。

           

為什么要使用zookeeper

  1、基於mysql實現分布式鎖

  基於分布式鎖的實現,首先肯定是想單獨分離出一台mysql數據庫,所有服務要想操作文件(共享資源),那么必須先在mysql數據庫中插入一個標志,插入標志的服務就持有了鎖,並對文件進行操作,操作完成后,主動刪除標志進行鎖釋放,其與服務會一直查詢數據庫,看是否標志有被占用,直到沒有標志占用時自己才能寫入標志獲取鎖。

  但是這樣有這么一個問題,如果服務(jvm1)宕機或者卡頓了,會一直持有鎖未釋放,這樣就造成了死鎖,因此就需要有一個監視鎖進程時刻監視鎖的狀態,如果超過一定時間未釋放就要進行主動清理鎖標記,然后供其與服務繼續獲取鎖。

  如果監視鎖字段進程和jvm1同時掛掉,依舊不能解決死鎖問題,於是又增加一個監視鎖字段進程,這樣一個進程掛掉,還有另一個監視鎖字段進程可以對鎖進行管理。這樣又誕生一個新的問題,兩個監視進程必須進行同步,否則對於過期的情況管理存在不一致問題。

  因此存在以下問題,並且方案變得很復雜:

  1、監視鎖字段進程對於鎖的監視時間周期過短,仍舊會造成多售(jvm1還沒處理完其持有的鎖就被主動銷毀,造成多個服務同時持有鎖進行操作)。

  2、監視鎖字段進程對於鎖的監視時間周期過長,會造成整個服務卡頓過長,吞吐低下。

  3、監視鎖字段進程間的同步問題。

  4、當一個jvm持有鎖的時候,其余服務會一直訪問數據庫查看鎖,會造成其余jvm的資源浪費。

         

  2、基於Redis實現分布式鎖

  相比較於基於數據庫實現分布式鎖的方案來說,基於緩存來實現在性能方面會表現的更好一點,Redis就是其中一種。由於Redis可以設置字段的有效期,因此可以實現自動釋放超期的鎖,不需要多個監視鎖字段進程進行鎖守護,可以依舊存在上述mysql實現中除了3以外1、2、4中的問題。  

           

  3、基於Zookeeper實現分布式鎖

  基於以上兩種實現方式,有了基於zookeeper實現分布式鎖的方案。由於zookeeper有以下特點:

  1️⃣維護了一個有層次的數據節點,類似文件系統。

  2️⃣有以下數據節點:臨時節點、持久節點、臨時有序節點(分布式鎖實現基於的數據節點)、持久有序節點。

  3️⃣zookeeper可以和client客戶端通過心跳的機制保持長連接,如果客戶端鏈接zookeeper創建了一個臨時節點,那么這個客戶端與zookeeper斷開連接后會自動刪除。

  4️⃣zookeeper的節點上可以注冊上用戶事件(自定義),如果節點數據刪除等事件都可以觸發自定義事件。

  5️⃣zookeeper保持了統一視圖,各服務對於狀態信息獲取滿足一致性。

  Zookeeper的每一個節點,都是一個天然的順序發號器。

  在每一個節點下面創建子節點時,只要選擇的創建類型是有序(EPHEMERAL_SEQUENTIAL 臨時有序或者PERSISTENT_SEQUENTIAL 永久有序)類型,那么,新的子節點后面,會加上一個次序編號。這個次序編號,是上一個生成的次序編號加一

  比如,創建一個用於發號的節點“/test/lock”,然后以他為父親節點,可以在這個父節點下面創建相同前綴的子節點,假定相同的前綴為“/test/lock/seq-”,在創建子節點時,同時指明是有序類型。如果是第一個創建的子節點,那么生成的子節點為/test/lock/seq-0000000000,下一個節點則為/test/lock/seq-0000000001,依次類推,等等。
          

如何使用zookeeper實現分布式鎖

  大致思想為:每個客戶端對某個方法加鎖時,在 Zookeeper 上與該方法對應的指定節點的目錄下,生成一個唯一的臨時有序節點。 判斷是否獲取鎖的方式很簡單,只需要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個臨時節點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產生的死鎖問題。

  1、排它鎖

  排他鎖,又稱寫鎖或獨占鎖。如果事務T1對數據對象O1加上了排他鎖,那么在整個加鎖期間,只允許事務T1對O1進行讀取或更新操作,其他任務事務都不能對這個數據對象進行任何操作,直到T1釋放了排他鎖。

  排他鎖核心是保證當前有且僅有一個事務獲得鎖,並且鎖釋放之后,所有正在等待獲取鎖的事務都能夠被通知到。

  Zookeeper 的強一致性特性,能夠很好地保證在分布式高並發情況下節點的創建一定能夠保證全局唯一性,即Zookeeper將會保證客戶端無法重復創建一個已經存在的數據節點。可以利用Zookeeper這個特性,實現排他鎖。

  1️⃣定義鎖:通過Zookeeper上的數據節點來表示一個鎖
  2️⃣獲取鎖:客戶端通過調用 create 方法創建表示鎖的臨時節點,可以認為創建成功的客戶端獲得了鎖,同時可以讓沒有獲得鎖的節點在該節點上注冊Watcher監聽,以便實時監聽到lock節點的變更情況
  3️⃣釋放鎖:以下兩種情況都可以讓鎖釋放
    當前獲得鎖的客戶端發生宕機或異常,那么Zookeeper上這個臨時節點就會被刪除
    正常執行完業務邏輯,客戶端主動刪除自己創建的臨時節點
  基於Zookeeper實現排他鎖流程:

          

  2、共享鎖

  共享鎖,又稱讀鎖。如果事務T1對數據對象O1加上了共享鎖,那么當前事務只能對O1進行讀取操作,其他事務也只能對這個數據對象加共享鎖,直到該數據對象上的所有共享鎖都被釋放。

  共享鎖與排他鎖的區別在於,加了排他鎖之后,數據對象只對當前事務可見,而加了共享鎖之后,數據對象對所有事務都可見。

  1️⃣定義鎖:通過Zookeeper上的數據節點來表示一個鎖,是一個類似於 /lockpath/[hostname]-請求類型-序號 的臨時順序節點
  2️⃣獲取鎖:客戶端通過調用 create 方法創建表示鎖的臨時順序節點,如果是讀請求,則創建 /lockpath/[hostname]-R-序號 節點,如果是寫請求則創建 /lockpath/[hostname]-W-序號節點
  3️⃣判斷讀寫順序:大概分為4個步驟
  1)創建完節點后,獲取 /lockpath 節點下的所有子節點,並對該節點注冊子節點變更的Watcher監聽
  2)確定自己的節點序號在所有子節點中的順序
    3.1)對於讀請求:1. 如果沒有比自己序號更小的子節點,或者比自己序號小的子節點都是讀請求,那么表明自己已經成功獲取到了共享鎖,同時開始執行讀取邏輯 2. 如果有比自己序號小的子節點有寫請求,那么等待
    3.2)對於寫請求,如果自己不是序號最小的節點,那么等待
  4)接收到Watcher通知后,重復步驟1)
  4️⃣釋放鎖:與排他鎖邏輯一致

             

   基於Zookeeper實現共享鎖流程:

            

  3、羊群效應

  在實現共享鎖的 "判斷讀寫順序" 的第1個步驟是:創建完節點后,獲取 /lockpath 節點下的所有子節點,並對該節點注冊子節點變更的Watcher監聽。這樣的話,任何一次客戶端移除共享鎖之后,Zookeeper將會發送子節點變更的Watcher通知給所有機器,系統中將有大量的 "Watcher通知" 和 "子節點列表獲取" 這個操作重復執行,然后所有節點再判斷自己是否是序號最小的節點(寫請求)或者判斷比自己序號小的子節點是否都是讀請求(讀請求),從而繼續等待下一次通知。

  然而,這些重復操作很多都是 "無用的",實際上每個鎖競爭者只需要關注序號比自己小的那個節點是否存在即可。

  當集群規模比較大時,這些 "無用的" 操作不僅會對Zookeeper造成巨大的性能影響和網絡沖擊,更為嚴重的是,如果同一時間有多個客戶端釋放了共享鎖,Zookeeper服務器就會在短時間內向其余客戶端發送大量的事件通知--這就是所謂的 "羊群效應"。

  改進后的分布式鎖實現:

  1️⃣客戶端調用 create 方法創建一個類似於 /lockpath/[hostname]-請求類型-序號 的臨時順序節點。

  2️⃣客戶端調用 getChildren 方法獲取所有已經創建的子節點列表(這里不注冊任何Watcher)。

  3️⃣如果無法獲取任何共享鎖,那么調用 exist 來對比自己小的那個節點注冊Watcher
    讀請求:向比自己序號小的最后一個寫請求節點注冊Watcher監聽
    寫請求:向比自己序號小的最后一個節點注冊Watcher監聽
  4️⃣等待Watcher監聽,繼續進入步驟2️⃣
  Zookeeper羊群效應改進前后Watcher監聽圖:

            

zookeeper分布式鎖示例代碼

  分布式鎖實現類(鎖初始化、創建、獲取、等待、釋放):

/**
 * 基於zookeeper的分布式鎖
 */
public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zk = null;
    // 根節點
    private String ROOT_LOCK = "/lock_msb";
    // 競爭的資源
    private String lockName;
    // 等待的前一個鎖
    private String WAIT_LOCK;
    // 當前鎖
    private String CURRENT_LOCK;
    // 計數器
    private CountDownLatch countDownLatch;
    private int sessionTimeout = 3000000;
    private List<Exception> exceptionList = new ArrayList<Exception>();

    /**
     * 配置分布式鎖
     * @param config 連接的url
     * @param lockName 競爭資源
     */
    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            // 連接zookeeper
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(ROOT_LOCK, false);
            if (stat == null) {
                // 如果根節點不存在,則創建根節點
                zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // 節點監視器
    public void process(WatchedEvent event) {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    public void lock() {
        if (exceptionList.size() > 0) {
            throw new LockException(exceptionList.get(0));
        }
        try {
            if (this.tryLock()) {
                System.out.println(Thread.currentThread().getName() + " " + lockName + "獲得了鎖");
                return;
            } else {
                // 等待鎖
                waitForLock(WAIT_LOCK, sessionTimeout);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new LockException("鎖名有誤");
            }
            // 創建臨時有序節點
            CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(CURRENT_LOCK + " 已經創建");
            // 取所有子節點
            List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
            // 取出所有lockName的鎖
            List<String> lockObjects = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if (_node.equals(lockName)) {
                    lockObjects.add(node);
                }
            }
            Collections.sort(lockObjects);
            System.out.println(Thread.currentThread().getName() + " 的鎖是 " + CURRENT_LOCK);
            // 若當前節點為最小節點,則獲取鎖成功
            if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }

            // 若不是最小節點,則找到自己的前一個節點
            String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
            WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(WAIT_LOCK, timeout);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // 等待鎖
    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

        if (stat != null) {
            System.out.println(Thread.currentThread().getName() + "等待鎖 " + ROOT_LOCK + "/" + prev);
            this.countDownLatch = new CountDownLatch(1);
            // 計數等待,若等到前一個節點消失,則precess中進行countDown,停止等待,獲取鎖
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
            System.out.println(Thread.currentThread().getName() + " 等到了鎖");
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("釋放鎖 " + CURRENT_LOCK);
            zk.delete(CURRENT_LOCK, -1);
            CURRENT_LOCK = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }


    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

  測試類:

public class Test {
    //100張票
    private Integer n = 100;
//    private Lock lock = new ReentrantLock();

    public void printInfo() {
        System.out.println(Thread.currentThread().getName() +
                "正在運行,剩余余票:" + --n);
    }

    public class TicketThread implements Runnable {
        public void run() {
            Lock lock = new DistributedLock("192.168.150.111:2181,192.168.150.112:2181,192.168.150.113:2181", "zk");
            lock.lock();
            try {
                if (n > 0) {
                    printInfo();
                }
                }finally{
                    lock.unlock();
                }
            }
    }

    public void ticketStart() {
        TicketThread thread = new TicketThread();
        for (int i = 0; i < 30; i++) {
            Thread t = new Thread(thread, "mem" + i);
            t.start();
        }
    }

    public static void main(String[] args) {
        new Test().ticketStart();
    }
}

基於Curator客戶端實現分布式鎖

  Apache Curator是一個Zookeeper的開源客戶端,它提供了Zookeeper各種應用場景(Recipe,如共享鎖服務、master選舉、分布式計數器等)的抽象封裝,接下來將利用Curator提供的類來實現分布式鎖。

  Curator提供的跟分布式鎖相關的類有5個,分別是:

Shared Reentrant Lock 可重入鎖
Shared Lock 共享不可重入鎖
Shared Reentrant Read Write Lock 可重入讀寫鎖
Shared Semaphore 信號量
Multi Shared Lock 多鎖

  關於錯誤處理:還是強烈推薦使用ConnectionStateListener處理連接狀態的改變。當連接LOST時你不再擁有鎖。

  可重入鎖

  Shared Reentrant Lock,全局可重入鎖,所有客戶端都可以請求,同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。它是由類 InterProcessMutex 來實現,它的主要方法:

// 構造方法
public InterProcessMutex(CuratorFramework client, String path)
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
// 通過acquire獲得鎖,並提供超時機制:
public void acquire() throws Exception
public boolean acquire(long time, TimeUnit unit) throws Exception
// 撤銷鎖
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)

  定義一個 FakeLimitedResource 類來模擬一個共享資源,該資源一次只能被一個線程使用,直到使用結束,下一個線程才能使用,否則會拋出異常。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    // 模擬只能單線程操作的資源
    public void use() throws InterruptedException {
        if (!inUse.compareAndSet(false, true)) {
            // 在正確使用鎖的情況下,此異常不可能拋出
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (100 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

  下面的代碼將創建 N 個線程來模擬分布式系統中的節點,系統將通過 InterProcessMutex 來控制對資源的同步使用;每個節點都將發起10次請求,完成 請求鎖--訪問資源--再次請求鎖--釋放鎖--釋放鎖 的過程;客戶端通過 acquire 請求鎖,通過 release 釋放鎖,獲得幾把鎖就要釋放幾把鎖;這個共享資源一次只能被一個線程使用,如果控制同步失敗,將拋異常。

public class SharedReentrantLockTest {
    private static final String lockPath = "/testZK/sharedreentrantlock";
    private static final Integer clientNums = 5;
    final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的資源
    private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < clientNums; i++) {
            String clientName = "client#" + i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = ZKUtils.getClient();
                    client.start();
                    Random random = new Random();
                    try {
                        final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                        // 每個客戶端請求10次共享資源
                        for (int j = 0; j < 10; j++) {
                            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                throw new IllegalStateException(j + ". " + clientName + " 不能得到互斥鎖");
                            }
                            try {
                                System.out.println(j + ". " + clientName + " 已獲取到互斥鎖");
                                resource.use(); // 使用資源
                                if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                    throw new IllegalStateException(j + ". " + clientName + " 不能再次得到互斥鎖");
                                }
                                System.out.println(j + ". " + clientName + " 已再次獲取到互斥鎖");
                                lock.release(); // 申請幾次鎖就要釋放幾次鎖
                            } finally {
                                System.out.println(j + ". " + clientName + " 釋放互斥鎖");
                                lock.release(); // 總是在finally中釋放
                            }
                            Thread.sleep(random.nextInt(100));
                        }
                    } catch (Throwable e) {
                        System.out.println(e.getMessage());
                    } finally {
                        CloseableUtils.closeQuietly(client);
                        System.out.println(clientName + " 客戶端關閉!");
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        System.out.println("結束!");
    }
}

  控制台打印日志,可以看到對資源的同步訪問控制成功,並且鎖是可重入的:

0. client#3 已獲取到互斥鎖
0. client#3 已再次獲取到互斥鎖
0. client#3 釋放互斥鎖
... ...
0. client#4 已獲取到互斥鎖
0. client#4 已再次獲取到互斥鎖
0. client#4 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 已再次獲取到互斥鎖
1. client#1 釋放互斥鎖
2. client#1 已獲取到互斥鎖
2. client#1 已再次獲取到互斥鎖
2. client#1 釋放互斥鎖
1. client#4 已獲取到互斥鎖
1. client#4 已再次獲取到互斥鎖
1. client#4 釋放互斥鎖
... ...
client#0 客戶端關閉!
8. client#4 已獲取到互斥鎖
8. client#4 已再次獲取到互斥鎖
8. client#4 釋放互斥鎖
9. client#4 已獲取到互斥鎖
9. client#4 已再次獲取到互斥鎖
9. client#4 釋放互斥鎖
client#3 客戶端關閉!
client#4 客戶端關閉!
結束!

  同時在程序運行期間查看Zookeeper節點樹,可以發現每一次請求的鎖實際上對應一個臨時順序節點。

[zk: localhost:2181(CONNECTED) 42] ls /testZK/sharedreentrantlock
[leases, _c_208d461b-716d-43ea-ac94-1d2be1206db3-lock-0000001659, locks, 
  _c_64b19dba-3efa-46a6-9344-19a52e9e424f-lock-0000001658, _c_cee02916-d7d5-4186-8867-f921210b8815-lock-0000001657]

  不可重入鎖

  Shared Lock 與 Shared Reentrant Lock 相似,但是不可重入。這個不可重入鎖由類 InterProcessSemaphoreMutex 來實現,使用方法和上面的類類似。

  將上面程序中的 InterProcessMutex 換成不可重入鎖 InterProcessSemaphoreMutex,如果再運行上面的代碼,結果就會發現線程被阻塞在第二個 acquire 上,直到超時,也就是此鎖不是可重入的。

  控制台輸出日志:

0. client#2 已獲取到互斥鎖
0. client#1 不能得到互斥鎖
0. client#4 不能得到互斥鎖
0. client#0 不能得到互斥鎖
0. client#3 不能得到互斥鎖
client#1 客戶端關閉!
client#4 客戶端關閉!
client#3 客戶端關閉!
client#0 客戶端關閉!
0. client#2 釋放互斥鎖
0. client#2 不能再次得到互斥鎖
client#2 客戶端關閉!
結束!

  把第二個獲取鎖的代碼注釋,程序才能正常執行:

0. client#1 已獲取到互斥鎖
0. client#1 釋放互斥鎖
0. client#2 已獲取到互斥鎖
0. client#2 釋放互斥鎖
0. client#0 已獲取到互斥鎖
0. client#0 釋放互斥鎖
0. client#4 已獲取到互斥鎖
0. client#4 釋放互斥鎖
0. client#3 已獲取到互斥鎖
0. client#3 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 釋放互斥鎖
1. client#2 已獲取到互斥鎖
1. client#2 釋放互斥鎖
....
....
9. client#4 已獲取到互斥鎖
9. client#4 釋放互斥鎖
9. client#0 已獲取到互斥鎖
client#2 客戶端關閉!
9. client#0 釋放互斥鎖
9. client#1 已獲取到互斥鎖
client#0 客戶端關閉!
client#4 客戶端關閉!
9. client#1 釋放互斥鎖
9. client#3 已獲取到互斥鎖
client#1 客戶端關閉!
9. client#3 釋放互斥鎖
client#3 客戶端關閉!
結束!

  可重入讀寫鎖

  Shared Reentrant Read Write Lock,可重入讀寫鎖,一個讀寫鎖管理一對相關的鎖,一個負責讀操作,另外一個負責寫操作;讀操作在寫鎖沒被使用時可同時由多個進程使用,而寫鎖在使用時不允許讀(阻塞);此鎖是可重入的;一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖,這也意味着寫鎖可以降級成讀鎖, 比如 請求寫鎖 --->讀鎖 ---->釋放寫鎖;從讀鎖升級成寫鎖是不行的。

  可重入讀寫鎖主要由兩個類實現:InterProcessReadWriteLock、InterProcessMutex,使用時首先創建一個 InterProcessReadWriteLock 實例,然后再根據你的需求得到讀鎖或者寫鎖,讀寫鎖的類型是 InterProcessMutex。

public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < clientNums; i++) {
        final String clientName = "client#" + i;
        new Thread(new Runnable() {
            @Override
            public void run() {
                CuratorFramework client = ZKUtils.getClient();
                client.start();
                final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
                final InterProcessMutex readLock = lock.readLock();
                final InterProcessMutex writeLock = lock.writeLock();

                try {
                    // 注意只能先得到寫鎖再得到讀鎖,不能反過來!!!
                    if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException(clientName + " 不能得到寫鎖");
                    }
                    System.out.println(clientName + " 已得到寫鎖");
                    if (!readLock.acquire(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException(clientName + " 不能得到讀鎖");
                    }
                    System.out.println(clientName + " 已得到讀鎖");
                    try {
                        resource.use(); // 使用資源
                    } finally {
                        System.out.println(clientName + " 釋放讀寫鎖");
                        readLock.release();
                        writeLock.release();
                    }
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                } finally {
                    CloseableUtils.closeQuietly(client);
                    countDownLatch.countDown();
                }
            }
        }).start();
    }
    countDownLatch.await();
    System.out.println("結束!");
}

  控制台打印日志:

client#1 已得到寫鎖
client#1 已得到讀鎖
client#1 釋放讀寫鎖
client#2 已得到寫鎖
client#2 已得到讀鎖
client#2 釋放讀寫鎖
client#0 已得到寫鎖
client#0 已得到讀鎖
client#0 釋放讀寫鎖
client#4 已得到寫鎖
client#4 已得到讀鎖
client#4 釋放讀寫鎖
client#3 已得到寫鎖
client#3 已得到讀鎖
client#3 釋放讀寫鎖
結束!

  信號量

  Shared Semaphore,一個計數的信號量類似JDK的 Semaphore,JDK中 Semaphore 維護的一組許可(permits),而Cubator中稱之為租約(Lease)。有兩種方式可以決定 semaphore 的最大租約數,第一種方式是由用戶給定的 path 決定,第二種方式使用 SharedCountReader 類。如果不使用 SharedCountReader,沒有內部代碼檢查進程是否假定有10個租約而進程B假定有20個租約。 所以所有的實例必須使用相同的 numberOfLeases 值.

  信號量主要實現類有:

nterProcessSemaphoreV2 - 信號量實現類
Lease - 租約(單個信號)
SharedCountReader - 計數器,用於計算最大租約數量

  調用 acquire 會返回一個租約對象,客戶端必須在 finally 中 close 這些租約對象,否則這些租約會丟失掉。但是,如果客戶端session由於某種原因比如crash丟掉,那么這些客戶端持有的租約會自動close,這樣其它客戶端可以繼續使用這些租約。租約還可以通過下面的方式返還:

public void returnLease(Lease lease)
public void returnAll(Collection<Lease> leases) 

  注意一次你可以請求多個租約,如果 Semaphore 當前的租約不夠,則請求線程會被阻塞。同時還提供了超時的重載方法。

public Lease acquire() throws Exception
public Collection<Lease> acquire(int qty) throws Exception
public Lease acquire(long time, TimeUnit unit) throws Exception
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception

  一個Demo程序如下:

public class SharedSemaphoreTest {
    private static final int MAX_LEASE = 10;
    private static final String PATH = "/testZK/semaphore";
    private static final FakeLimitedResource resource = new FakeLimitedResource();

    public static void main(String[] args) throws Exception {
        CuratorFramework client = ZKUtils.getClient();
        client.start();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
        Collection<Lease> leases = semaphore.acquire(5);
        System.out.println("獲取租約數量:" + leases.size());
        Lease lease = semaphore.acquire();
        System.out.println("獲取單個租約");
        resource.use(); // 使用資源
        // 再次申請獲取5個leases,此時leases數量只剩4個,不夠,將超時
        Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
        System.out.println("獲取租約,如果超時將為null: " + leases2);
        System.out.println("釋放租約");
        semaphore.returnLease(lease);
        // 再次申請獲取5個,這次剛好夠
        leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
        System.out.println("獲取租約,如果超時將為null: " + leases2);
        System.out.println("釋放集合中的所有租約");
        semaphore.returnAll(leases);
        semaphore.returnAll(leases2);
        client.close();
        System.out.println("結束!");
    }
}

  控制台打印日志:

獲取租約數量:5
獲取單個租約
獲取租約,如果超時將為null: null
釋放租約
獲取租約,如果超時將為null: [org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@3108bc, 
  org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@370736d9,
  org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@5f9d02cb,
  org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@63753b6d,
  org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@6b09bb57] 釋放集合中的所有租約 結束!

  注意:上面所講的4種鎖都是公平鎖(fair)。從ZooKeeper的角度看,每個客戶端都按照請求的順序獲得鎖,相當公平。

多鎖

  Multi Shared Lock 是一個鎖的容器。當調用 acquire,所有的鎖都會被 acquire,如果請求失敗,所有的鎖都會被 release。同樣調用 release 時所有的鎖都被 release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。

  主要涉及兩個類:

InterProcessMultiLock - 對所對象實現類
InterProcessLock - 分布式鎖接口類

  它的構造函數需要包含的鎖的集合,或者一組 ZooKeeper 的 path,用法和 Shared Lock 相同。

public InterProcessMultiLock(CuratorFramework client, List<String> paths)
public InterProcessMultiLock(List<InterProcessLock> locks)

  一個Demo程序如下:

public class MultiSharedLockTest {
    private static final String lockPath1 = "/testZK/MSLock1";
    private static final String lockPath2 = "/testZK/MSLock2";
    private static final FakeLimitedResource resource = new FakeLimitedResource();

    public static void main(String[] args) throws Exception {
        CuratorFramework client = ZKUtils.getClient();
        client.start();

        InterProcessLock lock1 = new InterProcessMutex(client, lockPath1); // 可重入鎖
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2); // 不可重入鎖
        // 組鎖,多鎖
        InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
        if (!lock.acquire(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("不能獲取多鎖");
        }
        System.out.println("已獲取多鎖");
        System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
        System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
        try {
            resource.use(); // 資源操作
        } finally {
            System.out.println("釋放多個鎖");
            lock.release(); // 釋放多鎖
        }
        System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
        System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
        client.close();
        System.out.println("結束!");
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM