zookeeper實現分布式鎖


使用zookeeper實現分布式鎖是分布式鎖的實現方式的一種,相對於redis的實現,zookeeper的顯現能夠實現鎖的獲得順序,不出現死鎖等特點,關於zookeeper分布式鎖的實現原理大致總結如下:

  • 客戶端向zookeeper的某一個持久節點下注冊臨時有序節點
  • 獲取該父節點下的所有臨時有序節點,將自己剛才注冊的臨時節點與節點集合的第一個做比較,如果相同則表示自己是第一個注冊也就是可以獲得鎖的,否則,注冊比前置節點的監聽事件,如果監聽到前置節點被刪除(即節點狀態發生變化),則重新獲取節點集合並進行比較

原理性的東西網上很多很全面,現在我貼下代碼,做一個簡陋的分布式鎖的實現

1.下載zookeeper服務器到本地,啟動

https://zookeeper.apache.org/releases.html#download

解壓縮后,修改conf目錄下的zoo_sample.cfg復制為zoo.cfg,修改里面的配置,沒有/data和/log目錄就新建一個吧

 

 

 在bin目錄下啟動zk_server.cmd,然后啟動zk_client.cmd客戶端來驗證下服務器是否已經啟動成功

 

2.pom.xml引入zkclient的客戶端jar包

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>

 

3.代碼實現  

package com.cw.zklock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.ZooKeeper;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class ZookeeperLock{

    private ZkClient zkClient;

    private static final String ROOT_PATH="/lock";

    private String lockName;
    private String eNodeName;
    private String eNodeNo;

    public  ZookeeperLock(String lockName){
        zkClient = new ZkClient("localhost:2181",2000);
        //連接zk
        this.lockName =ROOT_PATH.concat("/").concat(lockName);
        //創建持久節點,該節點下后面創建臨時有序節點
        if (!zkClient.exists(this.lockName)) {
            zkClient.createPersistent(this.lockName);
        }
    }

    public void lock(){
        for (;;) {
            if (eNodeName == null) {
                //創建臨時有序節點
                eNodeName = zkClient.createEphemeralSequential(lockName.concat("/"), System.currentTimeMillis());
                eNodeNo=eNodeName.substring(eNodeName.lastIndexOf("/")+1);
            }
            List<String> children = zkClient.getChildren(lockName);
            //節點排序
            Collections.sort(children);
            if (children.size() > 0) {
                if (children.get(0).equals(eNodeNo)) {
                    //自己創建的就是第一個臨時順序節點,意味着拿到了鎖
                    System.out.println(eNodeNo+"------------》獲得了鎖");
                    break;
                }else {
                    CountDownLatch latch = new CountDownLatch(1);
                    //獲取改節點的前一個節點,注冊監聽事件,並等待
                   String prevPath= children.get(children.indexOf(eNodeNo)-1);
                    System.out.println(eNodeNo+"注冊監聽到------》"+prevPath);
                    zkClient.subscribeDataChanges(lockName.concat("/").concat(prevPath), new IZkDataListener() {
                        @Override
                        public void handleDataChange(String s, Object o) throws Exception {
                            latch.countDown();
                        }

                        @Override
                        public void handleDataDeleted(String s) throws Exception {
                            System.out.println(eNodeNo+"!!!!!!!!!!!被喚醒");
                            latch.countDown();
                        }
                    });

                    try {
                        System.out.println(eNodeNo+"??????????????開始等待");
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }else {
                //異常情況
                System.out.println("子節點數據為空,拋出異常");
                throw new RuntimeException("獲取鎖異常");
            }
        }
    }

    public void unLock(){
        zkClient.delete(eNodeName);
        zkClient.close();
        System.out.println(eNodeNo+"********************釋放了鎖");
    }

    public static void main(String[] args) {
        //測試5個線程的爭搶
        int threadCount=5;
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(()->{
                try {
                    latch.await();
                    ZookeeperLock lock = new ZookeeperLock("lockTest");
                    lock.lock();
                    //模擬處理業務邏輯
                    Thread.sleep(1000+ new Random().nextInt(100)*10);
                    lock.unLock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            thread.start();
            latch.countDown();
        }


    }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM