Zookeeper實現分布式鎖


幾天分析了一下三種分布式鎖的實現,但是沒有利用zookeeper實現一個分布式鎖,因為感覺基於Zookeeper實現分布式鎖還是稍微復雜的,同時也需要使用Watcher機制,所以就單獨搞一篇Zookeeper實現的分布式鎖。

首先,第一種實現。我們可以利用Zookeeper不能重復創建一個節點的特性來實現一個分布式鎖,這看起來和redis實現分布式鎖很像。但是也是有差異的,后面會詳細分析。
主要流程圖如下:

 

 

 

上面的流程很簡單:

  1. 查看目標Node是否已經創建,已經創建,那么等待鎖。
  2. 如果未創建,創建一個瞬時Node,表示已經占有鎖。
  3. 如果創建失敗,那么證明鎖已經被其他線程占有了,那么同樣等待鎖。
  4. 當釋放鎖,或者當前Session超時的時候,節點被刪除,喚醒之前等待鎖的線程去爭搶鎖。

上面是一個完整的流程,簡單的代碼實現如下:

package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;

/**
 * Zookeepr實現分布式鎖
 */
public class LockTest {

    private String zkQurom = "localhost:2181";

    private String lockNameSpace = "/mylock";

    private String nodeString = lockNameSpace + "/test1";

    private Lock mainLock;

    private ZooKeeper zk;

    public LockTest(){
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event "+watchedEvent);
                    if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                        System.out.println("connection is established...");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    private void ensureRootPath() throws InterruptedException {
        try {
            if (zk.exists(lockNameSpace,true)==null){
                zk.create(lockNameSpace,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    private void watchNode(String nodeString, final Thread thread) throws InterruptedException {
        try {
            zk.exists(nodeString, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println( "==" + watchedEvent.toString());
                    if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                        System.out.println("Threre is a Thread released Lock==============");
                        thread.interrupt();
                    }
                    try {
                        zk.exists(nodeString,new Watcher() {
                            @Override
                            public void process(WatchedEvent watchedEvent) {
                                System.out.println( "==" + watchedEvent.toString());
                                if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                                    System.out.println("Threre is a Thread released Lock==============");
                                    thread.interrupt();
                                }
                                try {
                                    zk.exists(nodeString,true);
                                } catch (KeeperException e) {
                                    e.printStackTrace();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }

                        });
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            });
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 獲取鎖
     * @return
     * @throws InterruptedException
     */
    public boolean lock() throws InterruptedException {
        String path = null;
        ensureRootPath();
        watchNode(nodeString,Thread.currentThread());
        while (true) {
            try {
                path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (KeeperException e) {
                System.out.println(Thread.currentThread().getName() + "  getting Lock but can not get");
                try {
                    Thread.sleep(5000);
                }catch (InterruptedException ex){
                    System.out.println("thread is notify");
                }
            }
            if (!Strings.nullToEmpty(path).trim().isEmpty()) {
                System.out.println(Thread.currentThread().getName() + "  get Lock...");
                return true;
            }
        }
    }

    /**
     * 釋放鎖
     */
    public void unlock(){
        try {
            zk.delete(nodeString,-1);
            System.out.println("Thread.currentThread().getName() +  release Lock...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0;i<4;i++){
            service.execute(()-> {
                LockTest test = new LockTest();
                try {
                    test.lock();
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                test.unlock();
            });
        }
        service.shutdown();
    }
}

代碼比較糙,但是大致的實現思路和上述一致,這里需要注意:

  1. 因為使用的是原生的Zookeeper API實現,Watch需要重復的設置,所以代碼復雜的些。
  2. 喚醒直接用的Thread.interupt這樣其實控制流程其實是不好的。

其實上面的實現有優點也有缺點:
優點:
實現比較簡單,有通知機制,能提供較快的響應,有點類似reentrantlock的思想,對於節點刪除失敗的場景由Session超時保證節點能夠刪除掉。
缺點:
重量級,同時在大量鎖的情況下會有“驚群”的問題。

“驚群”就是在一個節點刪除的時候,大量對這個節點的刪除動作有訂閱Watcher的線程會進行回調,這對Zk集群是十分不利的。所以需要避免這種現象的發生。

解決“驚群”:

為了解決“驚群“問題,我們需要放棄訂閱一個節點的策略,那么怎么做呢?

  1. 我們將鎖抽象成目錄,多個線程在此目錄下創建瞬時的順序節點,因為Zk會為我們保證節點的順序性,所以可以利用節點的順序進行鎖的判斷。
  2. 首先創建順序節點,然后獲取當前目錄下最小的節點,判斷最小節點是不是當前節點,如果是那么獲取鎖成功,如果不是那么獲取鎖失敗。
  3. 獲取鎖失敗的節點獲取當前節點上一個順序節點,對此節點注冊監聽,當節點刪除的時候通知當前節點。
  4. 當unlock的時候刪除節點之后會通知下一個節點。

上面的實現和reentrantlock的公平鎖實現還是比較類似的,下面是簡單的實現:

package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by zhiming on 2017-02-05.
 */
public class FairLockTest {

    private String zkQurom = "localhost:2181";

    private String lockName = "/mylock";

    private String lockZnode = null;

    private ZooKeeper zk;

    public FairLockTest(){
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event "+watchedEvent);
                    if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                        System.out.println("connection is established...");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    private void ensureRootPath(){
        try {
            if (zk.exists(lockName,true)==null){
                zk.create(lockName,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 獲取鎖
     * @return
     * @throws InterruptedException
     */
    public void lock(){
        String path = null;
        ensureRootPath();
            try {
                path = zk.create(lockName+"/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                lockZnode = path;
                List<String> minPath = zk.getChildren(lockName,false);
                System.out.println(minPath);
                Collections.sort(minPath);
                System.out.println(minPath.get(0)+" and path "+path);
                if (!Strings.nullToEmpty(path).trim().isEmpty()&&!Strings.nullToEmpty(minPath.get(0)).trim().isEmpty()&&path.equals(lockName+"/"+minPath.get(0))) {
                    System.out.println(Thread.currentThread().getName() + "  get Lock...");
                    return;
                }
                String watchNode = null;
                for (int i=minPath.size()-1;i>=0;i--){
                    if(minPath.get(i).compareTo(path.substring(path.lastIndexOf("/") + 1))<0){
                        watchNode = minPath.get(i);
                        break;
                    }
                }

                if (watchNode!=null){
                    final String watchNodeTmp = watchNode;
                    final Thread thread = Thread.currentThread();
                    Stat stat = zk.exists(lockName + "/" + watchNodeTmp,new Watcher() {
                        @Override
                        public void process(WatchedEvent watchedEvent) {
                            if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                                thread.interrupt();
                            }
                            try {
                                zk.exists(lockName + "/" + watchNodeTmp,true);
                            } catch (KeeperException e) {
                                e.printStackTrace();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }

                    });
                    if(stat != null){
                        System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + lockName + "/" + watchNode);
                    }
                }
                try {
                    Thread.sleep(1000000000);
                }catch (InterruptedException ex){
                    System.out.println(Thread.currentThread().getName() + " notify");
                    System.out.println(Thread.currentThread().getName() + "  get Lock...");
                    return;
                }

            } catch (Exception e) {
               e.printStackTrace();
            }
    }

    /**
     * 釋放鎖
     */
    public void unlock(){
        try {
            System.out.println(Thread.currentThread().getName() +  "release Lock...");
            zk.delete(lockZnode,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }



    public static void main(String args[]) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0;i<4;i++){
            service.execute(()-> {
                FairLockTest test = new FairLockTest();
                try {
                    test.lock();
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                test.unlock();
            });
        }
        service.shutdown();
    }

}

同樣上面的程序也有幾點需要注意:

  1. Zookeeper的API沒有提供直接的獲取上一個節點或者最小節點的API需要我們自己實現。
  2. 使用了interrupt做線程的喚醒,這樣不科學,因為不想將JVM的lock引進來所以沒有用countdownlatch來做流程控制。
  3. Watch也是要重新設置的,這里使用了Watch的復用,所以代碼簡單些。

其實上面的實現還是很復雜的,因為你需要反復的去關注Watcher,實現一個Demo可以,做一個生產環境可用的Lock並不容易。因為你的代碼bug在生產環境上會引起很嚴重的bug。

其實對於Zookeeper的一些常用功能是有一些成熟的包實現的,像Curator。Curator的確是足夠牛逼,不僅封裝了Zookeeper的常用API,也包裝了很多常用Case的實現。但是它的編程風格其實還是吧比較難以接受的。

可以用Curator輕易的實現一個分布式鎖:

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

是的就這么簡單,一個直接拿過來可用的輪子。

基於Zookeeper的分布式鎖就說完了。基於Zookeeper實現分布式鎖,其實是不常用的。雖然它實現鎖十分優雅,但編程復雜,同時還要單獨維護一套Zookeeper集群,頻繁的Watch對Zookeeper集群的壓力還是蠻大的,如果不是原有的項目以來Zookeeper,同時鎖的量級比較小的話,還是不用為妙。



作者:一只小哈
鏈接:https://www.jianshu.com/p/5d12a01018e1
來源:簡書


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM