zookeeper之分布式鎖以及分布式計數器(通過curator框架實現)


有人可能會問zookeeper我知道,但是curator是什么呢?

其實curator是apachede針對zookeeper開發的一個api框架是apache的頂級項目 他與zookeeper原生api相比更加簡潔方便使用

特別就是注冊watcher這方面.再也不用我們手工去重復注冊watcher了.我們只需監聽一下然后curator全給我們做了.而且支持遞歸創建節點

和遞歸刪除節點.

更大的優勢是實現分布式鎖和分布式計數器以及分布式的同步更加方便.

以前我們基於zk原生的api實現分布式鎖相當麻煩,但是我們基於curator去實現分布式鎖那就是特別簡單的事了.

廢話不多說直接上代碼

一個簡單的demo

/**
 * 分布式鎖
 */
public class CouratorLock {

    //初始化url
    private static final String url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";

    private static int count=10;

    public static void main(String[] args){

        for(int i=0;i<10;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework zk= CuratorFrameworkFactory.builder()
                            .sessionTimeoutMs(5000)
                            .retryPolicy(new RetryNTimes(3,1000))
                            .connectionTimeoutMs(50000)
                            .connectString(url)
                            .build();
                    zk.start();
                    //分布式鎖
                    InterProcessMutex lock=new InterProcessMutex(zk,"/lock");
                    try {
                        //枷鎖
                        lock.acquire();
                        get();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }finally {
                        try {
                            //釋放鎖
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }
    public static void get(){
        count--;
        System.out.println(count);
    }
}

  

沒錯就是這么簡單,curator為我們提供了InterProcessMutex這個類來實現分布式的鎖

 

其中.acquire();這個方法就是開始枷鎖..release();就是釋放鎖.

看很簡單吧.相比我們用zk原生的api實現起來超級簡單

其實底層還是基於臨時節點實現的,而且curator支持一個更加強大的功能,就是當你的客戶端下線以后再次啟動如果再次期間你監聽的節點的有變化,

curator的watcher會立馬進行回調..

所以使用curator開發相當方便,

我們看下curator的CRUD

 

 

 //通過工廠創建連接
        CuratorFramework cachezk= CuratorFrameworkFactory.builder()
                .connectString(url)
                .sessionTimeoutMs(10000)
                .retryPolicy(new RetryNTimes(1000,10))
                .build();
        cachezk.start(); 
 //創建節點
        cachezk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/gang","第一個watcher".getBytes());
        //更新節點
        cachezk.setData().forPath("/super","修改節點".getBytes());
        //刪除節點
        cachezk.delete().deletingChildrenIfNeeded().forPath("/super");
        //關閉連接
        cachezk.close();
    }

  


下面我們看下curator是怎么實現分布式計數器的

 

 

其實也是很簡單,以為復雜的curator都提我們做完了

 

/**
 * 分布式的計數器
 */
public class CuratorCounter {
    //初始化url
    private static final String url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";

    private static int count=10;

    public static void main(String[] args) throws Exception {

        CuratorFramework zk= CuratorFrameworkFactory.builder()
                .sessionTimeoutMs(5000)
                .retryPolicy(new RetryNTimes(3,1000))
                .connectionTimeoutMs(50000)
                .connectString(url)
                .build();
        zk.start();
        //分布式計數器
        DistributedAtomicInteger counter=new DistributedAtomicInteger(zk,"/super",new RetryNTimes(3,100));
        //初始化
        //counter.forceSet(0);
        AtomicValue<Integer> value = counter.increment();
        System.out.println("原值為"+value.preValue());
        System.out.println("更改后的值為"+value.postValue());
        System.out.println("狀態"+value.succeeded());

    }
}

  

 


DistributeAtomicInteger就是curator為我們提供的分布式計數器的類

 

 

關於分布式同步機制我就不給大家介紹了.其實也是很簡單的.你們自行去了解吧

 

能力有限,有錯誤之處還望指出

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM