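A few days ago I went through three implementations of distributed locks, but I did not build one on ZooKeeper. A ZooKeeper-based lock is somewhat more complex and also needs the Watcher mechanism, so it gets a post of its own.
First, the first implementation. ZooKeeper does not allow the same node to be created twice, and we can use that property to build a distributed lock. This looks a lot like a Redis-based lock, but there are differences, which are analyzed in detail later.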
The main flow is shown in the flowchart below:

[Flowchart: acquiring and releasing the lock]

The flow is simple:
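- Check whether the target node already exists. If it does, wait for the lock.
- If it does not, create an ephemeral node to mark the lock as held.
- If the create fails, another thread already holds the lock, so wait for the lock as well.
- When the lock is released, or the holder's session times out, the node is deleted and the threads waiting for the lock are woken up to compete for it.
That is the complete flow. A simple implementation follows: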
```java
package com.codertom.params.engine;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Distributed lock on ZooKeeper: whoever creates the ephemeral node holds the lock.
 */
public class LockTest {
    private String zkQurom = "localhost:2181";
    private String lockNameSpace = "/mylock";
    private String nodeString = lockNameSpace + "/test1";
    private ZooKeeper zk;
    // True only while this client is inside lock(), so the watcher does not
    // interrupt the thread once lock() has returned.
    private volatile boolean waiting;

    public LockTest() {
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event " + watchedEvent);
                    if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                        System.out.println("connection is established...");
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void ensureRootPath() throws InterruptedException {
        try {
            if (zk.exists(lockNameSpace, false) == null) {
                zk.create(lockNameSpace, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException e) {
            // Another client created the root between exists() and create(); nothing to do.
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /** Watches the lock node and interrupts the waiting thread when the node is deleted. */
    private void watchNode(final String path, final Thread thread) throws InterruptedException {
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("==" + watchedEvent);
                if (!waiting) {
                    return; // lock() has returned: stop watching
                }
                if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                    System.out.println("There is a thread released Lock==============");
                    thread.interrupt();
                }
                try {
                    // A watch fires only once, so register this same watcher again.
                    zk.exists(path, this);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        try {
            zk.exists(path, watcher);
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * Acquires the lock, blocking until it is available.
     * @return true once the lock is held
     * @throws InterruptedException if the thread is interrupted during a ZooKeeper call
     */
    public boolean lock() throws InterruptedException {
        ensureRootPath();
        waiting = true;
        try {
            watchNode(nodeString, Thread.currentThread());
            while (true) {
                try {
                    zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    System.out.println(Thread.currentThread().getName() + " get Lock...");
                    return true;
                } catch (KeeperException e) {
                    // Usually NodeExists: another client holds the lock.
                    System.out.println(Thread.currentThread().getName() + " getting Lock but can not get");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException ex) {
                        // The watcher saw the node deleted and woke us up early.
                        System.out.println("thread is notify");
                    }
                }
            }
        } finally {
            waiting = false;
        }
    }

    /**
     * Releases the lock.
     */
    public void unlock() {
        try {
            zk.delete(nodeString, -1);
            System.out.println(Thread.currentThread().getName() + " release Lock...");
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            service.execute(() -> {
                LockTest test = new LockTest();
                try {
                    test.lock();
                    try {
                        Thread.sleep(3000); // simulate work while holding the lock
                    } finally {
                        test.unlock(); // reached only if lock() returned normally
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }
}
```
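The code is rough, but it follows the flow described above. Two things to note:
- It uses the native ZooKeeper API, where a watch fires only once and has to be set again each time, so the code is more complicated.
- Waiting threads are woken with Thread.interrupt, which is not a good way to control the flow.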
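This implementation has both advantages and disadvantages:
Advantages:
It is fairly simple, and the notification mechanism gives a quick response. The idea is somewhat like ReentrantLock, and if deleting the node fails, the session timeout guarantees that the node is removed eventually.
Disadvantages:
It is heavyweight, and when many clients wait on the lock it suffers from the "herd effect".
The "herd effect" means that when a node is deleted, every thread that registered a Watcher for that deletion is called back at once. This is very bad for the ZooKeeper cluster, so it has to be avoided.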
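To put a rough number on the herd effect, here is a small in-memory model with no ZooKeeper involved. It assumes that every waiting client watches the same lock node, that each deletion calls back all of them, and that exactly one waiter wins each round. The class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory model of many clients watching one lock node. Not real ZooKeeper code. */
class HerdEffectSketch {

    /**
     * Counts watcher callbacks until every client has held the lock once,
     * assuming all waiting clients watch the same node (hypothetical model).
     */
    static long countCallbacks(int clients) {
        Deque<Integer> waiting = new ArrayDeque<>();
        for (int id = 1; id < clients; id++) {
            waiting.add(id); // client 0 holds the lock, the rest wait
        }
        long callbacks = 0;
        while (!waiting.isEmpty()) {
            callbacks += waiting.size(); // node deleted: every waiter is called back
            waiting.poll();              // exactly one waiter re-creates the node
        }
        return callbacks;
    }

    public static void main(String[] args) {
        for (int clients : new int[] {4, 100, 1000}) {
            System.out.println(clients + " clients: " + countCallbacks(clients) + " callbacks");
        }
    }
}
```

In this model the callbacks grow as n(n-1)/2: 4 clients cause 6 callbacks and 1000 clients cause 499500, although only n-1 of those callbacks lead to a successful acquire.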
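Solving the "herd effect":
To solve the herd effect, we have to give up the strategy of everyone watching the same node. How?
- Treat the lock as a directory. Each thread creates an ephemeral sequential node under it. ZooKeeper guarantees the ordering of these nodes, so the order can be used to decide who holds the lock.
- First create a sequential node, then find the smallest node under the directory and check whether it is the current node. If it is, the lock is acquired; if not, the attempt fails.
- A node that failed to acquire the lock finds the sequential node just before itself and registers a watch on it, so that it is notified when that node is deleted.
- On unlock, deleting the node notifies the next node in line.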
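The check in the steps above can be sketched without a ZooKeeper server. The helper below takes the child names listed under the lock directory plus this client's own node name, and returns the node to watch, or null when the client already holds the lock. The names `nodeToWatch` and `mylock_...` are illustrative. The code assumes all children share one prefix followed by ZooKeeper's zero-padded sequence number, so plain string order matches creation order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Picks the predecessor to watch among sequential lock nodes (illustrative helper). */
class PredecessorSketch {

    /** Returns the child just before ownName, or null if ownName is the smallest. */
    static String nodeToWatch(List<String> children, String ownName) {
        List<String> sorted = new ArrayList<>(children); // do not mutate the caller's list
        Collections.sort(sorted);
        int index = sorted.indexOf(ownName);
        if (index < 0) {
            throw new IllegalStateException("own node not found: " + ownName);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "mylock_0000000002", "mylock_0000000000", "mylock_0000000001");
        System.out.println(nodeToWatch(children, "mylock_0000000000")); // prints null
        System.out.println(nodeToWatch(children, "mylock_0000000002")); // prints mylock_0000000001
    }
}
```

Because each waiter watches only its predecessor, a deletion calls back at most one client.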
The scheme described in the steps above is quite similar to the fair mode of ReentrantLock. A simple implementation follows:
```java
package com.codertom.params.engine;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by zhiming on 2017-02-05.
 */
public class FairLockTest {
    private String zkQurom = "localhost:2181";
    private String lockName = "/mylock";
    private String lockZnode = null;
    private ZooKeeper zk;

    public FairLockTest() {
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event " + watchedEvent);
                    if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                        System.out.println("connection is established...");
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void ensureRootPath() {
        try {
            if (zk.exists(lockName, false) == null) {
                zk.create(lockName, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException e) {
            // Another client created the root first; nothing to do.
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Acquires the lock, blocking until this client's node is the smallest one.
     * @return true if the lock was acquired, false if a ZooKeeper call failed
     */
    public boolean lock() {
        ensureRootPath();
        final Thread thread = Thread.currentThread();
        // One watcher, reused for every predecessor this client ends up waiting on.
        Watcher wakeUp = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                    thread.interrupt();
                }
            }
        };
        try {
            lockZnode = zk.create(lockName + "/mylock_", "".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            String ownName = lockZnode.substring(lockZnode.lastIndexOf("/") + 1);
            while (true) {
                List<String> children = zk.getChildren(lockName, false);
                Collections.sort(children);
                System.out.println(children.get(0) + " and path " + lockZnode);
                int index = children.indexOf(ownName);
                if (index < 0) {
                    throw new IllegalStateException(lockZnode + " no longer exists");
                }
                if (index == 0) {
                    System.out.println(Thread.currentThread().getName() + " get Lock...");
                    return true;
                }
                // Watch only the node just before ours.
                String watchNode = lockName + "/" + children.get(index - 1);
                Stat stat = zk.exists(watchNode, wakeUp);
                if (stat == null) {
                    continue; // predecessor vanished in the meantime: check again
                }
                System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + watchNode);
                try {
                    Thread.sleep(1000000000);
                } catch (InterruptedException ex) {
                    // Predecessor deleted. Loop and re-check, because it may have
                    // died while waiting rather than released the lock.
                    System.out.println(Thread.currentThread().getName() + " notify");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            unlock(); // remove our node so it does not block the clients behind us
            return false;
        }
    }

    /**
     * Releases the lock.
     */
    public void unlock() {
        if (lockZnode == null) {
            return;
        }
        try {
            System.out.println(Thread.currentThread().getName() + " release Lock...");
            zk.delete(lockZnode, -1);
            lockZnode = null;
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            service.execute(() -> {
                FairLockTest test = new FairLockTest();
                if (!test.lock()) {
                    return;
                }
                try {
                    Thread.sleep(3000); // simulate work while holding the lock
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    test.unlock();
                }
            });
        }
        service.shutdown();
    }
}
```
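A few points to note about this program as well:
- The ZooKeeper API has no direct call for getting the previous node or the smallest node, so we have to implement that ourselves.
- Threads are woken with interrupt, which is not sound. I did not want to bring in the JVM's own locking utilities, so I did not use CountDownLatch for flow control.
- The watch also has to be set again. Here the Watcher is reused, so the code is a bit simpler.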
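For comparison, the wake-up could be done with a CountDownLatch instead of Thread.interrupt, which is the option the author decided against. The sketch below is plain Java with no ZooKeeper calls: `onNodeDeleted` stands in for what a watcher would do on a NodeDeleted event, and the class and method names are invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Latch-based wake-up; a stand-in for a watcher callback, not real ZooKeeper code. */
class DeletionSignal {
    private final CountDownLatch deleted = new CountDownLatch(1);

    /** Called from the (simulated) event thread when the watched node is deleted. */
    void onNodeDeleted() {
        deleted.countDown();
    }

    /** Waits for the deletion; returns false on timeout or if the caller is interrupted. */
    boolean await(long timeoutMillis) {
        try {
            return deleted.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        DeletionSignal signal = new DeletionSignal();
        // Simulate the event thread delivering the deletion from another thread.
        new Thread(signal::onNodeDeleted).start();
        System.out.println("woken by callback: " + signal.await(5000));
    }
}
```

Unlike an interrupt, the latch stays open if the event arrives before the thread starts waiting, and it does not disturb a thread that is busy with something else. A latch cannot be reset, so each wait needs a fresh instance.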
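The ZooKeeper lock implementations above are still quite complex, because you have to keep tending to the Watcher. That is fine for a demo, but building a lock that is usable in production is not easy, since a bug in your code can cause serious failures there.
Mature libraries already implement the common ZooKeeper use cases, Curator for example. Curator is really impressive: it wraps the common ZooKeeper APIs and also ships implementations of many common use cases. Its programming style, however, is rather hard to get used to.
With Curator a distributed lock is easy to implement: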
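```java
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

// client is a started CuratorFramework; lockPath, maxWait and waitUnit come from the caller.
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(maxWait, waitUnit)) {
    try {
        // do some work inside of the critical section here
    } finally {
        lock.release();
    }
}
```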
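Yes, it is that simple: a ready-made wheel you can use as is.
That wraps up ZooKeeper-based distributed locks. In practice they are not commonly used. The lock itself is elegant, but the programming is complex, you have to maintain a separate ZooKeeper cluster, and frequent watches put considerable pressure on that cluster. Unless your project already depends on ZooKeeper and the number of locks is fairly small, you are better off not using it.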
Author: 一只小哈
Link: https://www.jianshu.com/p/5d12a01018e1
Source: Jianshu (簡書)