Zookeeper實現分布式鎖


分布式鎖的幾種實現方式:

  目前幾乎很多大型網站及應用都是分布式部署的,分布式場景中的數據一致性問題一直是一個比較重要的話題。分布式的CAP理論告訴我們“任何一個分布式系統都無法同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多只能同時滿足兩項。”所以,很多系統在設計之初就要對這三者做出取舍。在互聯網領域的絕大多數的場景中,都需要犧牲強一致性來換取系統的高可用性,系統往往只需要保證“最終一致性”,只要這個最終時間是在用戶可以接受的范圍內即可。

  在很多場景中,我們為了保證數據的最終一致性,需要很多的技術方案來支持,比如分布式事務、分布式鎖等。有的時候,我們需要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,Java中其實提供了很多並發處理相關的API,但是這些API在分布式場景中就無能為力了。也就是說單純的Java Api並不能提供分布式鎖的能力。所以針對分布式鎖的實現目前有多種方案。

  針對分布式鎖的實現,目前比較常用的有以下幾種方案:

  基於數據庫實現分布式鎖 基於緩存(redis,memcached,tair)實現分布式鎖 基於Zookeeper實現分布式鎖

  現在模擬一個使用Zookeeper實現分布式鎖,假設有A,B,C三台客戶端去訪問資源,調用zookeeper獲得鎖。客戶端三個在zookeeper的 /locks節點下創建一個/lock節點,由於節點是唯一性的特性,只有一個人會創建成功,其余兩個創建失敗,會進入監聽/locks節點的變化,如果/locks下子節點/lock節點發生變化,其余兩個可以去拿鎖,這樣是否好呢? 這樣子會導致驚群效應。就是一個觸發使得在短時間呢會觸發大量的watcher事件,但是只有一個客戶端能拿到鎖。所以這種方式不建議。

  有一種比較好的方法就是利用 zookeeper 的有序節點的特性,基本思路:

  1. 在獲取分布式鎖的時候在locker節點下創建臨時順序節點,釋放鎖的時候刪除該臨時節點。
  2. 客戶端調用createNode方法在locks下創建臨時順序節點,然后調用getChildren(“locks”)來獲取locks下面的所有子節點,注意此時不用設置任何Watcher。
  3. 客戶端獲取到所有的子節點path之后,如果發現自己創建的子節點序號最小,那么就認為該客戶端獲取到了鎖。
  4. 如果發現自己創建的節點並非locks所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,然后對其調用exist()方法,同時對其注冊事件監聽器。
  5. 之后,讓這個被關注的節點刪除,則客戶端的Watcher會收到相應通知,此時再次判斷自己創建的節點是否是locks子節點中序號最小的,如果是則獲取到了鎖,如果不是則重復以上步驟繼續獲取到比自己小的一個節點並注冊監聽。

   下面看一下我的代碼實現:

public class DistributedLock implements Lock, Watcher { private ZooKeeper zk = null; private String ROOT_LOCK = "/locks"; //定義根節點
    private String WAIT_LOCK; //等待前一個鎖
    private String CURRENT_LOCK; //表示當前的鎖 // 作為監聽前繼節點阻塞
    private CountDownLatch countDownLatch; //
    //作為連接阻塞,等待成功連接事件到達才放下門閂
    private CountDownLatch connectCountDownLatch =new CountDownLatch(1); public DistributedLock() { try { zk = new ZooKeeper("192.168.1.101:2181", 10000, this); connectCountDownLatch.await(); //判斷根節點是否存在
            Stat stat = zk.exists(ROOT_LOCK, true); if (stat == null) {//如果不存在創建
                zk.create(ROOT_LOCK, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } /** * 嘗試獲取鎖 */ @Override public boolean tryLock() { try { //創建臨時有序節點
            CURRENT_LOCK = zk.create(ROOT_LOCK + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + "->" + CURRENT_LOCK + ",嘗試競爭鎖"); List<String> childrens = zk.getChildren(ROOT_LOCK, false); //獲取根節點下的所有子節點
            SortedSet<String> sortedSet = new TreeSet();//定義一個集合進行排序
            for (String children : childrens) { // 排序
                sortedSet.add(ROOT_LOCK + "/" + children); } String firstNode = sortedSet.first(); //獲得當前所有子節點中最小的節點 // 取出比我創建的節點還小的節點,沒有的話為null
            SortedSet<String> lessThenMe = ((TreeSet<String>) sortedSet).headSet(CURRENT_LOCK); if (CURRENT_LOCK.equals(firstNode)) {//通過當前的節點和子節點中最小的節點進行比較,如果相等,表示獲得鎖成功
                return true; } if (!lessThenMe.isEmpty()) { WAIT_LOCK = lessThenMe.last();//獲得比當前節點更小的最后一個節點,設置給WAIT_LOCK
 } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public void lock() { if (this.tryLock()) { //如果獲得鎖成功
            System.out.println(Thread.currentThread().getName() + "->" + CURRENT_LOCK + "->獲得鎖成功"); return; } try { waitForLock(WAIT_LOCK); //沒有獲得鎖,繼續等待獲得鎖
        } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } private boolean waitForLock(String prev) throws KeeperException, InterruptedException { //監聽當前節點的上一個節點 注冊事件,這里需要在默認的 watch 事件里面處理 // 這里是我們之前提到的 watch 事件觸發最后執行的 process 回調里面的 請看最下行代碼
        Stat stat = zk.exists(prev, true); if (stat != null) { System.out.println(Thread.currentThread().getName() + "->等待鎖" + ROOT_LOCK + "/" + prev + "釋放"); countDownLatch = new CountDownLatch(1); countDownLatch.await();// 進入等待,這里需要 // watcher觸發以后,還需要再次判斷當前等待的節點是不是最小的
            System.out.println(Thread.currentThread().getName() + "->獲得鎖成功"); } return true; } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { System.out.println(Thread.currentThread().getName() + "->釋放鎖" + CURRENT_LOCK); try { // -1 表示無論如何先把這個節點刪了再說
            zk.delete(CURRENT_LOCK, -1); CURRENT_LOCK = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } @Override public Condition newCondition() { return null; } @Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected==event.getState()){ //如果收到了服務端的響應事件,連接成功
 connectCountDownLatch.countDown(); } // 事件回調 countDownLatch.countDown();
        if (this.countDownLatch != null) { this.countDownLatch.countDown(); } } }

  代碼中實現了  Lock,Watcher 兩個接口。主要用到的是lock 里面的trylock方法,嘗試去獲取鎖。然后還有watcher里面的處理回調的方法

測試代碼:

 public static void main( String[] args ) throws IOException {
        CountDownLatch countDownLatch=new CountDownLatch(10);
        for(int i=0;i<10;i++){
            new Thread(()->{
                try {
                    countDownLatch.await();
                    DistributedLock distributedLock=new DistributedLock();
                    distributedLock.lock(); //獲得鎖
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"Thread-"+i).start();
            countDownLatch.countDown();
        }
        System.in.read();
    }

  運行結果就可以看到:

Thread-8->/locks/0000000040,嘗試競爭鎖
Thread-5->/locks/0000000044,嘗試競爭鎖
Thread-9->/locks/0000000041,嘗試競爭鎖
Thread-3->/locks/0000000042,嘗試競爭鎖
Thread-7->/locks/0000000046,嘗試競爭鎖
Thread-1->/locks/0000000043,嘗試競爭鎖
Thread-0->/locks/0000000047,嘗試競爭鎖
Thread-4->/locks/0000000045,嘗試競爭鎖
Thread-2->/locks/0000000049,嘗試競爭鎖
Thread-6->/locks/0000000048,嘗試競爭鎖
Thread-8->/locks/0000000040->獲得鎖成功
Thread-9->等待鎖/locks//locks/0000000040釋放
Thread-5->等待鎖/locks//locks/0000000043釋放
Thread-0->等待鎖/locks//locks/0000000046釋放
Thread-1->等待鎖/locks//locks/0000000042釋放
Thread-2->等待鎖/locks//locks/0000000048釋放
Thread-7->等待鎖/locks//locks/0000000045釋放
Thread-6->等待鎖/locks//locks/0000000047釋放
Thread-3->等待鎖/locks//locks/0000000041釋放
Thread-4->等待鎖/locks//locks/0000000044釋放

  這樣子當我們把 Thread-8所鎖的節點 0000000040 刪掉   Thread-9 就會獲取鎖,以此類推。實現分布式鎖

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM