zk實現分布式鎖縱觀網絡各種各樣的帖子層出不窮,筆者查閱很多資料發現一個問題,有些文章只寫原理並沒有具體實現,有些文章雖然寫了實現但是並不全面
借這個周末給大家做一個總結,代碼拿來就可以用並且每一種實現都經過了測試沒有bug。下面我們先從最簡單的實現開始介紹:
- 簡單的實現
package com.srr.lock; /** * @Description 分布式鎖的接口 */ abstract public interface DistributedLock { /** * 獲取鎖 */ boolean lock(); /** * 解鎖 */ void unlock(); abstract boolean readLock(); abstract boolean writeLock(); } package com.srr.lock; /** * 簡單的zk分布式做實現策略 * 性能比較低會導致羊群效應 */ public abstract class SimplerZKLockStrategy implements DistributedLock{ /** * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現 * @throws Exception */ @Override public boolean lock() { //獲取鎖成功 if (tryLock()){ System.out.println(Thread.currentThread().getName()+"獲取鎖成功"); return true; }else{ //獲取鎖失敗 //阻塞一直等待 waitLock(); //遞歸,再次獲取鎖 return lock(); } } /** * 嘗試獲取鎖,子類實現 */ protected abstract boolean tryLock() ; /** * 等待獲取鎖,子類實現 */ protected abstract void waitLock(); /** * 解鎖:刪除key */ @Override public abstract void unlock(); } package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.concurrent.CountDownLatch; /** * 分布式鎖簡單實現 */ public class SimpleZKLock extends SimplerZKLockStrategy{ private static final String PATH = "/lowPerformance_zklock"; private CountDownLatch countDownLatch = null; //zk地址和端口 public static final String ZK_ADDR = "192.168.32.129:2181"; //創建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR); @Override protected boolean tryLock() { //如果不存在這個節點,則創建持久節點 try{ zkClient.createEphemeral(PATH, "lock"); return true; }catch (Exception e){ return false; } } @Override protected void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch){ countDownLatch.countDown(); } System.out.println("listen lock unlock"); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //監聽前一個節點的變化 zkClient.subscribeDataChanges(PATH, lIZkDataListener); if (zkClient.exists(PATH)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(PATH, lIZkDataListener); } @Override public void unlock() { if (null != zkClient) { System.out.println("lock unclock"); zkClient.delete(PATH); } } @Override public boolean readLock() { return true; } @Override public boolean writeLock() { return true; } } package com.srr.lock; import redis.clients.jedis.Jedis; import java.util.concurrent.CountDownLatch; /** * 測試場景 * count從1加到4 * 使用簡單的分布式鎖在分布式環境下保證結果正確 */ public class T { volatile int count = 1; public void inc(){ for(int i = 0;i<3;i++){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } count++; System.out.println("count == "+count); } } public int getCount(){ return count; } public static void main(String[] args) throws InterruptedException { final T t = new T(); final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(5); for(int i = 0;i<5;i++){ new Thread(new Runnable() { @Override public void run() { DistributedLock distributedLock = new SimpleZKLock(); if(lock.lock(distributedLock)){ t.inc(); lock.unlock(distributedLock); countDownLatch.countDown(); } System.out.println("count == "+t.getCount()); } }).start(); } countDownLatch.await(); } }
運行結果:
這種方式實現雖然簡單,但是會引發羊群效應,因為每個等待鎖的客戶端都需要注冊監聽lock節點的刪除事件,如果客戶端並發請求很多,那么這將會非常消耗zookeeper集群
的資源,嚴重的化則會導致zookeeper集群宕機也不是沒有可能。
- 高性能實現,解決羊群效應問題
package com.srr.lock; /** * @Description 分布式鎖的接口 */ abstract public interface DistributedLock { /** * 獲取鎖 */ boolean lock(); /** * 解鎖 */ void unlock(); abstract boolean readLock(); abstract boolean writeLock(); } package com.srr.lock; public abstract class BlockingZKLockStrategy implements DistributedLock{ /** * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現 * @throws Exception */ @Override public final boolean lock() { //獲取鎖成功 if (tryLock()){ System.out.println(Thread.currentThread().getName()+"獲取鎖成功"); return true; }else{ //獲取鎖失敗 //阻塞一直等待 waitLock(); //遞歸,再次獲取鎖 return true; } } /** * 嘗試獲取鎖,子類實現 */ protected abstract boolean tryLock() ; /** * 等待獲取鎖,子類實現 */ protected abstract void waitLock(); /** * 解鎖:刪除key */ @Override public abstract void unlock(); } package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.CountDownLatch; public class BlockingZKLock extends BlockingZKLockStrategy{ private static final String PATH = "/highPerformance_zklock"; //當前節點路徑 private String currentPath; //前一個節點的路徑 private String beforePath; private CountDownLatch countDownLatch = null; //zk地址和端口 public static final String ZK_ADDR = "192.168.32.129:2181"; //超時時間 public static final int SESSION_TIMEOUT = 30000; //創建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT); public BlockingZKLock() { //如果不存在這個節點,則創建持久節點 if (!zkClient.exists(PATH)) { zkClient.createPersistent(PATH); } } @Override protected boolean tryLock() { //如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath //if (null == currentPath || "".equals(currentPath)) { //在path下創建一個臨時的順序節點 currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock"); //} try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } //獲取所有的臨時節點,並排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) { return true; }else {//如果當前節點不是排名第一,則獲取它前面的節點名稱,並賦值給beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); beforePath = PATH+"/"+childrens.get(wz-1); } return false; } @Override protected void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch){ countDownLatch.countDown(); } System.out.println("listen lock unlock"); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //監聽前一個節點的變化 zkClient.subscribeDataChanges(beforePath, lIZkDataListener); if (zkClient.exists(beforePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener); } @Override public void unlock() { if (null != zkClient) { System.out.println("lock unclock"); zkClient.delete(currentPath); } } @Override public boolean readLock() { return true; } @Override public boolean writeLock() { return true; } } package com.srr.lock; import java.util.concurrent.CountDownLatch; /** * 測試場景 * count從1加到4 * 使用高性能的分布式鎖在分布式環境下保證結果正確 */ public class T { volatile int count = 1; public void inc(){ for(int i = 0;i<3;i++){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } count++; System.out.println("count == "+count); } } public int getCount(){ return count; } public static void main(String[] args) throws InterruptedException { final T t = new T(); final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(5); for(int i = 0;i<5;i++){ new Thread(new Runnable() { @Override public void run() { DistributedLock distributedLock = new BlockingZKLock(); if(lock.lock(distributedLock)){ t.inc(); lock.unlock(distributedLock); countDownLatch.countDown(); } System.out.println("count == "+t.getCount()); } }).start(); } countDownLatch.await(); } }
這種實現客戶端只需監聽它前一個節點的變化,不需要監聽所有的節點,從而提高了zookeeper鎖的性能。
- 共享鎖(S鎖)
- 寫到這個,看了網絡上很多錯誤的文章實現把排它鎖當做共享鎖
共享鎖正確是實現姿勢如下:
package com.srr.lock; /** * @Description 分布式鎖的接口 */ abstract public interface DistributedLock { /** * 獲取鎖 */ boolean lock(); /** * 解鎖 */ void unlock(); abstract boolean readLock(); abstract boolean writeLock(); } package com.srr.lock; /** * 共享鎖策略 */ abstract public class ZKSharedLockStrategy implements DistributedLock{ @Override public boolean readLock() { //獲取鎖成功 if (tryReadLock()){ System.out.println(Thread.currentThread().getName()+"獲取讀鎖成功"); return true; }else{ //獲取鎖失敗 //阻塞一直等待 waitLock(); //遞歸,再次獲取鎖 return true; } } @Override public boolean writeLock() { //獲取鎖成功 if (tryWriteLock()){ System.out.println(Thread.currentThread().getName()+"獲取寫鎖成功"); return true; }else{ //獲取鎖失敗 //阻塞一直等待 waitLock(); //遞歸,再次獲取鎖 return true; } } /** * 嘗試獲取鎖,子類實現 */ protected abstract boolean tryWriteLock() ; /** * 嘗試獲取鎖,子類實現 */ protected abstract boolean tryReadLock() ; /** * 等待獲取鎖,子類實現 */ protected abstract void waitLock(); /** * 解鎖:刪除key */ @Override public abstract void unlock(); } package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * 共享鎖 */ public class ZKSharedLock extends ZKSharedLockStrategy{ private static final String PATH = "/zk-root-readwrite-lock"; //當前節點路徑 private String currentPath; //前一個節點的路徑 private String beforePath; private CountDownLatch countDownLatch = null; //zk地址和端口 public static final String ZK_ADDR = "192.168.32.129:2181"; //超時時間 public static final int SESSION_TIMEOUT = 30000; //創建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT); public ZKSharedLock() { //如果不存在這個節點,則創建持久節點 if (!zkClient.exists(PATH)) { zkClient.createPersistent(PATH); } } @Override protected boolean tryWriteLock() { //如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath if (null == currentPath || "".equals(currentPath)) { //在path下創建一個臨時的順序節點 currentPath = zkClient.createEphemeralSequential(PATH+"/w", "writelock"); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } //獲取所有的臨時節點,並排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) { return true; }else {//如果當前節點不是排名第一,則獲取它前面的節點名稱,並賦值給beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); beforePath = PATH+"/"+childrens.get(wz-1); } return false; } @Override protected boolean tryReadLock() { //如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath if (null == currentPath || "".equals(currentPath)) { //在path下創建一個臨時的順序節點 currentPath = zkClient.createEphemeralSequential(PATH+"/r", "readklock"); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } //獲取所有的臨時節點,並排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) { return true; }else if(isAllReadNodes(childrens)){ return true; }else {//如果當前節點不是排名第一,則獲取它前面的節點名稱,並賦值給beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); for (int i = wz - 1; i > 0; i--) { // 找到了離得最近的一個寫節點,那么它的后一個節點要么是一個讀節點,要么就是待加鎖的節點本身 if (childrens.get(i).indexOf("w") >= 0) { beforePath = PATH + "/" + childrens.get(i); break; } } } return false; } // 判斷比自已小的節點是否都是讀節點 private boolean isAllReadNodes(List<String> sortNodes) { int pathLength = PATH.length(); int currentIndex = Collections.binarySearch(sortNodes, currentPath.substring(pathLength+1)); for (int i = 0; i < currentIndex - 1; i++) { // 只要有一個寫鎖,則不能直接獲取讀鎖 if (sortNodes.get(i).indexOf("w") >= 0) { return false; } } return true; } @Override protected void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch){ countDownLatch.countDown(); } System.out.println("listen lock unlock"); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //監聽前一個節點的變化 zkClient.subscribeDataChanges(beforePath, lIZkDataListener); if (zkClient.exists(beforePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener); } @Override public boolean lock() { return false; } @Override public void unlock() { if (null != zkClient) { System.out.println("lock unclock"); zkClient.delete(currentPath); zkClient.close(); } } } package com.srr.lock; /** * 鎖工具類 */ public class Lock { /** * 獲取鎖 */ boolean lock(DistributedLock lock) { return lock.lock(); }; /** * 獲取讀鎖 */ boolean readlock(DistributedLock lock) { return lock.readLock(); }; /** * 獲取讀鎖 */ boolean writeLock(DistributedLock lock) { return lock.writeLock(); }; /** * 釋放鎖 */ void unlock(DistributedLock lock) { lock.unlock(); }; } package com.srr.lock; import java.util.concurrent.CountDownLatch; /** * 測試共享鎖 */ public class SharedLockTest { private static volatile int count = 0; public static void main(String[] args) throws Exception { final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(10); new Thread(new Runnable() { @Override public void run() { testWriteLock(8); } }).start(); new Thread(new Runnable() { @Override public void run() { testReadLock(10); } }).start(); new Thread(new Runnable() { @Override public void run() { testReadLock(20); } }).start(); new Thread(new Runnable() { @Override public void run() { testWriteLock(11); } }).start(); new Thread(new Runnable() { @Override public void run() { testWriteLock(30); } }).start(); new Thread(new Runnable() { @Override public void run() { testReadLock(9); } }).start(); countDownLatch.await(); } // 讀鎖 private static void testReadLock(long sleepTime) { try { Lock lock = new Lock(); DistributedLock dlock = new ZKSharedLock(); lock.readlock(dlock); System.out.println("i get readlock ->" + sleepTime); System.out.println("count = "+ count); Thread.sleep(sleepTime); lock.unlock(dlock); } catch (Exception e) { e.printStackTrace(); } } // 寫鎖 private static void testWriteLock(long sleepTime) { try { Lock lock = new Lock(); DistributedLock dlock = new ZKSharedLock(); lock.writeLock(dlock); System.out.println("i get writelock ->" + sleepTime); count++; Thread.sleep(sleepTime); lock.unlock(dlock); } catch (Exception e) { e.printStackTrace(); } } }
運行結果:
從結果可以看出讀鎖和讀鎖可以共享鎖,而寫鎖必須等待讀鎖或者寫鎖釋放之后才能獲取鎖。
最后,zk分布式鎖完美解決方案:
- Apache Curator
- Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
- Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.
網上很多文章竟然標題用Curator實現分布式鎖,大哥Curator框架本身已經實現了分布式鎖而且提供了各種各樣的鎖api供大家使用,我們不用再基於Curator實現分布式鎖,這不是多此一舉嗎?這里給出一個簡單的使用案例,旨在說明意圖:
package com.srr.lock; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 測試場景 * count從1加到101 * 使用redis分布式鎖在分布式環境下保證結果正確 */ public class CuratorDistributedLockTest { private static final String lockPath = "/curator_lock"; //zk地址和端口 public static final String zookeeperConnectionString = "192.168.32.129:2181"; volatile int count = 1; public void inc(){ for(int i = 0;i<10;i++){ count++; System.out.println("count == "+count); } } public int getCount(){ return count; } public static void main(String[] args) throws InterruptedException { final T t = new T(); final Lock lock = new Lock(); final CountDownLatch countDownLatch = new CountDownLatch(4); for(int i = 0;i<4;i++){ new Thread(new Runnable() { @Override public void run() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(10, 5000); CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); InterProcessMutex lock = new InterProcessMutex(client, lockPath); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { try { System.out.println("get the lock"); t.inc(); } finally { lock.release(); System.out.println("unlock the lock"); } } }catch (Exception e){ e.printStackTrace(); } countDownLatch.countDown(); } }).start(); } countDownLatch.await(); System.out.println("total count == "+t.getCount()); } }
運行結果:
如果想更多了解Curator框架,請移步http://curator.apache.org/,官網給出了詳細的使用案例及介紹。至此zk實現分布式鎖總結完畢!
原創不易,請多多關注!