ZK分布式鎖原理


單體應用鎖

在單體的應用開發場景中,涉及並發同步的時候,大家往往采用synchronized或者Lock的方式來解決多線程間的同步問題。但在分布式集群工作的開發場景中,那么就需要一種更加高級的鎖機制,來處理種跨JVM進程之間的數據同步問題,這就是分布式鎖。

公平鎖和可重入鎖的原理

最經典的分布式鎖是可重入的公平鎖。什么是可重入的公平鎖呢?直接講解的概念和原理,會比較抽象難懂,還是從具體的實例入手吧!這里用一個簡單的故事來類比,估計就簡單多了。

故事發生在一個沒有自來水的古代,在一個村子有一口井,水質非常的好,村民們都搶着取井里的水。井就那么一口,村里的人很多,村民為爭搶取水打架斗毆,甚至頭破血流。

問題總是要解決,於是村長絞盡腦汁,最終想出了一個憑號取水的方案。井邊安排一個看井人,維護取水的秩序。取水秩序很簡單:

(1)取水之前,先取號;

(2)號排在前面的,就可以先取水;

(3)先到的排在前面,那些后到的,一個一個挨着,在井邊排成一隊。

取水示意圖,如圖10-3所示。

圖10-3 排隊取水示意圖

這種排隊取水模型,就是一種鎖的模型。排在最前面的號,擁有取水權,就是一種典型的獨占鎖。另外,先到先得,號排在前面的人先取到水,取水之后就輪到下一個號取水,挺公平的,說明它是一種公平鎖。

什么是可重入鎖呢?
假定,取水時以家庭為單位,家庭的某人拿到號,其他的家庭成員過來打水,這時候不用再取號,如圖10-4所示。

圖10-4 同一家庭的人不需要重復排隊

圖10-4中,排在1號的家庭,老公取號,假設其老婆來了,直接排第一個,正所謂妻憑夫貴。再看上圖的2號,父親正在打水,假設其兒子和女兒也到井邊了,直接排第二個,所謂子憑父貴。總之,如果取水時以家庭為單位,則同一個家庭,可以直接復用排號,不用從后面排起重新取號。

以上這個故事模型中,取號一次,可以用來多次取水,其原理為可重入鎖的模型。在重入鎖模型中,一把獨占鎖,可以被多次鎖定,這就叫做可重入鎖。

Zookeeper分布式鎖的原理

理解了經典的公平可重入鎖的原理后,再來看在分布式場景下的公平可重入鎖的原理。通過前面的分析,基本可以判定:ZooKeeper
的臨時順序節點,天生就有一副實現分布式鎖的胚子。為什么呢?

(一) ZooKeeper的每一個節點,都是一個天然的順序發號器。

在每一個節點下面創建臨時順序節點(EPHEMERAL_SEQUENTIAL)類型,新的子節點后面,會加上一個次序編號,而這個生成的次序編號,是上一個生成的次序編號加一。

例如,有一個用於發號的節點“/test/lock”為父親節點,可以在這個父節點下面創建相同前綴的臨時順序子節點,假定相同的前綴為“/test/lock/seq-”。第一個創建的子節點基本上應該為/test/lock/seq-0000000000,下一個節點則為/test/lock/seq-0000000001,依次類推,如果10-5所示。

圖10-5 Zookeeper臨時順序節點的天然的發號器作用

(二) ZooKeeper節點的遞增有序性,可以確保鎖的公平

一個ZooKeeper分布式鎖,首先需要創建一個父節點,盡量是持久節點(PERSISTENT類型),然后每個要獲得鎖的線程,都在這個節點下創建個臨時順序節點。由於ZK節點,是按照創建的次序,依次遞增的。

為了確保公平,可以簡單的規定:編號最小的那個節點,表示獲得了鎖。所以,每個線程在嘗試占用鎖之前,首先判斷自己是排號是不是當前最小,如果是,則獲取鎖。

(三)ZooKeeper的節點監聽機制,可以保障占有鎖的傳遞有序而且高效

每個線程搶占鎖之前,先嘗試創建自己的ZNode。同樣,釋放鎖的時候,就需要刪除創建的Znode。創建成功后,如果不是排號最小的節點,就處於等待通知的狀態。等誰的通知呢?不需要其他人,只需要等前一個Znode
的通知就可以了。前一個Znode刪除的時候,會觸發Znode事件,當前節點能監聽到刪除事件,就是輪到了自己占有鎖的時候。第一個通知第二個、第二個通知第三個,擊鼓傳花似的依次向后。

ZooKeeper的節點監聽機制,能夠非常完美地實現這種擊鼓傳花似的信息傳遞。具體的方法是,每一個等通知的Znode節點,只需要監聽(linsten)或者監視(watch)排號在自己前面那個,而且緊挨在自己前面的那個節點,就能收到其刪除事件了。
只要上一個節點被刪除了,就進行再一次判斷,看看自己是不是序號最小的那個節點,如果是,自己就獲得鎖。

另外,ZooKeeper的內部優越的機制,能保證由於網絡異常或者其他原因,集群中占用鎖的客戶端失聯時,鎖能夠被有效釋放。一旦占用Znode鎖的客戶端與ZooKeeper集群服務器失去聯系,這個臨時Znode也將自動刪除。排在它后面的那個節點,也能收到刪除事件,從而獲得鎖。正是由於這個原因,在創建取號節點的時候,盡量創建臨時znode
節點,

(四)ZooKeeper的節點監聽機制,能避免羊群效應

ZooKeeper這種首尾相接,后面監聽前面的方式,可以避免羊群效應。所謂羊群效應就是一個節點掛掉,所有節點都去監聽,然后做出反應,這樣會給服務器帶來巨大壓力,所以有了臨時順序節點,當一個節點掛掉,只有它后面的那一個節點才做出反應。

圖解: 分布式鎖的搶占過程

接下來我們一起來看看,多客戶端獲取及釋放zk分布式鎖的整個流程及背后的原理。

首先大家看看下面的圖,如果現在有兩個客戶端一起要爭搶zk上的一把分布式鎖,會是個什么場景?

如果大家對zk還不太了解的話,建議先自行百度一下,簡單了解點基本概念,比如zk有哪些節點類型等等。

參見上圖。zk里有一把鎖,這個鎖就是zk上的一個節點。然后呢,兩個客戶端都要來獲取這個鎖,具體是怎么來獲取呢?

咱們就假設客戶端A搶先一步,對zk發起了加分布式鎖的請求,這個加鎖請求是用到了zk中的一個特殊的概念,叫做“臨時順序節點”。

簡單來說,就是直接在"my_lock"這個鎖節點下,創建一個順序節點,這個順序節點有zk內部自行維護的一個節點序號。

客戶端A發起一個加鎖請求

比如說,第一個客戶端來搞一個順序節點,zk內部會給起個名字叫做:xxx-000001。然后第二個客戶端來搞一個順序節點,zk可能會起個名字叫做:xxx-000002。大家注意一下,最后一個數字都是依次遞增的,從1開始逐次遞增。zk會維護這個順序。

所以這個時候,假如說客戶端A先發起請求,就會搞出來一個順序節點,大家看下面的圖,Curator框架大概會弄成如下的樣子:

大家看,客戶端A發起一個加鎖請求,先會在你要加鎖的node下搞一個臨時順序節點,這一大坨長長的名字都是Curator框架自己生成出來的。

然后,那個最后一個數字是"1"。大家注意一下,因為客戶端A是第一個發起請求的,所以給他搞出來的順序節點的序號是"1"。

接着客戶端A創建完一個順序節點。還沒完,他會查一下"my_lock"這個鎖節點下的所有子節點,並且這些子節點是按照序號排序的,這個時候他大概會拿到這么一個集合:

接着客戶端A會走一個關鍵性的判斷,就是說:唉!兄弟,這個集合里,我創建的那個順序節點,是不是排在第一個啊?

如果是的話,那我就可以加鎖了啊!因為明明我就是第一個來創建順序節點的人,所以我就是第一個嘗試加分布式鎖的人啊!

bingo!加鎖成功!大家看下面的圖,再來直觀的感受一下整個過程。

客戶端B過來排隊

接着假如說,客戶端A都加完鎖了,客戶端B過來想要加鎖了,這個時候他會干一樣的事兒:先是在"my_lock"這個鎖節點下創建一個臨時順序節點,此時名字會變成類似於:

大家看看下面的圖:

客戶端B因為是第二個來創建順序節點的,所以zk內部會維護序號為"2"。

接着客戶端B會走加鎖判斷邏輯,查詢"my_lock"鎖節點下的所有子節點,按序號順序排列,此時他看到的類似於:

同時檢查自己創建的順序節點,是不是集合中的第一個?

明顯不是啊,此時第一個是客戶端A創建的那個順序節點,序號為"01"的那個。所以加鎖失敗!

客戶端B開啟監聽客戶端A

加鎖失敗了以后,客戶端B就會通過ZK的API對他的順序節點的上一個順序節點加一個監聽器。zk天然就可以實現對某個節點的監聽。

如果大家還不知道zk的基本用法,可以百度查閱,非常的簡單。客戶端B的順序節點是:

他的上一個順序節點,不就是下面這個嗎?

即客戶端A創建的那個順序節點!

所以,客戶端B會對:

這個節點加一個監聽器,監聽這個節點是否被刪除等變化!大家看下面的圖。

接着,客戶端A加鎖之后,可能處理了一些代碼邏輯,然后就會釋放鎖。那么,釋放鎖是個什么過程呢?

其實很簡單,就是把自己在zk里創建的那個順序節點,也就是:

這個節點給刪除。

刪除了那個節點之后,zk會負責通知監聽這個節點的監聽器,也就是客戶端B之前加的那個監聽器,說:兄弟,你監聽的那個節點被刪除了,有人釋放了鎖。

此時客戶端B的監聽器感知到了上一個順序節點被刪除,也就是排在他之前的某個客戶端釋放了鎖

客戶端B搶鎖成功

此時,就會通知客戶端B重新嘗試去獲取鎖,也就是獲取"my_lock"節點下的子節點集合,此時為:

集合里此時只有客戶端B創建的唯一的一個順序節點了!

然后呢,客戶端B判斷自己居然是集合中的第一個順序節點,bingo!可以加鎖了!直接完成加鎖,運行后續的業務代碼即可,運行完了之后再次釋放鎖。

分布式鎖的基本實現

接下來就是基於ZooKeeper,實現一下分布式鎖。首先,定義了一個鎖的接口Lock,很簡單,僅僅兩個抽象方法:一個加鎖方法,一個解鎖方法。Lock接口的代碼如下:

package com.crazymakercircle.zk.distributedLock;

/**
 * create by 尼恩 @ 瘋狂創客圈
 **/
public interface Lock {
    /**
     * 加鎖方法
     *
     * @return 是否成功加鎖
     */
    boolean lock() throws Exception;

    /**
     * 解鎖方法
     *
     * @return 是否成功解鎖
     */
    boolean unlock();
}

使用ZooKeeper實現分布式鎖的算法,有以下幾個要點:

(1)一把分布式鎖通常使用一個Znode節點表示;如果鎖對應的Znode節點不存在,首先創建Znode節點。這里假設為“/test/lock”,代表了一把需要創建的分布式鎖。

(2)搶占鎖的所有客戶端,使用鎖的Znode節點的子節點列表來表示;如果某個客戶端需要占用鎖,則在“/test/lock”下創建一個臨時有序的子節點。

這里,所有臨時有序子節點,盡量共用一個有意義的子節點前綴。

比如,如果子節點的前綴為“/test/lock/seq-”,則第一次搶鎖對應的子節點為“/test/lock/seq-000000000”,第二次搶鎖對應的子節點為“/test/lock/seq-000000001”,以此類推。

再比如,如果子節點前綴為“/test/lock/”,則第一次搶鎖對應的子節點為“/test/lock/000000000”,第二次搶鎖對應的子節點為“/test/lock/000000001”,以此類推,也非常直觀。

(3)如果判定客戶端是否占有鎖呢?
很簡單,客戶端創建子節點后,需要進行判斷:自己創建的子節點,是否為當前子節點列表中序號最小的子節點。如果是,則認為加鎖成功;如果不是,則監聽前一個Znode子節點變更消息,等待前一個節點釋放鎖。

(4)一旦隊列中的后面的節點,獲得前一個子節點變更通知,則開始進行判斷,判斷自己是否為當前子節點列表中序號最小的子節點,如果是,則認為加鎖成功;如果不是,則持續監聽,一直到獲得鎖。

(5)獲取鎖后,開始處理業務流程。完成業務流程后,刪除自己的對應的子節點,完成釋放鎖的工作,以方面后繼節點能捕獲到節點變更通知,獲得分布式鎖。

實戰:加鎖的實現

Lock接口中加鎖的方法是lock()。lock()方法的大致流程是:首先嘗試着去加鎖,如果加鎖失敗就去等待,然后再重復。

1.lock()方法的實現代碼

lock()方法加鎖的實現代碼,大致如下

package com.crazymakercircle.zk.distributedLock;

import com.crazymakercircle.zk.ZKclient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * create by 尼恩 @ 瘋狂創客圈
 **/
@Slf4j
public class ZkLock implements Lock {
    //ZkLock的節點鏈接
    private static final String ZK_PATH = "/test/lock";
    private static final String LOCK_PREFIX = ZK_PATH + "/";
    private static final long WAIT_TIME = 1000;
    //Zk客戶端
    CuratorFramework client = null;

    private String locked_short_path = null;
    private String locked_path = null;
    private String prior_path = null;
    final AtomicInteger lockCount = new AtomicInteger(0);
    private Thread thread;

    public ZkLock() {
        ZKclient.instance.init();
        synchronized (ZKclient.instance) {
            if (!ZKclient.instance.isNodeExist(ZK_PATH)) {
                ZKclient.instance.createNode(ZK_PATH, null);
            }
        }
        client = ZKclient.instance.getClient();
    }

    @Override
    public boolean lock() {
//可重入,確保同一線程,可以重復加鎖

        synchronized (this) {
            if (lockCount.get() == 0) {
                thread = Thread.currentThread();
                lockCount.incrementAndGet();
            } else {
                if (!thread.equals(Thread.currentThread())) {
                    return false;
                }
                lockCount.incrementAndGet();
                return true;
            }
        }

        try {
            boolean locked = false;
//首先嘗試着去加鎖
            locked = tryLock();

            if (locked) {
                return true;
            }
            //如果加鎖失敗就去等待
            while (!locked) {

                await();

                //獲取等待的子節點列表

                List<String> waiters = getWaiters();
//判斷,是否加鎖成功
                if (checkLocked(waiters)) {
                    locked = true;
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            unlock();
        }

        return false;
    }



//...省略其他的方法

}

2.tryLock()嘗試加鎖

嘗試加鎖的tryLock方法是關鍵,做了兩件重要的事情:

(1)創建臨時順序節點,並且保存自己的節點路徑

(2)判斷是否是第一個,如果是第一個,則加鎖成功。如果不是,就找到前一個Znode節點,並且保存其路徑到prior_path。

嘗試加鎖的tryLock方法,其實現代碼如下:

 /**
     * 嘗試加鎖
     * @return 是否加鎖成功
     * @throws Exception 異常
     */
    private boolean tryLock() throws Exception {
        //創建臨時Znode
        locked_path = ZKclient.instance
                .createEphemeralSeqNode(LOCK_PREFIX);
        //然后獲取所有節點
        List<String> waiters = getWaiters();

        if (null == locked_path) {
            throw new Exception("zk error");
        }
        //取得加鎖的排隊編號
        locked_short_path = getShortPath(locked_path);

        //獲取等待的子節點列表,判斷自己是否第一個
        if (checkLocked(waiters)) {
            return true;
        }

        // 判斷自己排第幾個
        int index = Collections.binarySearch(waiters, locked_short_path);
        if (index < 0) { // 網絡抖動,獲取到的子節點列表里可能已經沒有自己了
            throw new Exception("節點沒有找到: " + locked_short_path);
        }

        //如果自己沒有獲得鎖,則要監聽前一個節點
        prior_path = ZK_PATH + "/" + waiters.get(index - 1);

        return false;
    }

    private String getShortPath(String locked_path) {

        int index = locked_path.lastIndexOf(ZK_PATH + "/");
        if (index >= 0) {
            index += ZK_PATH.length() + 1;
            return index <= locked_path.length() ? locked_path.substring(index) : "";
        }
        return null;
    }

創建臨時順序節點后,其完整路徑存放在locked_path成員中;另外還截取了一個后綴路徑,放在
locked_short_path成員中,后綴路徑是一個短路徑,只有完整路徑的最后一層。為什么要單獨保存短路徑呢?
因為,在獲取的遠程子節點列表中的其他路徑返回結果時,返回的都是短路徑,都只有最后一層路徑。所以為了方便后續進行比較,也把自己的短路徑保存下來。

創建了自己的臨時節點后,調用checkLocked方法,判斷是否是鎖定成功。如果鎖定成功,則返回true;如果自己沒有獲得鎖,則要監聽前一個節點,此時需要找出前一個節點的路徑,並保存在
prior_path
成員中,供后面的await()等待方法去監聽使用。在進入await()等待方法的介紹前,先說下checkLocked
鎖定判斷方法。

3.checkLocked()檢查是否持有鎖

在checkLocked()方法中,判斷是否可以持有鎖。判斷規則很簡單:當前創建的節點,是否在上一步獲取到的子節點列表的第一個位置:

(1)如果是,說明可以持有鎖,返回true,表示加鎖成功;

(2)如果不是,說明有其他線程早已先持有了鎖,返回false。

checkLocked()方法的代碼如下:

private boolean checkLocked(List<String> waiters) {

        //節點按照編號,升序排列
        Collections.sort(waiters);

        // 如果是第一個,代表自己已經獲得了鎖
        if (locked_short_path.equals(waiters.get(0))) {
            log.info("成功的獲取分布式鎖,節點為{}", locked_short_path);
            return true;
        }
        return false;
    }

checkLocked方法比較簡單,將參與排隊的所有子節點列表,從小到大根據節點名稱進行排序。排序主要依靠節點的編號,也就是后Znode路徑的10位數字,因為前綴都是一樣的。排序之后,做判斷,如果自己的locked_short_path編號位置排在第一個,如果是,則代表自己已經獲得了鎖。如果不是,則會返回false。

如果checkLocked()為false,外層的調用方法,一般來說會執行await()等待方法,執行奪鎖失敗以后的等待邏輯。

4.await()監聽前一個節點釋放鎖

await()也很簡單,就是監聽前一個ZNode節點(prior_path成員)的刪除事件,代碼如下:

private void await() throws Exception {

        if (null == prior_path) {
            throw new Exception("prior_path error");
        }

        final CountDownLatch latch = new CountDownLatch(1);


        //訂閱比自己次小順序節點的刪除事件
        Watcher w = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("監聽到的變化 watchedEvent = " + watchedEvent);
                log.info("[WatchedEvent]節點刪除");

                latch.countDown();
            }
        };

        client.getData().usingWatcher(w).forPath(prior_path);
/*
        //訂閱比自己次小順序節點的刪除事件
        TreeCache treeCache = new TreeCache(client, prior_path);
        TreeCacheListener l = new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client,
                                   TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if (data != null) {
                    switch (event.getType()) {
                        case NODE_REMOVED:
                            log.debug("[TreeCache]節點刪除, path={}, data={}",
                                    data.getPath(), data.getData());

                            latch.countDown();
                            break;
                        default:
                            break;
                    }
                }
            }
        };

        treeCache.getListenable().addListener(l);
        treeCache.start();*/
        latch.await(WAIT_TIME, TimeUnit.SECONDS);
    }

首先添加一個Watcher監聽,而監聽的節點,正是前面所保存在prior_path成員的前一個節點的路徑。這里,僅僅去監聽自己前一個節點的變動,而不是其他節點的變動,提升效率。完成監聽之后,調用latch.await(),線程進入等待狀態,一直到線程被監聽回調代碼中的latch.countDown() 所喚醒,或者等待超時。

說 明

以上代碼用到的CountDownLatch的核心原理和實戰知識,《Netty Zookeeper Redis 高並發實戰》姊妹篇 《Java高並發核心編程(卷2)》。

上面的代碼中,監聽前一個節點的刪除,可以使用兩種監聽方式:

(1)Watcher 訂閱;

(2)TreeCache 訂閱。

兩種方式的效果,都差不多。但是這里的刪除事件,只需要監聽一次即可,不需要反復監聽,所以使用的是Watcher
一次性訂閱。而TreeCache 訂閱的代碼在源碼工程中已經被注釋,僅僅供大家參考。

一旦前一個節點prior_path節點被刪除,那么就將線程從等待狀態喚醒,重新一輪的鎖的爭奪,直到獲取鎖,並且完成業務處理。

至此,分布式Lock加鎖的算法,還差一點就介紹完成。這一點,就是實現鎖的可重入。

5.可重入的實現代碼

什么是可重入呢?只需要保障同一個線程進入加鎖的代碼,可以重復加鎖成功即可。
修改前面的lock方法,在前面加上可重入的判斷邏輯。代碼如下:

@Override

public boolean lock() {

//可重入的判斷

synchronized (this) {

if (lockCount.get() == 0) {

thread = Thread.currentThread();

lockCount.incrementAndGet();

} else {

if (!thread.equals(Thread.currentThread())) {

return false;

}

lockCount.incrementAndGet();

return true;

}

}

//....

}

為了變成可重入,在代碼中增加了一個加鎖的計數器lockCount
,計算重復加鎖的次數。如果是同一個線程加鎖,只需要增加次數,直接返回,表示加鎖成功。

至此,lock()方法已經介紹完成,接下來,就是去釋放鎖

實戰:釋放鎖的實現

Lock接口中的unLock()方法,表示釋放鎖,釋放鎖主要有兩個工作:

(1)減少重入鎖的計數,如果最終的值不是0,直接返回,表示成功的釋放了一次;

(2)如果計數器為0,移除Watchers監聽器,並且刪除創建的Znode臨時節點。

unLock()方法的代碼如下:

/**
     * 釋放鎖
     *
     * @return 是否成功釋放鎖
     */
    @Override
    public boolean unlock() {
//只有加鎖的線程,能夠解鎖
        if (!thread.equals(Thread.currentThread())) {
            return false;
        }
//減少可重入的計數
        int newLockCount = lockCount.decrementAndGet();
//計數不能小於0
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path);
        }
//如果計數不為0,直接返回
        if (newLockCount != 0) {
            return true;
        }
        //刪除臨時節點
        try {
            if (ZKclient.instance.isNodeExist(locked_path)) {
                client.delete().forPath(locked_path);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }

這里,為了盡量保證線程安全,可重入計數器的類型,使用的不是int類型,而是Java並發包中的原子類型——AtomicInteger。

實戰:分布式鎖的使用

寫一個用例,測試一下ZLock的使用,代碼如下:

 @Test
    public void testLock() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            FutureTaskScheduler.add(() -> {
                //創建鎖
                ZkLock lock = new ZkLock();
                lock.lock();
//每條線程,執行10次累加
                for (int j = 0; j < 10; j++) {
//公共的資源變量累加
                    count++;
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("count = " + count);
                //釋放鎖
                lock.unlock();

            });
        }

        Thread.sleep(Integer.MAX_VALUE);
    }

以上代碼是10個並發任務,每個任務累加10次,執行以上用例,會發現結果會是預期的和100,如果不使用鎖,結果可能就不是100,因為上面的count是一個普通的變量,不是線程安全的。

說 明

有關線程安全的核心原理和實戰知識,請參閱本書的下一卷《Java高並發核心編程(卷2)》。

原理上一個Zlock實例代表一把鎖,並需要占用一個Znode永久節點,如果需要很多分布式鎖,則也需要很多的不同的Znode節點。以上代碼,如果要擴展為多個分布式鎖的版本,還需要進行簡單改造,這種改造留給各位自己去練習和實現吧。

curator的InterProcessMutex 可重入鎖

分布式鎖Zlock自主實現主要的價值:學習一下分布式鎖的原理和基礎開發,僅此而已。實際的開發中,如果需要使用到分布式鎖,不建議去自己造輪子,建議直接使用Curator客戶端中的各種官方實現的分布式鎖,比如其中的InterProcessMutex
可重入鎖。

這里提供一個簡單的InterProcessMutex 可重入鎖的使用實例,代碼如下:

@Test
    public void testzkMutex() throws InterruptedException {

        CuratorFramework client = ZKclient.instance.getClient();
        final InterProcessMutex zkMutex =
                new InterProcessMutex(client, "/mutex");
        ;
        for (int i = 0; i < 10; i++) {
            FutureTaskScheduler.add(() -> {

                try {
                    //獲取互斥鎖
                    zkMutex.acquire();

                    for (int j = 0; j < 10; j++) {
//公共的資源變量累加
                        count++;
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("count = " + count);
                    //釋放互斥鎖
                    zkMutex.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }

            });
        }

        Thread.sleep(Integer.MAX_VALUE);
    }

ZooKeeper分布式鎖的優點和缺點

總結一下ZooKeeper分布式鎖:

(1)優點:ZooKeeper分布式鎖(如InterProcessMutex),能有效的解決分布式問題,不可重入問題,使用起來也較為簡單。

(2)缺點:ZooKeeper實現的分布式鎖,性能並不太高。為啥呢?
因為每次在創建鎖和釋放鎖的過程中,都要動態創建、銷毀瞬時節點來實現鎖功能。大家知道,ZK中創建和刪除節點只能通過Leader服務器來執行,然后Leader服務器還需要將數據同不到所有的Follower機器上,這樣頻繁的網絡通信,性能的短板是非常突出的。

總之,在高性能,高並發的場景下,不建議使用ZooKeeper的分布式鎖。而由於ZooKeeper的高可用特性,所以在並發量不是太高的場景,推薦使用ZooKeeper的分布式鎖。

在目前分布式鎖實現方案中,比較成熟、主流的方案有兩種:

(1)基於Redis的分布式鎖

(2)基於ZooKeeper的分布式鎖

兩種鎖,分別適用的場景為:

(1)基於ZooKeeper的分布式鎖,適用於高可靠(高可用)而並發量不是太大的場景;

(2)基於Redis的分布式鎖,適用於並發量很大、性能要求很高的、而可靠性問題可以通過其他方案去彌補的場景。

總之,這里沒有誰好誰壞的問題,而是誰更合適的問題。

最后對本章的內容做個總結:在分布式系統中,ZooKeeper是一個重要的協調工具。本章介紹了分布式命名服務、分布式鎖的原理以及基於ZooKeeper的參考實現。本章的那些實戰案例,建議大家自己去動手掌握,無論是應用實際開始、還是大公司面試,都是非常有用的。另外,主流的分布式協調中間件,也不僅僅只有Zookeeper,還有非常著名的Etcd中間件。但是從學習的層面來說,二者之間的功能設計、核心原理都是差不多的,掌握了Zookeeper,Etcd的上手使用也是很容易的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM