ZooKeeper 分布式鎖 Curator 源碼 04:分布式信號量和互斥鎖


前言

分布式信號量,之前在 Redisson 中也介紹過,Redisson 的信號量是將計數維護在 Redis 中的,那現在來看一下 Curator 是如何基於 ZooKeeper 實現信號量的。

使用 Demo

public class CuratorDemo {

    public static void main(String[] args) throws Exception {

        String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .build();
        client.start();

        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphores/semaphore_01", 3);

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread() + " 線程 start - " + LocalTime.now());
                    Lease lease = semaphore.acquire();
                    System.out.println(Thread.currentThread() + " 線程 execute - " + LocalTime.now());
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread() + " 線程 over -" + LocalTime.now());
                    semaphore.returnLease(lease);
                } catch (Exception e) {

                }

            }).start();
        }

        Thread.sleep(1000000);

    }
}

控制台輸出數據如下:

源碼

獲取憑證

核心源碼:InterProcessSemaphoreV2#internalAcquire1Lease

這里僅介紹大概邏輯,有興趣的小伙伴可以自行閱讀源碼。

lock 是 InterProcessMutexInterProcessSemaphoreV2 信號量,也是借助於最基礎的加鎖。

通過圖也可以看出,使用 InterProcessSemaphoreV2 時,會先創建 /semaphores/semaphore_01 路徑,並在路徑下創建 locks 節點。也就是 /semaphores/semaphore_01/locks 路徑下,有 10 個臨時順序節點。

緊接着會在 /semaphores/semaphore_01 路徑下創建 leases 節點,所以創建鎖的臨時順序節點之后,會緊接着在 /semaphores/semaphore_01/leases 下創建臨時順序節點。

/semaphores/semaphore_01/leases 節點進行監聽,同時獲取 /semaphores/semaphore_01/leases 下面的子節點數量。

  1. 如果子節點數量小於等於信號量計數,則直接結束循環;
  2. 如果大於,則會進入 wait 等待喚醒。

釋放憑證

釋放憑證就是調用 Lease 的 close 方法,刪除節點,這樣 /semaphores/semaphore_01/leases 上的監聽器就會觸發,然后其他線程獲取憑證。

互斥鎖

互斥鎖 InterProcessSemaphoreMutex,不支持重入,其他的和可重入鎖並沒有什么區別。就是基於 InterProcessSemaphoreV2 實現的。

就是把計數的值 maxLeases 設置為了 1。

總結

信號量 InterProcessSemaphoreV2 其實是通過判斷節點下的子節點數量來實現控制信號量,同時內部加鎖是基於可重入鎖 InterProcessMutex 實現的。

互斥鎖 InterProcessSemaphoreMutex 則是將信號量的技術設置為 1 來實現互斥功能。

相關推薦


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM