死磕 java同步系列之zookeeper分布式鎖


問題

(1)zookeeper如何實現分布式鎖?

(2)zookeeper分布式鎖有哪些優點?

(3)zookeeper分布式鎖有哪些缺點?

簡介

zooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,它可以為分布式應用提供一致性服務,它是Hadoop和Hbase的重要組件,同時也可以作為配置中心、注冊中心運用在微服務體系中。

本章我們將介紹zookeeper如何實現分布式鎖運用在分布式系統中。

基礎知識

什么是znode?

zooKeeper操作和維護的為一個個數據節點,稱為 znode,采用類似文件系統的層級樹狀結構進行管理,如果 znode 節點包含數據則存儲為字節數組(byte array)。

而且,同一個節點多個客戶同時創建【本篇文章由公眾號“彤哥讀源碼”原創】,只有一個客戶端會成功,其它客戶端創建時將失敗。

zooKeeper

節點類型

znode 共有四種類型:

  • 持久(無序)

  • 持久有序

  • 臨時(無序)

  • 臨時有序

其中,持久節點如果不手動刪除會一直存在,臨時節點當客戶端session失效就會自動刪除節點。

什么是watcher?

watcher(事件監聽器),是zookeeper中的一個很重要的特性。

zookeeper允許用戶在指定節點上注冊一些watcher,並且在一些特定事件觸發的時候,zooKeeper服務端會將事件通知到感興趣的客戶端上去,該機制是Zookeeper實現分布式協調服務的重要特性

KeeperState EventType 觸發條件 說明 操作
SyncConnected(3) None(-1) 客戶端與服務端成功建立連接 此時客戶端和服務器處於連接狀態 -
同上 NodeCreated(1) Watcher監聽的對應數據節點被創建 同上 Create
同上 NodeDeleted(2) Watcher監聽的對應數據節點被刪除 同上 Delete/znode
同上 NodeDataChanged(3) Watcher監聽的對應數據節點的數據內容發生變更 同上 setDate/znode
同上 NodeChildChanged(4) Wather監聽的對應數據節點的子節點列表發生變更 同上 Create/child
Disconnected(0) None(-1) 客戶端與ZooKeeper服務器斷開連接 此時客戶端和服務器處於斷開連接狀態 -
Expired(-112) None(-1) 會話超時 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常 -
AuthFailed(4) None(-1) 通常有兩種情況,1:使用錯誤的schema進行權限檢查 2:SASL權限檢查失敗 通常同時也會收到AuthFailedException異常 -

原理解析

方案一

既然,同一個節點只能創建一次,那么,加鎖時檢測節點是否存在,不存在則創建之,存在或者創建失敗則監聽這個節點的刪除事件,這樣,當釋放鎖的時候監聽的客戶端再次競爭去創建這個節點,成功的則獲取到鎖,不成功的則再次監聽該節點。

zooKeeper

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:

(1)三者同時嘗試創建/locker/user_1節點;

(2)client1創建成功,它獲取到鎖;

(3)client2和client3創建失敗,它們監聽/locker/user_1的刪除事件;

(4)client1執行鎖內業務邏輯;

(5)client1釋放鎖,刪除節點/locker/user_1;

(6)client2和client3都捕獲到節點/locker/user_1被刪除的事件,二者皆被喚醒;

(7)client2和client3同時去創建/locker/user_1節點;

(8)回到第二步,依次類推【本篇文章由公眾號“彤哥讀源碼”原創】;

不過,這種方案有個很嚴重的弊端——驚群效應。

如果並發量很高,多個客戶端同時監聽同一個節點,釋放鎖時同時喚醒這么多個客戶端,然后再競爭,最后還是只有一個能獲取到鎖,其它客戶端又要沉睡,這些客戶端的喚醒沒有任何意義,極大地浪費系統資源,那么有沒有更好的方案呢?答案是當然有,請看方案二。

方案二

為了解決方案一中的驚群效應,我們可以使用有序子節點的形式來實現分布式鎖,而且為了規避客戶端獲取鎖后突然斷線的風險,我們有必要使用臨時有序節點。

zooKeeper

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:

(1)三者同時在/locker/user_1/下面創建臨時有序子節點;

(2)三者皆創建成功,分別為/locker/user_1/0000000001、/locker/user_1/0000000003、/locker/user_1/0000000002;

(3)檢查自己創建的節點是不是子節點中最小的;

(4)client1發現自己是最小的節點,它獲取到鎖;

(5)client2和client3發現自己不是最小的節點,它們無法獲取到鎖;

(6)client2創建的節點為/locker/user_1/0000000003,它監聽其上一個節點/locker/user_1/0000000002的刪除事件;

(7)client3創建的節點為/locker/user_1/0000000002,它監聽其上一個節點/locker/user_1/0000000001的刪除事件;

(8)client1執行鎖內業務邏輯;

(9)client1釋放鎖,刪除節點/locker/user_1/0000000001;

(10)client3監聽到節點/locker/user_1/0000000001的刪除事件,被喚醒;

(11)client3再次檢查自己是不是最小的節點,發現是,則獲取到鎖;

(12)client3執行鎖內業務邏輯【本篇文章由公眾號“彤哥讀源碼”原創】;

(13)client3釋放鎖,刪除節點/locker/user_1/0000000002;

(14)client2監聽到節點/locker/user_1/0000000002的刪除事件,被喚醒;

(15)client2執行鎖內業務邏輯;

(16)client2釋放鎖,刪除節點/locker/user_1/0000000003;

(17)client2檢查/locker/user_1/下是否還有子節點,沒有了則刪除/locker/user_1節點;

(18)流程結束;

這種方案相對於方案一來說,每次釋放鎖時只喚醒一個客戶端,減少了線程喚醒的代價,提高了效率。

zookeeper原生API實現

pom文件

pom中引入以下jar包:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>

Locker接口

定義一個Locker接口,與上一章mysql分布式鎖使用同一個接口。

public interface Locker {
    void lock(String key, Runnable command);
}

zookeeper分布式鎖實現

這里通過內部類ZkLockerWatcher處理zookeeper的相關操作,需要注意以下幾點:

(1)zk連接建立完畢之前不要進行相關操作,否則會報ConnectionLoss異常,這里通過LockSupport.park();阻塞連接線程並在監聽線程中喚醒處理;

(2)客戶端線程與監聽線程不是同一個線程,所以可以通過LockSupport.park();及LockSupport.unpark(thread);來處理;

(3)中間很多步驟不是原子的(坑),所以需要再次檢測,詳見代碼中注釋;

@Slf4j
@Component
public class ZkLocker implements Locker {
    @Override
    public void lock(String key, Runnable command) {
        ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
        try {
            if (watcher.getLock()) {
                command.run();
            }
        } finally {
            watcher.releaseLock();
        }
    }

    private static class ZkLockerWatcher implements Watcher {
        public static final String connAddr = "127.0.0.1:2181";
        public static final int timeout = 6000;
        public static final String LOCKER_ROOT = "/locker";

        ZooKeeper zooKeeper;
        String parentLockPath;
        String childLockPath;
        Thread thread;

        public static ZkLockerWatcher conn(String key) {
            ZkLockerWatcher watcher = new ZkLockerWatcher();
            try {
                ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
                watcher.thread = Thread.currentThread();
                // 阻塞等待連接建立完畢
                LockSupport.park();
                // 根節點如果不存在,就創建一個(並發問題,如果兩個線程同時檢測不存在,兩個同時去創建必須有一個會失敗)
                if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
                    try {
                        zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節點已存在,則創建失敗,這里捕獲異常,並不阻擋程序正常運行
                        log.info("創建節點 {} 失敗", LOCKER_ROOT);
                    }
                }
                // 當前加鎖的節點是否存在
                watcher.parentLockPath = LOCKER_ROOT + "/" + key;
                if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
                    try {
                        zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節點已存在,則創建失敗,這里捕獲異常,並不阻擋程序正常運行
                        log.info("創建節點 {} 失敗", watcher.parentLockPath);
                    }
                }

            } catch (Exception e) {
                log.error("conn to zk error", e);
                throw new RuntimeException("conn to zk error");
            }
            return watcher;
        }

        public boolean getLock() {
            try {
                // 創建子節點【本篇文章由公眾號“彤哥讀源碼”原創】
                this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                // 檢查自己是不是最小的節點,是則獲取成功,不是則監聽上一個節點
                return getLockOrWatchLast();
            } catch (Exception e) {
                log.error("get lock error", e);
                throw new RuntimeException("get lock error");
            } finally {
//                System.out.println("getLock: " + childLockPath);
            }
        }

        public void releaseLock() {
            try {
                if (childLockPath != null) {
                    // 釋放鎖,刪除節點
                    zooKeeper.delete(childLockPath, -1);
                }
                // 最后一個釋放的刪除鎖節點
                List<String> children = zooKeeper.getChildren(parentLockPath, false);
                if (children.isEmpty()) {
                    try {
                        zooKeeper.delete(parentLockPath, -1);
                    } catch (KeeperException e) {
                        // 如果刪除之前又新加了一個子節點,會刪除失敗
                        log.info("刪除節點 {} 失敗", parentLockPath);
                    }
                }
                // 關閉zk連接
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error");
            } finally {
//                System.out.println("releaseLock: " + childLockPath);
            }
        }

        private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
            List<String> children = zooKeeper.getChildren(parentLockPath, false);
            // 必須要排序一下,這里取出來的順序可能是亂的
            Collections.sort(children);
            // 如果當前節點是第一個子節點,則獲取鎖成功
            if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
                return true;
            }

            // 如果不是第一個子節點,就監聽前一個節點
            String last = "";
            for (String child : children) {
                if ((parentLockPath + "/" + child).equals(childLockPath)) {
                    break;
                }
                last = child;
            }

            if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
                this.thread = Thread.currentThread();
                // 阻塞當前線程
                LockSupport.park();
                // 喚醒之后重新檢測自己是不是最小的節點,因為有可能上一個節點斷線了
                return getLockOrWatchLast();
            } else {
                // 如果上一個節點不存在,說明還沒來得及監聽就釋放了,重新檢查一次
                return getLockOrWatchLast();
            }
        }

        @Override
        public void process(WatchedEvent event) {
            if (this.thread != null) {
                // 喚醒阻塞的線程(這是在監聽線程,跟獲取鎖的線程不是同一個線程)
                LockSupport.unpark(this.thread);
                this.thread = null;
            }
        }
    }
}

測試代碼

我們這里起兩批線程,一批獲取user_1這個鎖,一批獲取user_2這個鎖。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testZkLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                locker.lock("user_1", ()-> {
                    try {
                        System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }
        for (int i = 1000; i < 2000; i++) {
            new Thread(()->{
                locker.lock("user_2", ()-> {
                    try {
                        System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}

運行結果:

可以看到穩定在500ms左右打印兩個鎖的結果。

user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621

curator實現

上面的原生API實現更易於理解zookeeper實現分布式鎖的邏輯,但是難免保證沒有什么問題,比如不是重入鎖,不支持讀寫鎖等。

下面我們一起看看現有的輪子curator是怎么實現的。

pom文件

pom文件中引入以下jar包:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

代碼實現

下面是互斥鎖的一種實現方案:

@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
    public static final String connAddr = "127.0.0.1:2181";
    public static final int timeout = 6000;
    public static final String LOCKER_ROOT = "/locker";

    private CuratorFramework cf;

    @PostConstruct
    public void init() {
        this.cf = CuratorFrameworkFactory.builder()
                .connectString(connAddr)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        cf.start();
    }

    @Override
    public void lock(String key, Runnable command) {
        String path = LOCKER_ROOT + "/" + key;
        InterProcessLock lock = new InterProcessMutex(cf, path);
        try {
            // 【本篇文章由公眾號“彤哥讀源碼”原創】
            lock.acquire();
            command.run();
        } catch (Exception e) {
            log.error("get lock error", e);
            throw new RuntimeException("get lock error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error", e);
            }
        }
    }
}

除了互斥鎖,curator還提供了讀寫鎖、多重鎖、信號量等實現方式,而且他們是可重入的鎖。

總結

(1)zookeeper中的節點有四種類型:持久、持久有序、臨時、臨時有序;

(2)zookeeper提供了一種非常重要的特性——監聽機制,它可以用來監聽節點的變化;

(3)zookeeper分布式鎖是基於 臨時有序節點 + 監聽機制 實現的;

(4)zookeeper分布式鎖加鎖時在鎖路徑下創建臨時有序節點;

(5)如果自己是第一個節點,則獲得鎖;

(6)如果自己不是第一個節點,則監聽前一個節點,並阻塞當前線程;

(7)當監聽到前一個節點的刪除事件時,喚醒當前節點的線程,並再次檢查自己是不是第一個節點;

(8)使用臨時有序節點而不是持久有序節點是為了讓客戶端無故斷線時能夠自動釋放鎖;

彩蛋

zookeeper分布式鎖有哪些優點?

答:1)zookeeper本身可以集群部署,相對於mysql的單點更可靠;

2)不會占用mysql的連接數,不會增加mysql的壓力;

3)使用監聽機制,減少線程上下文切換的次數;

4)客戶端斷線能夠自動釋放鎖,非常安全;

5)有現有的輪子curator可以使用;

6)curator實現方式是可重入的,對現有代碼改造成本小;

zookeeper分布式鎖有哪些缺點?

答:1)加鎖會頻繁地“寫”zookeeper,增加zookeeper的壓力;

2)寫zookeeper的時候會在集群進行同步,節點數越多,同步越慢,獲取鎖的過程越慢;

3)需要另外依賴zookeeper,而大部分服務是不會使用zookeeper的,增加了系統的復雜性;

4)相對於redis分布式鎖,性能要稍微略差一些;

推薦閱讀

1、死磕 java同步系列之開篇

2、死磕 java魔法類之Unsafe解析

3、死磕 java同步系列之JMM(Java Memory Model)

4、死磕 java同步系列之volatile解析

5、死磕 java同步系列之synchronized解析

6、死磕 java同步系列之自己動手寫一個鎖Lock

7、死磕 java同步系列之AQS起篇

8、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

9、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

10、死磕 java同步系列之ReentrantLock VS synchronized

11、死磕 java同步系列之ReentrantReadWriteLock源碼解析

12、死磕 java同步系列之Semaphore源碼解析

13、死磕 java同步系列之CountDownLatch源碼解析

14、死磕 java同步系列之AQS終篇

15、死磕 java同步系列之StampedLock源碼解析

16、死磕 java同步系列之CyclicBarrier源碼解析

17、死磕 java同步系列之Phaser源碼解析

18、死磕 java同步系列之mysql分布式鎖


歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM