基於zookeeper的分布式鎖


基於zk的分布式鎖:
   大概原理:仍然跟基於db或者redis一致,就是注冊節點,然后刪除。不同的是zk因為可以對節點的事件進行監聽,那么在收到節點刪除的事件時,正在阻塞的線程便可以發起新的搶占鎖的請求。當然,真正生產的代碼一般不是這么寫的,因為這樣的情況下如果等待的線程非常多,那么zk向所有注冊點的廣播就要消耗大量的帶寬,也會極大的消耗zk的性能,這顯然是不合理的。所以,基於臨時有序節點的分布式鎖的優勢就非常明顯了,所有節點只關注自己的前節點,消耗少,線程等待時間可控,而且高可用。
---------Talk Is Cheap, Just Show Me The Code-------------------------------------------------
第一版本的zk分布式鎖(做法跟db、redis完全相同):
鎖工具類主要方法:
鎖的獲取:
/**
     * 阻塞獲取鎖
     */
    public boolean lock(){
        if(tryLock()){
            return true;
        }
        waitForLock();
        return lock();
    }

    /**
     * 非阻塞獲取鎖
     */
    public boolean tryLock(){
        try{
            zkClient.createPersistent("/lock");
            System.out.println("搶到了鎖"+Thread.currentThread().getName());
        }catch (ZkNodeExistsException e){
            System.out.println("沒搶到鎖"+Thread.currentThread().getName());
            return false;
        }
        return true;
    }

關鍵的搶占失敗后等待方法:

  /**
     * 監聽節點,等待搶鎖
     */
    private void waitForLock(){
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {}

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("有鎖釋放了,開始搶占鎖"+Thread.currentThread().getName());
                if(cdl != null){
                    cdl.countDown();
                }
            }
        };

        zkClient.subscribeDataChanges("/lock",listener);
        if(zkClient.exists("/lock")){//節點是否存在
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{//節點可能剛好被刪除
            lock();
        }
    }

鎖的釋放:

  /**
     * 解鎖
     */
    public void unlock(){
        System.out.println("刪除鎖"+Thread.currentThread().getName());
        zkClient.delete("/lock");
    }  

測試代碼:

  private CountDownLatch cdl = new CountDownLatch(num);

    @Test
    public void testLock(){
        for(int i=0;i<num;i++){
            MyThread t = new MyThread("mythread"+i);
            t.start();
            cdl.countDown();
        }
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //內部類
    class MyThread extends Thread{
        private String threadName;
        public MyThread(){};
        public MyThread(String threadName){
            this.threadName = threadName;
        }
        @Override
        public void run() {
            ZkLock zkLock = new ZkLock();
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(zkLock.lock()){
                System.out.println("線程-"+threadName+"獲得鎖,orderId為:"+MyResources.getInstance().getNextId());
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                zkLock.unlock();
            }
        }
    }  

執行結果一部分如下:

 

已經不用再說什么了,這么多通知已經說明了一切,每刪除一個節點通知所有訂閱節點的代價是高昂的,這是不能忍受的,得改!

第二版本的zk分布式鎖(臨時有序節點版):
  /**
     * 阻塞鎖
     * @return
     */
    public boolean lock(){
        if(tryLock()){
            return true;
        }else{
            waitForLock();
            return lock();
        }
    }

    /**
     * 非阻塞鎖
     * @return
     */
    public boolean tryLock(){
        if(currentPath == null){
            currentPath = zkClient.createEphemeralSequential(lockPath+"/lock",Thread.currentThread().getName());
            System.out.println(Thread.currentThread().getName()+"創建節點:"+currentPath+"=================================================");
            List<String> nodeList = zkClient.getChildren(lockPath);
            Collections.sort(nodeList);
            if(currentPath.equals(lockPath+"/"+nodeList.get(0))){//當前注冊節點為最靠前節點
                return true;
            }else{
                int weizhi = Collections.binarySearch(nodeList,currentPath.substring(8));
                beforePath = lockPath +"/"+nodeList.get(weizhi-1);
                return false;
            }
        }else{
            return true;
        }
    } 

一樣重要的等待方法:

//鎖等待
    private boolean waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if(cdl!=null){
                    System.out.println("前面的節點"+beforePath+"被刪除了=====================");
                    cdl.countDown();
                }
            }
        };

        zkClient.subscribeDataChanges(beforePath,listener);
        if(zkClient.exists(beforePath)){//前節點存在
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }else{//訂閱的時候,可能前邊線程處理較快,已經刪除了
            return true;
        }
    }

釋放鎖:

   /**
     * 釋放鎖
     * @return
     */
    public boolean unlock(){
        return zkClient.delete(currentPath);
    } 

 測試代碼:

private int num = 10;
    private CountDownLatch cdl = new CountDownLatch(num);

    @Test
    public void testLock(){
        for(int i=0;i<num;i++){
            ZkLockTest2.MyThread t = new ZkLockTest2.MyThread("mythread"+i);
            t.start();
            cdl.countDown();
        }
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //內部類
    class MyThread extends Thread{
        private String threadName;
        public MyThread(){};
        public MyThread(String threadName){
            this.threadName = threadName;
        }
        @Override
        public void run() {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ZkLock2 zkLock = new ZkLock2();
            if(zkLock.lock()){
                System.out.println("線程-"+threadName+"獲得鎖,orderId為:"+MyResources.getInstance().getNextId());
                zkLock.unlock();
            }
        }
    }

執行結果如下(所有):  

 

結束語:
  三篇結束了,對比三種實現方式結論如下:
  a、基於db的分布式鎖,相對容易理解,易上手;但依賴數據庫,對數據庫有損耗(這個影響其實比較小,多數時候可以忽略),可能會有短時間死鎖等,其它線程重試的時間不確定,整體時間利用率不好把控(個人認為這個是主要的,其它的可重入,死鎖之類的有其它辦法解決)。
  b、基於redis的分布式鎖,性能遠高於db的,依賴redis(這個跟db一般不會掛,但就怕萬一),主備之間可能數據不一致,一樣線程休眠時間不好確定,整體時間利用率不好把控(個人認為這個也是主要原因,解決辦法的話,休眠時間設置為正常單線程處理業務時間的2-3倍(經驗值),所有線程休眠時間在某個時間段內隨機,不要固定時間;其它問題多數可以有辦法解決)。
  c、基於zk的分布式鎖,性能略遜redis但一樣遠高於db,得益於zk的高可用,不用擔心掛掉的問題,而且由於臨時有序節點的排序跟節點監聽,解決了休眠問題;缺點實現略復雜。
只是自己學習之作,沒有考慮生產中具體的需求,可能有理解不正確的地方,歡迎批評斧正。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM