介紹
為什么使用鎖
鎖的出現是為了解決資源爭用問題,在單進程環境下的資源爭奪可以使用 JDK里的鎖實現.
為什么使用分布式鎖?
顧名思義,分布式鎖是為了分布式環境下的資源爭用問題.
Zookeeper是如何實現分布式鎖的?
基於Zookeeper的分布式鎖都是依賴於zk節點路徑唯一的機制來實現的.
什么意思呢?
就是在zk中,在分布式鎖的場景下 對於同一個路徑,只能有一個客戶端能創建成功,其它的都創建失敗
.(這個不難理解,在平時系統中也沒見過有哪2個文件地址完全相同)
下面就說一下zk分布式鎖2種實現,沒錯 本篇就是干的不能再干的干貨!!!
第一種分布式鎖
具體流程
第一種實現是利用的zk的臨時節點, 在爭搶鎖的時候,所有的客戶端都嘗試創建一個臨時節點(代表鎖住的資源),只有一個客戶端會創建成功,創建成功的客戶端得到鎖,其它的客戶端則監聽(利用zk的watch)該節點的狀態改變並且進入阻塞,節點改變后 zk server 會通知剩下的客戶端,剩下的客戶端停止阻塞並且重新爭搶鎖.
zk中有持久節點和臨時節點,為什么使用臨時節點呢?
如果使用的是持久節點,則這個節點在客戶端下線后,依舊會一直存在,不會自動刪除,導致 其它客戶端一直無法爭搶到鎖
.如果使用的是臨時節點的話, 在客戶端下線后zk會刪除與其相關的臨時節點,這樣其它客戶端就能重新爭搶鎖
.
代碼實現
@Override
public void lock() {
// 如果獲取不到鎖,阻塞等待
if (!tryLock()) {
// 沒獲得鎖,阻塞自己
waitForLock();
// 再次嘗試
lock();
}
}
@Override
public boolean tryLock() { // 不會阻塞
// 創建節點
try {
// 創建臨時節點,zk中的節點(路徑)唯一,只有一個會創建成功
// 為什么使用臨時節點: 客戶端掉線后會自動刪除節點(釋放鎖)
client.createEphemeral(lockPath);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
/**
* 爭搶不到鎖的話,等待鎖的釋放
*/
private void waitForLock() {
CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("收到節點被刪除的消息,停止等待,重新爭奪鎖");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data)
throws Exception {
}
};
// 監聽
client.subscribeDataChanges(lockPath, listener);
// 判斷鎖節點是否存在,存在的話表明有別人
if (this.client.exists(lockPath)) {
try {
// 等待接收到消息后,繼續往下執行
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消監聽消息
client.unsubscribeDataChanges(lockPath, listener);
}
總結一下
實現簡單,但是會有 羊群效應
,節點的刪除都會通知所有的客戶端,並且所有的客戶端會 取消監聽 + 重新一起爭奪鎖 + 爭奪失敗 + 再次開啟監聽
,如此循環,資源耗費多,並且這種耗費是可以避免的,那么如何避免呢?就是下面第二種的 改進版分布式鎖
.
第二種分布式鎖
這一種分布式鎖的實現是利用zk的臨時順序節點,每一個客戶端在爭奪鎖的時候都由zk分配一個順序號(sequence),客戶端則按照這個順序去獲取鎖.
具體流程
lock跟前面的一樣,不過lockPath(鎖住的資源)是一個持久節點,客戶端在該持久節點下面創建臨時順序節點,獲取到順序號后,根據自己是否是最小的順序號來獲取鎖,順序號最小則獲取鎖,序號不為最小則監聽(watch)前一個順序號,當前一個順序號被刪除的時候表明鎖被釋放了,則會通知下一個客戶端.
代碼實現
下面貼出跟第一種實現不同的代碼
/**
* 嘗試加鎖
*
* @return
*/
@Override
public boolean tryLock() {
// 創建臨時順序節點
if (this.currentPath == null) {
// 在lockPath節點下面創建臨時順序節點
currentPath = this.client.createEphemeralSequential(LockPath + "/", "aaa");
}
// 獲得所有的子節點
List<String> children = this.client.getChildren(LockPath);
// 排序list
Collections.sort(children);
// 判斷當前節點是否是最小的,如果是最小的節點,則表明此這個client可以獲取鎖
if (currentPath.equals(LockPath + "/" + children.get(0))) {
return true;
} else {
// 如果不是當前最小的sequence,取到前一個臨時節點
// 1.單獨獲取臨時節點的順序號
// 2.查找這個順序號在children中的下標
// 3.存儲前一個節點的完整路徑
int curIndex = children.indexOf(currentPath.substring(LockPath.length() + 1));
beforePath = LockPath + "/" + children.get(curIndex - 1);
}
return false;
}
private void waitForLock() {
CountDownLatch cdl = new CountDownLatch(1);
// 注冊watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("監聽到前一個節點被刪除了");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 監聽前一個臨時節點
client.subscribeDataChanges(this.beforePath, listener);
// 前一個節點還存在,則阻塞自己
if (this.client.exists(this.beforePath)) {
try {
// 直至前一個節點釋放鎖,才會繼續往下執行
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 醒來后,表明前一個臨時節點已經被刪除,此時客戶端可以獲取鎖 && 取消watcher監聽
client.unsubscribeDataChanges(this.beforePath, listener);
}
總結一下
實現比第一種復雜一點,但是更加的合理,少做了很多不必要的操作,只喚醒了后面一個客戶端.
總結
由zk自身的設計,zk不適合高並發寫,需要在使用zk分布式鎖前先做一定過濾操作,先過濾掉部分請求,再進行鎖爭奪.
分布式鎖當然不止zk的實現,各個實現都有其適用的場景,在分布式系統中,沒有最完美的方案,只有最合適的方案,往往都是取舍問題.
最后
這次的內容到這里就結束了,最后的最后,非常感謝你們能看到這里!!你們的閱讀都是對作者的一次肯定!!!
覺得文章有幫助的看官順手點個贊再走唄(終於暴露了我就是來騙贊的(◒。◒)),你們的每個贊對作者來說都非常重要(異常真實),都是對作者寫作的一次肯定(double)!!!