zookeeper開源客戶端curator


  zookeeper的原生api相對來說比較繁瑣,比如:對節點添加監聽事件,當監聽觸發后,我們需要再次手動添加監聽,否則監聽只生效一次;再比如,斷線重連也需要我們手動代碼來判斷處理等等。對於curator的介紹,從網上百度了一段:Curator是Netflix開源的一套zookeeper客戶端框架,用它來操作zookeeper更加方便,按Curator官方所比喻的,guava to JAVA,curator to zookeeper,Curator采用了fluent風格的代碼,非常簡潔

---------------正-----文-----------------------------------------------------------------------------------------------------------------------------------------------

  Curator包含6部分,均可提供單獨jar包,每個包簡單介紹如下:

  client:zk-client的替代品,提供一些底層處理跟工具類;

  framework:高級封裝,大大簡化了zk的客戶端編程,包含對zk的連接管理,重試機制等;

  repices:提供了一些常用的操作,比如持續監聽,鎖,選舉等;

  utilities:各種工具類;

  errors:curator對異常跟錯誤的處理;

  extendsion:擴展包;

  基本api代碼解釋:

  1、建立連接

  建立連接需要指定zk地址以及重試策略等,先上代碼再解釋:

 RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);//重試5次,每次間隔時間指數增長(有具體增長公式)
 RetryPolicy retry1 = new RetryNTimes(5, 5000);//重試5次,每次間隔5秒
 RetryPolicy retry2 = new RetryUntilElapsed(60000 * 2, 5000);//重試2分鍾,每次間隔5秒
//普通創建 CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 5000, retry); //fluent風格創建 CuratorFramework client1 = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .connectionTimeoutMs(5000) //連接超時時間 .sessionTimeoutMs(3000) //會話超時時間 .retryPolicy(retry) .build(); //建立連接 client.start();

  如注釋,創建客戶端連接我們通常需要指定重試策略,curator提供了3種重試機制,分別如上;對於fluent風格,就是每個操作都返回了一個對象,我們可以一直通過[.方法名]的方式書寫代碼;client創建了之后,需要調用start方法才能真正去建立連接。會話超時時間是指當連接發生故障時,由於zk的心跳機制檢測,服務端認為會話超時的時間,會清除session;

  2、創建、刪除、更新節點

  連接建立之后,我們可以在服務器上進行創建節點的操作,代碼如下:

 //創建節點 
 String path = client.create()
          .creatingParentsIfNeeded()        //對節點路徑上沒有的節點進行創建
          .withMode(CreateMode.EPHEMERAL)   //臨時節點
          .forPath("/curator/test", "123".getBytes());  //節點路徑,節點的值

 //刪除節點
 client.delete()
        .guaranteed()      //刪除失敗,則客戶端持續刪除,直到節點刪除為止
        .deletingChildrenIfNeeded()   //刪除相關子節點
        .withVersion(-1)    //無視版本,直接刪除
        .forPath("/curator/mytest");
 //更新節點信息
 Stat stat2 = new Stat();
 byte[] theValue2 = client.getData().storingStatIn(stat).forPath("/curator/test");
 client.setData()
        .withVersion(stat2.getVersion())  //版本校驗,與當前版本不一致則更新失敗,-1則無視版本信息進行更新
        .forPath("/curator/test", "456".getBytes());
 //判斷節點是否存在(存在返回節點信息,不存在則返回null)
 Stat s = client.checkExists().forPath("/curator/test");

  持久節點是persistent,我們創建的節點可能有好幾層,如果服務器不存在父節點則會報錯並創建失敗,createingParentsIfNodeed()的作用是在父節點不存在的時候進行創建。刪除操作可能由於網絡抖動等情況導致刪除失敗,由於節點數據操作一般對業務影響較大,故多數都會帶持續刪除的動作來確保正確刪除;節點更新刪除等操作若考慮版本校驗,則采用代碼所示方式,在獲取節點數據的時候對節點狀態進行賦值,然后通過節點狀態可以獲得版本信息。判斷節點是否存在,一般通過節點信息判斷,若不存在,則節點信息為null。

  3、獲取字節點列表

  只有一行代碼,返回string類型的list

 //獲取子節點列表
 List<String> paths = client.getChildren().forPath("/curator");

  4、異步操作

  異步操作不會阻塞代碼執行,對於操作完成后的業務處理,需要設定回調函數來完成。以判斷節點是否存在為例:

 ExecutorService es = Executors.newFixedThreadPool(5);//異步操作線程池,
 //異步判斷操作
 Stat s1 = client.checkExists().inBackground().forPath("/curator/test"); //無回調
 client.checkExists().inBackground(new BackgroundCallback() {  //有回調
     @Override
     public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
         CuratorEventType c = curatorEvent.getType();//事件類型,可在CuratorEventType看到具體種類
         int r = curatorEvent.getResultCode();//0,執行成功,其它,執行失敗
         Object o = curatorEvent.getContext();//事件上下文,一般是由調用方法傳入,供回調函數使用的參數
         String p = curatorEvent.getPath();//節點路徑
         List<String> li = curatorEvent.getChildren();//子節點列表
         byte[] datas = curatorEvent.getData();//節點數據
         //一些其它操作
     }
 },es).forPath("/curator/test");

  異步操作實際是在后台另起一個線程來完成該操作,若線程較多勢必會影響服務器性能,所以要用線程池來盡量降低對服務器的消耗。需要考慮線程池的關閉操作,較繁瑣,不作贅述。

  5、節點、子節點監聽

  節點監聽需要用repices包中的NodeCache來完成,代碼如下:

 //節點監聽
 final NodeCache cache = new NodeCache(client,"/curator/test");
 cache.start();
 cache.getListenable().addListener(new NodeCacheListener() {//監聽對象
     @Override
     public void nodeChanged() throws Exception {//重寫監聽方法
         byte[] ret = cache.getCurrentData().getData();
         System.out.println("當前節點內容是:"+ new String(ret));
     }
 });

  子節點的監聽需要用PathChildrenCache來完成,跟節點本身不一樣,代碼如下:

 //子節點監聽
 final PathChildrenCache pccache = new PathChildrenCache(client,"/curator",true);//true指當子節點變化時,獲取子節點內容
 pccache.start();
 pccache.getListenable().addListener(new PathChildrenCacheListener() {
     @Override
     public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {//重寫監聽方法
         switch (pathChildrenCacheEvent.getType()){//子節點的事件類型
             case CHILD_ADDED:
                 System.out.println(pathChildrenCacheEvent.getData());//通過pathChildrenCacheEvent,可以獲取到節點相關的數據
                 break;
             case CHILD_REMOVED:
                 System.out.println(pathChildrenCacheEvent.getData().getPath());
                 break;
             case CHILD_UPDATED:
                 break;
             default:
                 break;
         }
     }
 });

  6、權限控制部分,略。

  只是簡單羅列了curator的一些基本用法,距離具體應用還是有很大差距,稍后將動手寫一下選主程序來練手試試,希望順利。

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM