單體應用鎖
在單體的應用開發場景中,涉及並發同步的時候,大家往往采用synchronized或者Lock的方式來解決多線程間的同步問題。但在分布式集群工作的開發場景中,那么就需要一種更加高級的鎖機制,來處理種跨JVM進程之間的數據同步問題,這就是分布式鎖。
公平鎖和可重入鎖的原理
最經典的分布式鎖是可重入的公平鎖。什么是可重入的公平鎖呢?直接講解的概念和原理,會比較抽象難懂,還是從具體的實例入手吧!這里用一個簡單的故事來類比,估計就簡單多了。
故事發生在一個沒有自來水的古代,在一個村子有一口井,水質非常的好,村民們都搶着取井里的水。井就那么一口,村里的人很多,村民為爭搶取水打架斗毆,甚至頭破血流。
問題總是要解決,於是村長絞盡腦汁,最終想出了一個憑號取水的方案。井邊安排一個看井人,維護取水的秩序。取水秩序很簡單:
(1)取水之前,先取號;
(2)號排在前面的,就可以先取水;
(3)先到的排在前面,那些后到的,一個一個挨着,在井邊排成一隊。
取水示意圖,如圖10-3所示。
圖10-3 排隊取水示意圖
這種排隊取水模型,就是一種鎖的模型。排在最前面的號,擁有取水權,就是一種典型的獨占鎖。另外,先到先得,號排在前面的人先取到水,取水之后就輪到下一個號取水,挺公平的,說明它是一種公平鎖。
什么是可重入鎖呢?
假定,取水時以家庭為單位,家庭的某人拿到號,其他的家庭成員過來打水,這時候不用再取號,如圖10-4所示。
圖10-4 同一家庭的人不需要重復排隊
圖10-4中,排在1號的家庭,老公取號,假設其老婆來了,直接排第一個,正所謂妻憑夫貴。再看上圖的2號,父親正在打水,假設其兒子和女兒也到井邊了,直接排第二個,所謂子憑父貴。總之,如果取水時以家庭為單位,則同一個家庭,可以直接復用排號,不用從后面排起重新取號。
以上這個故事模型中,取號一次,可以用來多次取水,其原理為可重入鎖的模型。在重入鎖模型中,一把獨占鎖,可以被多次鎖定,這就叫做可重入鎖。
Zookeeper分布式鎖的原理
理解了經典的公平可重入鎖的原理后,再來看在分布式場景下的公平可重入鎖的原理。通過前面的分析,基本可以判定:ZooKeeper
的臨時順序節點,天生就有一副實現分布式鎖的胚子。為什么呢?
(一) ZooKeeper的每一個節點,都是一個天然的順序發號器。
在每一個節點下面創建臨時順序節點(EPHEMERAL_SEQUENTIAL)類型,新的子節點后面,會加上一個次序編號,而這個生成的次序編號,是上一個生成的次序編號加一。
例如,有一個用於發號的節點“/test/lock”為父親節點,可以在這個父節點下面創建相同前綴的臨時順序子節點,假定相同的前綴為“/test/lock/seq-”。第一個創建的子節點基本上應該為/test/lock/seq-0000000000,下一個節點則為/test/lock/seq-0000000001,依次類推,如果10-5所示。
圖10-5 Zookeeper臨時順序節點的天然的發號器作用
(二) ZooKeeper節點的遞增有序性,可以確保鎖的公平
一個ZooKeeper分布式鎖,首先需要創建一個父節點,盡量是持久節點(PERSISTENT類型),然后每個要獲得鎖的線程,都在這個節點下創建個臨時順序節點。由於ZK節點,是按照創建的次序,依次遞增的。
為了確保公平,可以簡單的規定:編號最小的那個節點,表示獲得了鎖。所以,每個線程在嘗試占用鎖之前,首先判斷自己是排號是不是當前最小,如果是,則獲取鎖。
(三)ZooKeeper的節點監聽機制,可以保障占有鎖的傳遞有序而且高效
每個線程搶占鎖之前,先嘗試創建自己的ZNode。同樣,釋放鎖的時候,就需要刪除創建的Znode。創建成功后,如果不是排號最小的節點,就處於等待通知的狀態。等誰的通知呢?不需要其他人,只需要等前一個Znode
的通知就可以了。前一個Znode刪除的時候,會觸發Znode事件,當前節點能監聽到刪除事件,就是輪到了自己占有鎖的時候。第一個通知第二個、第二個通知第三個,擊鼓傳花似的依次向后。
ZooKeeper的節點監聽機制,能夠非常完美地實現這種擊鼓傳花似的信息傳遞。具體的方法是,每一個等通知的Znode節點,只需要監聽(linsten)或者監視(watch)排號在自己前面那個,而且緊挨在自己前面的那個節點,就能收到其刪除事件了。
只要上一個節點被刪除了,就進行再一次判斷,看看自己是不是序號最小的那個節點,如果是,自己就獲得鎖。
另外,ZooKeeper的內部優越的機制,能保證由於網絡異常或者其他原因,集群中占用鎖的客戶端失聯時,鎖能夠被有效釋放。一旦占用Znode鎖的客戶端與ZooKeeper集群服務器失去聯系,這個臨時Znode也將自動刪除。排在它后面的那個節點,也能收到刪除事件,從而獲得鎖。正是由於這個原因,在創建取號節點的時候,盡量創建臨時znode
節點,
(四)ZooKeeper的節點監聽機制,能避免羊群效應
ZooKeeper這種首尾相接,后面監聽前面的方式,可以避免羊群效應。所謂羊群效應就是一個節點掛掉,所有節點都去監聽,然后做出反應,這樣會給服務器帶來巨大壓力,所以有了臨時順序節點,當一個節點掛掉,只有它后面的那一個節點才做出反應。
圖解: 分布式鎖的搶占過程
接下來我們一起來看看,多客戶端獲取及釋放zk分布式鎖的整個流程及背后的原理。
首先大家看看下面的圖,如果現在有兩個客戶端一起要爭搶zk上的一把分布式鎖,會是個什么場景?
如果大家對zk還不太了解的話,建議先自行百度一下,簡單了解點基本概念,比如zk有哪些節點類型等等。
參見上圖。zk里有一把鎖,這個鎖就是zk上的一個節點。然后呢,兩個客戶端都要來獲取這個鎖,具體是怎么來獲取呢?
咱們就假設客戶端A搶先一步,對zk發起了加分布式鎖的請求,這個加鎖請求是用到了zk中的一個特殊的概念,叫做“臨時順序節點”。
簡單來說,就是直接在"my_lock"這個鎖節點下,創建一個順序節點,這個順序節點有zk內部自行維護的一個節點序號。
客戶端A發起一個加鎖請求
比如說,第一個客戶端來搞一個順序節點,zk內部會給起個名字叫做:xxx-000001。然后第二個客戶端來搞一個順序節點,zk可能會起個名字叫做:xxx-000002。大家注意一下,最后一個數字都是依次遞增的,從1開始逐次遞增。zk會維護這個順序。
所以這個時候,假如說客戶端A先發起請求,就會搞出來一個順序節點,大家看下面的圖,Curator框架大概會弄成如下的樣子:
大家看,客戶端A發起一個加鎖請求,先會在你要加鎖的node下搞一個臨時順序節點,這一大坨長長的名字都是Curator框架自己生成出來的。
然后,那個最后一個數字是"1"。大家注意一下,因為客戶端A是第一個發起請求的,所以給他搞出來的順序節點的序號是"1"。
接着客戶端A創建完一個順序節點。還沒完,他會查一下"my_lock"這個鎖節點下的所有子節點,並且這些子節點是按照序號排序的,這個時候他大概會拿到這么一個集合:
接着客戶端A會走一個關鍵性的判斷,就是說:唉!兄弟,這個集合里,我創建的那個順序節點,是不是排在第一個啊?
如果是的話,那我就可以加鎖了啊!因為明明我就是第一個來創建順序節點的人,所以我就是第一個嘗試加分布式鎖的人啊!
bingo!加鎖成功!大家看下面的圖,再來直觀的感受一下整個過程。
客戶端B過來排隊
接着假如說,客戶端A都加完鎖了,客戶端B過來想要加鎖了,這個時候他會干一樣的事兒:先是在"my_lock"這個鎖節點下創建一個臨時順序節點,此時名字會變成類似於:
大家看看下面的圖:
客戶端B因為是第二個來創建順序節點的,所以zk內部會維護序號為"2"。
接着客戶端B會走加鎖判斷邏輯,查詢"my_lock"鎖節點下的所有子節點,按序號順序排列,此時他看到的類似於:
同時檢查自己創建的順序節點,是不是集合中的第一個?
明顯不是啊,此時第一個是客戶端A創建的那個順序節點,序號為"01"的那個。所以加鎖失敗!
客戶端B開啟監聽客戶端A
加鎖失敗了以后,客戶端B就會通過ZK的API對他的順序節點的上一個順序節點加一個監聽器。zk天然就可以實現對某個節點的監聽。
如果大家還不知道zk的基本用法,可以百度查閱,非常的簡單。客戶端B的順序節點是:
他的上一個順序節點,不就是下面這個嗎?
即客戶端A創建的那個順序節點!
所以,客戶端B會對:
這個節點加一個監聽器,監聽這個節點是否被刪除等變化!大家看下面的圖。
接着,客戶端A加鎖之后,可能處理了一些代碼邏輯,然后就會釋放鎖。那么,釋放鎖是個什么過程呢?
其實很簡單,就是把自己在zk里創建的那個順序節點,也就是:
這個節點給刪除。
刪除了那個節點之后,zk會負責通知監聽這個節點的監聽器,也就是客戶端B之前加的那個監聽器,說:兄弟,你監聽的那個節點被刪除了,有人釋放了鎖。
此時客戶端B的監聽器感知到了上一個順序節點被刪除,也就是排在他之前的某個客戶端釋放了鎖
客戶端B搶鎖成功
此時,就會通知客戶端B重新嘗試去獲取鎖,也就是獲取"my_lock"節點下的子節點集合,此時為:
集合里此時只有客戶端B創建的唯一的一個順序節點了!
然后呢,客戶端B判斷自己居然是集合中的第一個順序節點,bingo!可以加鎖了!直接完成加鎖,運行后續的業務代碼即可,運行完了之后再次釋放鎖。
分布式鎖的基本實現
接下來就是基於ZooKeeper,實現一下分布式鎖。首先,定義了一個鎖的接口Lock,很簡單,僅僅兩個抽象方法:一個加鎖方法,一個解鎖方法。Lock接口的代碼如下:
package com.crazymakercircle.zk.distributedLock;
/**
* create by 尼恩 @ 瘋狂創客圈
**/
public interface Lock {
/**
* 加鎖方法
*
* @return 是否成功加鎖
*/
boolean lock() throws Exception;
/**
* 解鎖方法
*
* @return 是否成功解鎖
*/
boolean unlock();
}
使用ZooKeeper實現分布式鎖的算法,有以下幾個要點:
(1)一把分布式鎖通常使用一個Znode節點表示;如果鎖對應的Znode節點不存在,首先創建Znode節點。這里假設為“/test/lock”,代表了一把需要創建的分布式鎖。
(2)搶占鎖的所有客戶端,使用鎖的Znode節點的子節點列表來表示;如果某個客戶端需要占用鎖,則在“/test/lock”下創建一個臨時有序的子節點。
這里,所有臨時有序子節點,盡量共用一個有意義的子節點前綴。
比如,如果子節點的前綴為“/test/lock/seq-”,則第一次搶鎖對應的子節點為“/test/lock/seq-000000000”,第二次搶鎖對應的子節點為“/test/lock/seq-000000001”,以此類推。
再比如,如果子節點前綴為“/test/lock/”,則第一次搶鎖對應的子節點為“/test/lock/000000000”,第二次搶鎖對應的子節點為“/test/lock/000000001”,以此類推,也非常直觀。
(3)如果判定客戶端是否占有鎖呢?
很簡單,客戶端創建子節點后,需要進行判斷:自己創建的子節點,是否為當前子節點列表中序號最小的子節點。如果是,則認為加鎖成功;如果不是,則監聽前一個Znode子節點變更消息,等待前一個節點釋放鎖。
(4)一旦隊列中的后面的節點,獲得前一個子節點變更通知,則開始進行判斷,判斷自己是否為當前子節點列表中序號最小的子節點,如果是,則認為加鎖成功;如果不是,則持續監聽,一直到獲得鎖。
(5)獲取鎖后,開始處理業務流程。完成業務流程后,刪除自己的對應的子節點,完成釋放鎖的工作,以方面后繼節點能捕獲到節點變更通知,獲得分布式鎖。
實戰:加鎖的實現
Lock接口中加鎖的方法是lock()。lock()方法的大致流程是:首先嘗試着去加鎖,如果加鎖失敗就去等待,然后再重復。
1.lock()方法的實現代碼
lock()方法加鎖的實現代碼,大致如下
package com.crazymakercircle.zk.distributedLock;
import com.crazymakercircle.zk.ZKclient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* create by 尼恩 @ 瘋狂創客圈
**/
@Slf4j
public class ZkLock implements Lock {
//ZkLock的節點鏈接
private static final String ZK_PATH = "/test/lock";
private static final String LOCK_PREFIX = ZK_PATH + "/";
private static final long WAIT_TIME = 1000;
//Zk客戶端
CuratorFramework client = null;
private String locked_short_path = null;
private String locked_path = null;
private String prior_path = null;
final AtomicInteger lockCount = new AtomicInteger(0);
private Thread thread;
public ZkLock() {
ZKclient.instance.init();
synchronized (ZKclient.instance) {
if (!ZKclient.instance.isNodeExist(ZK_PATH)) {
ZKclient.instance.createNode(ZK_PATH, null);
}
}
client = ZKclient.instance.getClient();
}
@Override
public boolean lock() {
//可重入,確保同一線程,可以重復加鎖
synchronized (this) {
if (lockCount.get() == 0) {
thread = Thread.currentThread();
lockCount.incrementAndGet();
} else {
if (!thread.equals(Thread.currentThread())) {
return false;
}
lockCount.incrementAndGet();
return true;
}
}
try {
boolean locked = false;
//首先嘗試着去加鎖
locked = tryLock();
if (locked) {
return true;
}
//如果加鎖失敗就去等待
while (!locked) {
await();
//獲取等待的子節點列表
List<String> waiters = getWaiters();
//判斷,是否加鎖成功
if (checkLocked(waiters)) {
locked = true;
}
}
return true;
} catch (Exception e) {
e.printStackTrace();
unlock();
}
return false;
}
//...省略其他的方法
}
2.tryLock()嘗試加鎖
嘗試加鎖的tryLock方法是關鍵,做了兩件重要的事情:
(1)創建臨時順序節點,並且保存自己的節點路徑
(2)判斷是否是第一個,如果是第一個,則加鎖成功。如果不是,就找到前一個Znode節點,並且保存其路徑到prior_path。
嘗試加鎖的tryLock方法,其實現代碼如下:
/**
* 嘗試加鎖
* @return 是否加鎖成功
* @throws Exception 異常
*/
private boolean tryLock() throws Exception {
//創建臨時Znode
locked_path = ZKclient.instance
.createEphemeralSeqNode(LOCK_PREFIX);
//然后獲取所有節點
List<String> waiters = getWaiters();
if (null == locked_path) {
throw new Exception("zk error");
}
//取得加鎖的排隊編號
locked_short_path = getShortPath(locked_path);
//獲取等待的子節點列表,判斷自己是否第一個
if (checkLocked(waiters)) {
return true;
}
// 判斷自己排第幾個
int index = Collections.binarySearch(waiters, locked_short_path);
if (index < 0) { // 網絡抖動,獲取到的子節點列表里可能已經沒有自己了
throw new Exception("節點沒有找到: " + locked_short_path);
}
//如果自己沒有獲得鎖,則要監聽前一個節點
prior_path = ZK_PATH + "/" + waiters.get(index - 1);
return false;
}
private String getShortPath(String locked_path) {
int index = locked_path.lastIndexOf(ZK_PATH + "/");
if (index >= 0) {
index += ZK_PATH.length() + 1;
return index <= locked_path.length() ? locked_path.substring(index) : "";
}
return null;
}
創建臨時順序節點后,其完整路徑存放在locked_path成員中;另外還截取了一個后綴路徑,放在
locked_short_path成員中,后綴路徑是一個短路徑,只有完整路徑的最后一層。為什么要單獨保存短路徑呢?
因為,在獲取的遠程子節點列表中的其他路徑返回結果時,返回的都是短路徑,都只有最后一層路徑。所以為了方便后續進行比較,也把自己的短路徑保存下來。
創建了自己的臨時節點后,調用checkLocked方法,判斷是否是鎖定成功。如果鎖定成功,則返回true;如果自己沒有獲得鎖,則要監聽前一個節點,此時需要找出前一個節點的路徑,並保存在
prior_path
成員中,供后面的await()等待方法去監聽使用。在進入await()等待方法的介紹前,先說下checkLocked
鎖定判斷方法。
3.checkLocked()檢查是否持有鎖
在checkLocked()方法中,判斷是否可以持有鎖。判斷規則很簡單:當前創建的節點,是否在上一步獲取到的子節點列表的第一個位置:
(1)如果是,說明可以持有鎖,返回true,表示加鎖成功;
(2)如果不是,說明有其他線程早已先持有了鎖,返回false。
checkLocked()方法的代碼如下:
private boolean checkLocked(List<String> waiters) {
//節點按照編號,升序排列
Collections.sort(waiters);
// 如果是第一個,代表自己已經獲得了鎖
if (locked_short_path.equals(waiters.get(0))) {
log.info("成功的獲取分布式鎖,節點為{}", locked_short_path);
return true;
}
return false;
}
checkLocked方法比較簡單,將參與排隊的所有子節點列表,從小到大根據節點名稱進行排序。排序主要依靠節點的編號,也就是后Znode路徑的10位數字,因為前綴都是一樣的。排序之后,做判斷,如果自己的locked_short_path編號位置排在第一個,如果是,則代表自己已經獲得了鎖。如果不是,則會返回false。
如果checkLocked()為false,外層的調用方法,一般來說會執行await()等待方法,執行奪鎖失敗以后的等待邏輯。
4.await()監聽前一個節點釋放鎖
await()也很簡單,就是監聽前一個ZNode節點(prior_path成員)的刪除事件,代碼如下:
private void await() throws Exception {
if (null == prior_path) {
throw new Exception("prior_path error");
}
final CountDownLatch latch = new CountDownLatch(1);
//訂閱比自己次小順序節點的刪除事件
Watcher w = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("監聽到的變化 watchedEvent = " + watchedEvent);
log.info("[WatchedEvent]節點刪除");
latch.countDown();
}
};
client.getData().usingWatcher(w).forPath(prior_path);
/*
//訂閱比自己次小順序節點的刪除事件
TreeCache treeCache = new TreeCache(client, prior_path);
TreeCacheListener l = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client,
TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data != null) {
switch (event.getType()) {
case NODE_REMOVED:
log.debug("[TreeCache]節點刪除, path={}, data={}",
data.getPath(), data.getData());
latch.countDown();
break;
default:
break;
}
}
}
};
treeCache.getListenable().addListener(l);
treeCache.start();*/
latch.await(WAIT_TIME, TimeUnit.SECONDS);
}
首先添加一個Watcher監聽,而監聽的節點,正是前面所保存在prior_path成員的前一個節點的路徑。這里,僅僅去監聽自己前一個節點的變動,而不是其他節點的變動,提升效率。完成監聽之后,調用latch.await(),線程進入等待狀態,一直到線程被監聽回調代碼中的latch.countDown() 所喚醒,或者等待超時。
說 明
以上代碼用到的CountDownLatch的核心原理和實戰知識,《Netty Zookeeper Redis 高並發實戰》姊妹篇 《Java高並發核心編程(卷2)》。
上面的代碼中,監聽前一個節點的刪除,可以使用兩種監聽方式:
(1)Watcher 訂閱;
(2)TreeCache 訂閱。
兩種方式的效果,都差不多。但是這里的刪除事件,只需要監聽一次即可,不需要反復監聽,所以使用的是Watcher
一次性訂閱。而TreeCache 訂閱的代碼在源碼工程中已經被注釋,僅僅供大家參考。
一旦前一個節點prior_path節點被刪除,那么就將線程從等待狀態喚醒,重新一輪的鎖的爭奪,直到獲取鎖,並且完成業務處理。
至此,分布式Lock加鎖的算法,還差一點就介紹完成。這一點,就是實現鎖的可重入。
5.可重入的實現代碼
什么是可重入呢?只需要保障同一個線程進入加鎖的代碼,可以重復加鎖成功即可。
修改前面的lock方法,在前面加上可重入的判斷邏輯。代碼如下:
@Override
public boolean lock() {
//可重入的判斷
synchronized (this) {
if (lockCount.get() == 0) {
thread = Thread.currentThread();
lockCount.incrementAndGet();
} else {
if (!thread.equals(Thread.currentThread())) {
return false;
}
lockCount.incrementAndGet();
return true;
}
}
//....
}
為了變成可重入,在代碼中增加了一個加鎖的計數器lockCount
,計算重復加鎖的次數。如果是同一個線程加鎖,只需要增加次數,直接返回,表示加鎖成功。
至此,lock()方法已經介紹完成,接下來,就是去釋放鎖
實戰:釋放鎖的實現
Lock接口中的unLock()方法,表示釋放鎖,釋放鎖主要有兩個工作:
(1)減少重入鎖的計數,如果最終的值不是0,直接返回,表示成功的釋放了一次;
(2)如果計數器為0,移除Watchers監聽器,並且刪除創建的Znode臨時節點。
unLock()方法的代碼如下:
/**
* 釋放鎖
*
* @return 是否成功釋放鎖
*/
@Override
public boolean unlock() {
//只有加鎖的線程,能夠解鎖
if (!thread.equals(Thread.currentThread())) {
return false;
}
//減少可重入的計數
int newLockCount = lockCount.decrementAndGet();
//計數不能小於0
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path);
}
//如果計數不為0,直接返回
if (newLockCount != 0) {
return true;
}
//刪除臨時節點
try {
if (ZKclient.instance.isNodeExist(locked_path)) {
client.delete().forPath(locked_path);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
這里,為了盡量保證線程安全,可重入計數器的類型,使用的不是int類型,而是Java並發包中的原子類型——AtomicInteger。
實戰:分布式鎖的使用
寫一個用例,測試一下ZLock的使用,代碼如下:
@Test
public void testLock() throws InterruptedException {
for (int i = 0; i < 10; i++) {
FutureTaskScheduler.add(() -> {
//創建鎖
ZkLock lock = new ZkLock();
lock.lock();
//每條線程,執行10次累加
for (int j = 0; j < 10; j++) {
//公共的資源變量累加
count++;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("count = " + count);
//釋放鎖
lock.unlock();
});
}
Thread.sleep(Integer.MAX_VALUE);
}
以上代碼是10個並發任務,每個任務累加10次,執行以上用例,會發現結果會是預期的和100,如果不使用鎖,結果可能就不是100,因為上面的count是一個普通的變量,不是線程安全的。
說 明
有關線程安全的核心原理和實戰知識,請參閱本書的下一卷《Java高並發核心編程(卷2)》。
原理上一個Zlock實例代表一把鎖,並需要占用一個Znode永久節點,如果需要很多分布式鎖,則也需要很多的不同的Znode節點。以上代碼,如果要擴展為多個分布式鎖的版本,還需要進行簡單改造,這種改造留給各位自己去練習和實現吧。
curator的InterProcessMutex 可重入鎖
分布式鎖Zlock自主實現主要的價值:學習一下分布式鎖的原理和基礎開發,僅此而已。實際的開發中,如果需要使用到分布式鎖,不建議去自己造輪子,建議直接使用Curator客戶端中的各種官方實現的分布式鎖,比如其中的InterProcessMutex
可重入鎖。
這里提供一個簡單的InterProcessMutex 可重入鎖的使用實例,代碼如下:
@Test
public void testzkMutex() throws InterruptedException {
CuratorFramework client = ZKclient.instance.getClient();
final InterProcessMutex zkMutex =
new InterProcessMutex(client, "/mutex");
;
for (int i = 0; i < 10; i++) {
FutureTaskScheduler.add(() -> {
try {
//獲取互斥鎖
zkMutex.acquire();
for (int j = 0; j < 10; j++) {
//公共的資源變量累加
count++;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("count = " + count);
//釋放互斥鎖
zkMutex.release();
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(Integer.MAX_VALUE);
}
ZooKeeper分布式鎖的優點和缺點
總結一下ZooKeeper分布式鎖:
(1)優點:ZooKeeper分布式鎖(如InterProcessMutex),能有效的解決分布式問題,不可重入問題,使用起來也較為簡單。
(2)缺點:ZooKeeper實現的分布式鎖,性能並不太高。為啥呢?
因為每次在創建鎖和釋放鎖的過程中,都要動態創建、銷毀瞬時節點來實現鎖功能。大家知道,ZK中創建和刪除節點只能通過Leader服務器來執行,然后Leader服務器還需要將數據同不到所有的Follower機器上,這樣頻繁的網絡通信,性能的短板是非常突出的。
總之,在高性能,高並發的場景下,不建議使用ZooKeeper的分布式鎖。而由於ZooKeeper的高可用特性,所以在並發量不是太高的場景,推薦使用ZooKeeper的分布式鎖。
在目前分布式鎖實現方案中,比較成熟、主流的方案有兩種:
(1)基於Redis的分布式鎖
(2)基於ZooKeeper的分布式鎖
兩種鎖,分別適用的場景為:
(1)基於ZooKeeper的分布式鎖,適用於高可靠(高可用)而並發量不是太大的場景;
(2)基於Redis的分布式鎖,適用於並發量很大、性能要求很高的、而可靠性問題可以通過其他方案去彌補的場景。
總之,這里沒有誰好誰壞的問題,而是誰更合適的問題。
最后對本章的內容做個總結:在分布式系統中,ZooKeeper是一個重要的協調工具。本章介紹了分布式命名服務、分布式鎖的原理以及基於ZooKeeper的參考實現。本章的那些實戰案例,建議大家自己去動手掌握,無論是應用實際開始、還是大公司面試,都是非常有用的。另外,主流的分布式協調中間件,也不僅僅只有Zookeeper,還有非常著名的Etcd中間件。但是從學習的層面來說,二者之間的功能設計、核心原理都是差不多的,掌握了Zookeeper,Etcd的上手使用也是很容易的。