A Summary of Implementing Distributed Locks with ZooKeeper: One Article Is All You Need (Design Patterns in Practice)


Posts about implementing distributed locks with ZooKeeper are all over the internet, but while going through the material I noticed a problem: some articles only explain the principle without a concrete implementation, while others provide an implementation that is incomplete.

So I spent this weekend putting together a summary: the code can be used as-is, and every implementation has been tested. Let's start with the simplest implementation:

  • Simple implementation
package com.srr.lock;

/**
 * @Description Distributed lock interface
 */
public interface DistributedLock {
    /**
     * Acquire the lock
     */
    boolean lock();

    /**
     * Release the lock
     */
    void unlock();

    boolean readLock();

    boolean writeLock();
}

package com.srr.lock;

/**
 * Simple ZooKeeper distributed lock strategy.
 * Relatively low performance: every waiter watches the same node, which causes the herd effect.
 */
public abstract class SimplerZKLockStrategy implements DistributedLock {
    /**
     * Template method: defines the lock-acquisition skeleton, concrete steps are left to subclasses
     */
    @Override
    public boolean lock() {
        //lock acquired successfully
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
            return true;
        } else {  //failed to acquire the lock
            //block and wait
            waitLock();
            //recurse and try to acquire the lock again
            return lock();
        }
    }

    /**
     * Try to acquire the lock; implemented by subclasses
     */
    protected abstract boolean tryLock();

    /**
     * Wait for the lock; implemented by subclasses
     */
    protected abstract void waitLock();

    /**
     * Unlock: delete the lock node
     */
    @Override
    public abstract void unlock();
}

package com.srr.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.concurrent.CountDownLatch;

/**
 * Simple distributed lock implementation
 */
public class SimpleZKLock extends SimplerZKLockStrategy {
    private static final String PATH = "/lowPerformance_zklock";
    private CountDownLatch countDownLatch = null;
    //ZooKeeper address and port
    public static final String ZK_ADDR = "192.168.32.129:2181";
    //create the ZooKeeper client
    protected ZkClient zkClient = new ZkClient(ZK_ADDR);

    @Override
    protected boolean tryLock() {
        //try to create the ephemeral lock node; if it already exists, createEphemeral throws and the attempt fails
        try {
            zkClient.createEphemeral(PATH, "lock");
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    protected void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch) {
                    countDownLatch.countDown();
                }
                System.out.println("listen lock unlock");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //watch the lock node itself: every waiting client subscribes to the same node
        zkClient.subscribeDataChanges(PATH, lIZkDataListener);
        if (zkClient.exists(PATH)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(PATH, lIZkDataListener);

    }

    @Override
    public void unlock() {
        if (null != zkClient) {
            System.out.println("lock unlock");
            zkClient.delete(PATH);
        }
    }

    @Override
    public boolean readLock() {
        //read/write locks are not supported by this simple implementation
        return true;
    }

    @Override
    public boolean writeLock() {
        return true;
    }
}

package com.srr.lock;

import java.util.concurrent.CountDownLatch;

/**
 *  Test scenario:
 *  five threads each increment count three times
 *  the simple distributed lock keeps the result correct in a distributed environment
 */
public class T {

    volatile int  count = 1;

    public void inc(){
        for(int i = 0;i<3;i++){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            count++;
            System.out.println("count == "+count);
        }
    }

    public int getCount(){
       return count;
    }

    public static void main(String[] args) throws InterruptedException {
        final T t = new T();
        final Lock lock = new Lock();
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        for(int i = 0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedLock distributedLock = new SimpleZKLock();
                    if(lock.lock(distributedLock)){
                        t.inc();
                        lock.unlock(distributedLock);
                        countDownLatch.countDown();
                    }
                    System.out.println("count == "+t.getCount());
                }
            }).start();
        }
        countDownLatch.await();
    }
}

Execution result:

 

Although this implementation is simple, it causes the herd effect: every client waiting for the lock registers a watch for the deletion of the same lock node. With many concurrent clients this consumes a great deal of ZooKeeper cluster resources, and in severe cases it is not impossible for the ZooKeeper cluster to go down.
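
Before moving on to the improved version, here is a minimal usage sketch (the class name SimpleZKLockUsage is just for illustration; it assumes the DistributedLock, SimplerZKLockStrategy and SimpleZKLock classes above are on the classpath). The point is to always pair lock() with unlock() in try/finally so the lock znode is removed even if the guarded code throws.

package com.srr.lock;

/**
 * Minimal usage sketch for SimpleZKLock (illustration only):
 * pair lock() with unlock() in try/finally so the /lowPerformance_zklock
 * znode is always deleted, even when the guarded code throws.
 */
public class SimpleZKLockUsage {
    public static void main(String[] args) {
        DistributedLock distributedLock = new SimpleZKLock();
        if (distributedLock.lock()) {
            try {
                //critical section: only one client across all JVMs runs this at a time
                System.out.println("doing work under the lock");
            } finally {
                distributedLock.unlock();
            }
        }
    }
}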

  • High-performance implementation that solves the herd effect
(The DistributedLock interface is the same as in the simple implementation above.)

package com.srr.lock;

/**
 * Blocking ZooKeeper lock strategy: each waiter watches only its predecessor node, avoiding the herd effect.
 */
public abstract class BlockingZKLockStrategy implements DistributedLock {
    /**
     * Template method: defines the lock-acquisition skeleton, concrete steps are left to subclasses
     */
    @Override
    public final boolean lock() {
        //lock acquired successfully
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
            return true;
        } else {  //failed to acquire the lock
            //block until the predecessor node is deleted
            waitLock();
            //once waitLock returns, this client's node is the lowest, so it now holds the lock
            return true;
        }
    }

    /**
     * Try to acquire the lock; implemented by subclasses
     */
    protected abstract boolean tryLock();

    /**
     * Wait for the lock; implemented by subclasses
     */
    protected abstract void waitLock();

    /**
     * Unlock: delete the lock node
     */
    @Override
    public abstract void unlock();
}

package com.srr.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class BlockingZKLock extends BlockingZKLockStrategy {
    private static final String PATH = "/highPerformance_zklock";
    //path of this client's sequential node
    private String currentPath;
    //path of the predecessor node
    private String beforePath;
    private CountDownLatch countDownLatch = null;
    //ZooKeeper address and port
    public static final String ZK_ADDR = "192.168.32.129:2181";
    //session timeout
    public static final int SESSION_TIMEOUT = 30000;
    //create the ZooKeeper client
    protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);

    public BlockingZKLock() {
        //create the persistent parent node if it does not exist yet
        if (!zkClient.exists(PATH)) {
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    protected boolean tryLock() {
        //create an ephemeral sequential node under the parent path
        currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
        //brief pause so that nodes created by concurrent clients show up when we list the children
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //list all child nodes and sort them
        List<String> children = zkClient.getChildren(PATH);
        Collections.sort(children);

        if (currentPath.equals(PATH + "/" + children.get(0))) {
            //this node is first in line: the lock is acquired
            return true;
        } else {
            //otherwise remember the node just before this one as beforePath
            int pathLength = PATH.length();
            int index = Collections.binarySearch(children, currentPath.substring(pathLength + 1));
            beforePath = PATH + "/" + children.get(index - 1);
        }
        return false;
    }

    @Override
    protected void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch) {
                    countDownLatch.countDown();
                }
                System.out.println("listen lock unlock");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //watch only the predecessor node for deletion
        zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);

    }

    @Override
    public void unlock() {
        if (null != zkClient) {
            System.out.println("lock unlock");
            zkClient.delete(currentPath);
        }
    }

    @Override
    public boolean readLock() {
        //read/write locks are not supported by this implementation
        return true;
    }

    @Override
    public boolean writeLock() {
        return true;
    }
}

package com.srr.lock;


import java.util.concurrent.CountDownLatch;

/**
 *  Test scenario:
 *  five threads each increment count three times
 *  the high-performance (blocking) distributed lock keeps the result correct in a distributed environment
 */
public class T {

    volatile int  count = 1;

    public void inc(){
        for(int i = 0;i<3;i++){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            count++;
            System.out.println("count == "+count);
        }
    }

    public int getCount(){
       return count;
    }

    public static void main(String[] args) throws InterruptedException {
        final T t = new T();
        final Lock lock = new Lock();
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        for(int i = 0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedLock distributedLock = new BlockingZKLock();
                    if(lock.lock(distributedLock)){
                        t.inc();
                        lock.unlock(distributedLock);
                        countDownLatch.countDown();
                    }
                    System.out.println("count == "+t.getCount());
                }
            }).start();
        }
        countDownLatch.await();
    }
}

With this implementation each client only watches its immediate predecessor node rather than every node, which improves the performance of the ZooKeeper lock.
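
To see that queue in action, the following small debugging sketch (LockQueueDump is an illustrative name, not part of the lock) uses the same ZkClient API to list the sequential nodes under /highPerformance_zklock and print which predecessor each one is waiting on.

package com.srr.lock;

import java.util.Collections;
import java.util.List;

import org.I0Itec.zkclient.ZkClient;

/**
 * Debugging sketch (illustration only): prints the current wait queue under
 * /highPerformance_zklock so you can see that every client watches exactly
 * one predecessor instead of the whole path.
 */
public class LockQueueDump {
    public static void main(String[] args) {
        String path = "/highPerformance_zklock";
        ZkClient zkClient = new ZkClient("192.168.32.129:2181");
        if (!zkClient.exists(path)) {
            System.out.println("no lock requests at the moment");
        } else {
            List<String> children = zkClient.getChildren(path);
            Collections.sort(children);
            for (int i = 0; i < children.size(); i++) {
                //the first node holds the lock; every other node waits on the one before it
                String watching = (i == 0) ? "holds the lock" : "watches " + children.get(i - 1);
                System.out.println(children.get(i) + " -> " + watching);
            }
        }
        zkClient.close();
    }
}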

  • Shared lock (S lock)
  • While writing this part I came across many incorrect articles online that pass off an exclusive lock as a shared lock

The correct way to implement a shared lock is as follows:

 

(The DistributedLock interface is again the same as shown above.)

package com.srr.lock;

/**
 * Shared (read-write) lock strategy
 */
public abstract class ZKSharedLockStrategy implements DistributedLock {
    @Override
    public boolean readLock() {
        //read lock acquired successfully
        if (tryReadLock()) {
            System.out.println(Thread.currentThread().getName() + " acquired the read lock");
            return true;
        } else {  //failed to acquire the read lock
            //block until the write node we watch is deleted
            waitLock();
            //re-check, the same way the simple exclusive template recurses
            return readLock();
        }
    }

    @Override
    public boolean writeLock() {
        //write lock acquired successfully
        if (tryWriteLock()) {
            System.out.println(Thread.currentThread().getName() + " acquired the write lock");
            return true;
        } else {  //failed to acquire the write lock
            //block until the predecessor node is deleted
            waitLock();
            //re-check, the same way the simple exclusive template recurses
            return writeLock();
        }
    }

    /**
     * Try to acquire the write lock; implemented by subclasses
     */
    protected abstract boolean tryWriteLock();

    /**
     * Try to acquire the read lock; implemented by subclasses
     */
    protected abstract boolean tryReadLock();

    /**
     * Wait for the lock; implemented by subclasses
     */
    protected abstract void waitLock();

    /**
     * Unlock: delete the lock node
     */
    @Override
    public abstract void unlock();
}

package com.srr.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Shared (read-write) lock
 */
public class ZKSharedLock extends ZKSharedLockStrategy {

    private static final String PATH = "/zk-root-readwrite-lock";
    //order nodes by their sequence suffix so "r" and "w" requests queue in arrival order
    private static final Comparator<String> BY_SEQUENCE = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            return a.substring(1).compareTo(b.substring(1));
        }
    };
    //path of this client's node
    private String currentPath;
    //path of the node this client waits on
    private String beforePath;
    private CountDownLatch countDownLatch = null;
    //ZooKeeper address and port
    public static final String ZK_ADDR = "192.168.32.129:2181";
    //session timeout
    public static final int SESSION_TIMEOUT = 30000;
    //create the ZooKeeper client
    protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);

    public ZKSharedLock() {
        //create the persistent parent node if it does not exist yet
        if (!zkClient.exists(PATH)) {
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    protected boolean tryWriteLock() {
        //only create a node on the first attempt; retries after waitLock() reuse currentPath
        if (null == currentPath || "".equals(currentPath)) {
            //create an ephemeral sequential node prefixed with "w" for a write request
            currentPath = zkClient.createEphemeralSequential(PATH + "/w", "writelock");
        }
        //brief pause so that nodes created by concurrent clients show up when we list the children
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //list all child nodes and sort them by sequence number
        List<String> children = zkClient.getChildren(PATH);
        Collections.sort(children, BY_SEQUENCE);

        String currentNode = currentPath.substring(PATH.length() + 1);
        if (currentNode.equals(children.get(0))) {
            //first in line: the write lock is acquired
            return true;
        } else {
            //otherwise remember the node just before this one as beforePath
            int index = Collections.binarySearch(children, currentNode, BY_SEQUENCE);
            beforePath = PATH + "/" + children.get(index - 1);
        }
        return false;
    }

    @Override
    protected boolean tryReadLock() {
        //only create a node on the first attempt; retries after waitLock() reuse currentPath
        if (null == currentPath || "".equals(currentPath)) {
            //create an ephemeral sequential node prefixed with "r" for a read request
            currentPath = zkClient.createEphemeralSequential(PATH + "/r", "readlock");
        }
        //brief pause so that nodes created by concurrent clients show up when we list the children
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //list all child nodes and sort them by sequence number
        List<String> children = zkClient.getChildren(PATH);
        Collections.sort(children, BY_SEQUENCE);

        String currentNode = currentPath.substring(PATH.length() + 1);
        if (currentNode.equals(children.get(0))) {
            //first in line: the read lock is acquired
            return true;
        } else if (isAllReadNodes(children)) {
            //every node ahead of us is a read node, so the read lock can be shared
            return true;
        } else {
            //otherwise find the closest write node ahead of us and wait on it
            int index = Collections.binarySearch(children, currentNode, BY_SEQUENCE);
            for (int i = index - 1; i >= 0; i--) {
                //the nearest write node ahead of us is the one that blocks this read request
                if (children.get(i).startsWith("w")) {
                    beforePath = PATH + "/" + children.get(i);
                    break;
                }
            }
        }
        return false;
    }

    //check whether every node ahead of the current one (by sequence number) is a read node
    private boolean isAllReadNodes(List<String> sortedNodes) {
        String currentNode = currentPath.substring(PATH.length() + 1);
        int currentIndex = Collections.binarySearch(sortedNodes, currentNode, BY_SEQUENCE);
        for (int i = 0; i < currentIndex; i++) {
            //a single write node ahead of us means the read lock cannot be shared yet
            if (sortedNodes.get(i).startsWith("w")) {
                return false;
            }
        }
        return true;
    }

    @Override
    protected void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch) {
                    countDownLatch.countDown();
                }
                System.out.println("listen lock unlock");
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //watch only the node that blocks this request
        zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
    }

    @Override
    public boolean lock() {
        //the plain exclusive lock() is not used by the shared lock; use readLock()/writeLock() instead
        return false;
    }

    @Override
    public void unlock() {
        if (null != zkClient) {
            System.out.println("lock unlock");
            zkClient.delete(currentPath);
            zkClient.close();
        }
    }
}

package com.srr.lock;

/**
 * Lock helper
 */
public class Lock {
    /**
     * Acquire the exclusive lock
     */
    boolean lock(DistributedLock lock) {
        return lock.lock();
    }

    /**
     * Acquire the read lock
     */
    boolean readlock(DistributedLock lock) {
        return lock.readLock();
    }

    /**
     * Acquire the write lock
     */
    boolean writeLock(DistributedLock lock) {
        return lock.writeLock();
    }

    /**
     * Release the lock
     */
    void unlock(DistributedLock lock) {
        lock.unlock();
    }
}

package com.srr.lock;

import java.util.concurrent.CountDownLatch;

/**
 * Shared lock test
 */
public class SharedLockTest {
    private static volatile int count = 0;
    private static final CountDownLatch countDownLatch = new CountDownLatch(6);

    public static void main(String[] args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                testWriteLock(8);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testReadLock(10);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testReadLock(20);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testWriteLock(11);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testWriteLock(30);
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                testReadLock(9);
            }
        }).start();

        //wait until all six workers have finished
        countDownLatch.await();
    }

    //read lock
    private static void testReadLock(long sleepTime) {
        try {
            Lock lock = new Lock();
            DistributedLock dlock = new ZKSharedLock();
            lock.readlock(dlock);
            System.out.println("i get readlock ->" + sleepTime);
            System.out.println("count = " + count);
            Thread.sleep(sleepTime);
            lock.unlock(dlock);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }

    //write lock
    private static void testWriteLock(long sleepTime) {
        try {
            Lock lock = new Lock();
            DistributedLock dlock = new ZKSharedLock();
            lock.writeLock(dlock);
            System.out.println("i get writelock ->" + sleepTime);
            count++;
            Thread.sleep(sleepTime);
            lock.unlock(dlock);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }

}

Execution result:

 

 

From the results you can see that read locks can be shared with other read locks, while a write lock must wait until the preceding read or write locks are released before it can be acquired.
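
For completeness, here is a minimal usage sketch of the shared lock (SharedLockUsage is an illustrative name; it assumes the classes above). One design detail worth noting: unlock() deletes the lock node and also closes the underlying ZkClient, so each ZKSharedLock instance should be treated as single-use, acquired once and released once.

package com.srr.lock;

/**
 * Minimal usage sketch for ZKSharedLock (illustration only).
 * unlock() deletes the lock node and closes its ZkClient, so create a new
 * ZKSharedLock instance for every acquire/release cycle.
 */
public class SharedLockUsage {
    public static void main(String[] args) {
        Lock lock = new Lock();

        DistributedLock readLock = new ZKSharedLock();
        if (lock.readlock(readLock)) {
            try {
                //other readers may hold the lock at the same time
                System.out.println("reading under the shared lock");
            } finally {
                lock.unlock(readLock);
            }
        }

        DistributedLock writeLock = new ZKSharedLock();
        if (lock.writeLock(writeLock)) {
            try {
                //exclusive: waits until all earlier readers and writers are done
                System.out.println("writing under the exclusive lock");
            } finally {
                lock.unlock(writeLock);
            }
        }
    }
}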

Finally, the mature, ready-made solution for ZooKeeper distributed locks:

  • Apache Curator
  • Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a high-level API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
  • Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.

Many articles online are even titled "implementing a distributed lock with Curator". Come on: the Curator framework already implements distributed locks and exposes a wide range of lock APIs, so building yet another lock on top of Curator is simply redundant. Here is a short usage example just to illustrate the idea:

package com.srr.lock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 *  Test scenario:
 *  four threads each increment count ten times (from 1 to 41)
 *  Curator's InterProcessMutex keeps the result correct in a distributed environment
 */
public class CuratorDistributedLockTest {
    private static final String lockPath = "/curator_lock";
    //ZooKeeper address and port
    public static final String zookeeperConnectionString = "192.168.32.129:2181";

    volatile int count = 1;

    public void inc() {
        for (int i = 0; i < 10; i++) {
            count++;
            System.out.println("count == " + count);
        }
    }

    public int getCount() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        final CuratorDistributedLockTest t = new CuratorDistributedLockTest();
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                    CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                    client.start();
                    InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                    try {
                        if (lock.acquire(10, TimeUnit.SECONDS)) {
                            try {
                                System.out.println("get the lock");
                                t.inc();
                            } finally {
                                lock.release();
                                System.out.println("unlock the lock");
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    client.close();
                    countDownLatch.countDown();
                }
            }).start();
        }

        countDownLatch.await();
        System.out.println("total count == " + t.getCount());
    }
}

Execution result:

 

 

If you want to learn more about the Curator framework, head over to http://curator.apache.org/; the official site provides detailed usage examples and documentation. That concludes this summary of implementing distributed locks with ZooKeeper!
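
One more pointer for the shared-lock scenario covered earlier: Curator also ships a ready-made read-write lock recipe, InterProcessReadWriteLock. A minimal sketch follows (CuratorReadWriteLockExample is an illustrative name; the connection string and lock path are placeholders):

package com.srr.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Minimal sketch of Curator's read-write lock recipe (illustration only);
 * the connection string and lock path are placeholders.
 */
public class CuratorReadWriteLockExample {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "192.168.32.129:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/curator_rw_lock");

        //shared: multiple clients can hold the read lock at the same time
        rwLock.readLock().acquire();
        try {
            System.out.println("reading under Curator's read lock");
        } finally {
            rwLock.readLock().release();
        }

        //exclusive: blocks both readers and writers
        rwLock.writeLock().acquire();
        try {
            System.out.println("writing under Curator's write lock");
        } finally {
            rwLock.writeLock().release();
        }

        client.close();
    }
}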

Writing original content is not easy, so please follow for more!

 






 