基於zk的分布式鎖:
大概原理:仍然跟基於db或者redis一致,就是注冊節點,然后刪除。不同的是zk因為可以對節點的事件進行監聽,那么在收到節點刪除的事件時,正在阻塞的線程便可以發起新的搶占鎖的請求。當然,真正生產的代碼一般不是這么寫的,因為這樣的情況下如果等待的線程非常多,那么zk向所有注冊點的廣播就要消耗大量的帶寬,也會極大的消耗zk的性能,這顯然是不合理的。所以,基於臨時有序節點的分布式鎖的優勢就非常明顯了,所有節點只關注自己的前節點,消耗少,線程等待時間可控,而且高可用。
---------Talk Is Cheap, Just Show Me The Code-------------------------------------------------
第一版本的zk分布式鎖(做法跟db、redis完全相同):
鎖工具類主要方法:
鎖的獲取:
/**
* 阻塞獲取鎖
*/
public boolean lock(){
if(tryLock()){
return true;
}
waitForLock();
return lock();
}
/**
* 非阻塞獲取鎖
*/
public boolean tryLock(){
try{
zkClient.createPersistent("/lock");
System.out.println("搶到了鎖"+Thread.currentThread().getName());
}catch (ZkNodeExistsException e){
System.out.println("沒搶到鎖"+Thread.currentThread().getName());
return false;
}
return true;
}
關鍵的搶占失敗后等待方法:
/**
* 監聽節點,等待搶鎖
*/
private void waitForLock(){
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("有鎖釋放了,開始搶占鎖"+Thread.currentThread().getName());
if(cdl != null){
cdl.countDown();
}
}
};
zkClient.subscribeDataChanges("/lock",listener);
if(zkClient.exists("/lock")){//節點是否存在
cdl = new CountDownLatch(1);
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{//節點可能剛好被刪除
lock();
}
}
鎖的釋放:
/**
* 解鎖
*/
public void unlock(){
System.out.println("刪除鎖"+Thread.currentThread().getName());
zkClient.delete("/lock");
}
測試代碼:
private CountDownLatch cdl = new CountDownLatch(num);
@Test
public void testLock(){
for(int i=0;i<num;i++){
MyThread t = new MyThread("mythread"+i);
t.start();
cdl.countDown();
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//內部類
class MyThread extends Thread{
private String threadName;
public MyThread(){};
public MyThread(String threadName){
this.threadName = threadName;
}
@Override
public void run() {
ZkLock zkLock = new ZkLock();
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(zkLock.lock()){
System.out.println("線程-"+threadName+"獲得鎖,orderId為:"+MyResources.getInstance().getNextId());
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkLock.unlock();
}
}
}
執行結果一部分如下:

已經不用再說什么了,這么多通知已經說明了一切,每刪除一個節點通知所有訂閱節點的代價是高昂的,這是不能忍受的,得改!
第二版本的zk分布式鎖(臨時有序節點版):
/**
* 阻塞鎖
* @return
*/
public boolean lock(){
if(tryLock()){
return true;
}else{
waitForLock();
return lock();
}
}
/**
* 非阻塞鎖
* @return
*/
public boolean tryLock(){
if(currentPath == null){
currentPath = zkClient.createEphemeralSequential(lockPath+"/lock",Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName()+"創建節點:"+currentPath+"=================================================");
List<String> nodeList = zkClient.getChildren(lockPath);
Collections.sort(nodeList);
if(currentPath.equals(lockPath+"/"+nodeList.get(0))){//當前注冊節點為最靠前節點
return true;
}else{
int weizhi = Collections.binarySearch(nodeList,currentPath.substring(8));
beforePath = lockPath +"/"+nodeList.get(weizhi-1);
return false;
}
}else{
return true;
}
}
一樣重要的等待方法:
//鎖等待
private boolean waitForLock() {
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if(cdl!=null){
System.out.println("前面的節點"+beforePath+"被刪除了=====================");
cdl.countDown();
}
}
};
zkClient.subscribeDataChanges(beforePath,listener);
if(zkClient.exists(beforePath)){//前節點存在
cdl = new CountDownLatch(1);
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}else{//訂閱的時候,可能前邊線程處理較快,已經刪除了
return true;
}
}
釋放鎖:
/**
* 釋放鎖
* @return
*/
public boolean unlock(){
return zkClient.delete(currentPath);
}
測試代碼:
private int num = 10;
private CountDownLatch cdl = new CountDownLatch(num);
@Test
public void testLock(){
for(int i=0;i<num;i++){
ZkLockTest2.MyThread t = new ZkLockTest2.MyThread("mythread"+i);
t.start();
cdl.countDown();
}
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//內部類
class MyThread extends Thread{
private String threadName;
public MyThread(){};
public MyThread(String threadName){
this.threadName = threadName;
}
@Override
public void run() {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
ZkLock2 zkLock = new ZkLock2();
if(zkLock.lock()){
System.out.println("線程-"+threadName+"獲得鎖,orderId為:"+MyResources.getInstance().getNextId());
zkLock.unlock();
}
}
}
執行結果如下(所有):
結束語:
三篇結束了,對比三種實現方式結論如下:
a、基於db的分布式鎖,相對容易理解,易上手;但依賴數據庫,對數據庫有損耗(這個影響其實比較小,多數時候可以忽略),可能會有短時間死鎖等,其它線程重試的時間不確定,整體時間利用率不好把控(個人認為這個是主要的,其它的可重入,死鎖之類的有其它辦法解決)。
b、基於redis的分布式鎖,性能遠高於db的,依賴redis(這個跟db一般不會掛,但就怕萬一),主備之間可能數據不一致,一樣線程休眠時間不好確定,整體時間利用率不好把控(個人認為這個也是主要原因,解決辦法的話,休眠時間設置為正常單線程處理業務時間的2-3倍(經驗值),所有線程休眠時間在某個時間段內隨機,不要固定時間;其它問題多數可以有辦法解決)。
c、基於zk的分布式鎖,性能略遜redis但一樣遠高於db,得益於zk的高可用,不用擔心掛掉的問題,而且由於臨時有序節點的排序跟節點監聽,解決了休眠問題;缺點實現略復雜。
只是自己學習之作,沒有考慮生產中具體的需求,可能有理解不正確的地方,歡迎批評斧正。
