zookeeper的原生api相對來說比較繁瑣,比如:對節點添加監聽事件,當監聽觸發后,我們需要再次手動添加監聽,否則監聽只生效一次;再比如,斷線重連也需要我們手動代碼來判斷處理等等。對於curator的介紹,從網上百度了一段:Curator是Netflix開源的一套zookeeper客戶端框架,用它來操作zookeeper更加方便,按Curator官方所比喻的,guava to JAVA,curator to zookeeper,Curator采用了fluent風格的代碼,非常簡潔。
---------------正-----文-----------------------------------------------------------------------------------------------------------------------------------------------
Curator包含6部分,均可提供單獨jar包,每個包簡單介紹如下:
client:zk-client的替代品,提供一些底層處理跟工具類;
framework:高級封裝,大大簡化了zk的客戶端編程,包含對zk的連接管理,重試機制等;
repices:提供了一些常用的操作,比如持續監聽,鎖,選舉等;
utilities:各種工具類;
errors:curator對異常跟錯誤的處理;
extendsion:擴展包;
基本api代碼解釋:
1、建立連接
建立連接需要指定zk地址以及重試策略等,先上代碼再解釋:
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);//重試5次,每次間隔時間指數增長(有具體增長公式) RetryPolicy retry1 = new RetryNTimes(5, 5000);//重試5次,每次間隔5秒 RetryPolicy retry2 = new RetryUntilElapsed(60000 * 2, 5000);//重試2分鍾,每次間隔5秒
//普通創建 CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 5000, retry); //fluent風格創建 CuratorFramework client1 = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .connectionTimeoutMs(5000) //連接超時時間 .sessionTimeoutMs(3000) //會話超時時間 .retryPolicy(retry) .build(); //建立連接 client.start();
如注釋,創建客戶端連接我們通常需要指定重試策略,curator提供了3種重試機制,分別如上;對於fluent風格,就是每個操作都返回了一個對象,我們可以一直通過[.方法名]的方式書寫代碼;client創建了之后,需要調用start方法才能真正去建立連接。會話超時時間是指當連接發生故障時,由於zk的心跳機制檢測,服務端認為會話超時的時間,會清除session;
2、創建、刪除、更新節點
連接建立之后,我們可以在服務器上進行創建節點的操作,代碼如下:
//創建節點 String path = client.create() .creatingParentsIfNeeded() //對節點路徑上沒有的節點進行創建 .withMode(CreateMode.EPHEMERAL) //臨時節點 .forPath("/curator/test", "123".getBytes()); //節點路徑,節點的值 //刪除節點 client.delete() .guaranteed() //刪除失敗,則客戶端持續刪除,直到節點刪除為止 .deletingChildrenIfNeeded() //刪除相關子節點 .withVersion(-1) //無視版本,直接刪除 .forPath("/curator/mytest"); //更新節點信息 Stat stat2 = new Stat(); byte[] theValue2 = client.getData().storingStatIn(stat).forPath("/curator/test"); client.setData() .withVersion(stat2.getVersion()) //版本校驗,與當前版本不一致則更新失敗,-1則無視版本信息進行更新 .forPath("/curator/test", "456".getBytes()); //判斷節點是否存在(存在返回節點信息,不存在則返回null) Stat s = client.checkExists().forPath("/curator/test");
持久節點是persistent,我們創建的節點可能有好幾層,如果服務器不存在父節點則會報錯並創建失敗,createingParentsIfNodeed()的作用是在父節點不存在的時候進行創建。刪除操作可能由於網絡抖動等情況導致刪除失敗,由於節點數據操作一般對業務影響較大,故多數都會帶持續刪除的動作來確保正確刪除;節點更新刪除等操作若考慮版本校驗,則采用代碼所示方式,在獲取節點數據的時候對節點狀態進行賦值,然后通過節點狀態可以獲得版本信息。判斷節點是否存在,一般通過節點信息判斷,若不存在,則節點信息為null。
3、獲取字節點列表
只有一行代碼,返回string類型的list
//獲取子節點列表 List<String> paths = client.getChildren().forPath("/curator");
4、異步操作
異步操作不會阻塞代碼執行,對於操作完成后的業務處理,需要設定回調函數來完成。以判斷節點是否存在為例:
ExecutorService es = Executors.newFixedThreadPool(5);//異步操作線程池, //異步判斷操作 Stat s1 = client.checkExists().inBackground().forPath("/curator/test"); //無回調 client.checkExists().inBackground(new BackgroundCallback() { //有回調 @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { CuratorEventType c = curatorEvent.getType();//事件類型,可在CuratorEventType看到具體種類 int r = curatorEvent.getResultCode();//0,執行成功,其它,執行失敗 Object o = curatorEvent.getContext();//事件上下文,一般是由調用方法傳入,供回調函數使用的參數 String p = curatorEvent.getPath();//節點路徑 List<String> li = curatorEvent.getChildren();//子節點列表 byte[] datas = curatorEvent.getData();//節點數據 //一些其它操作 } },es).forPath("/curator/test");
異步操作實際是在后台另起一個線程來完成該操作,若線程較多勢必會影響服務器性能,所以要用線程池來盡量降低對服務器的消耗。需要考慮線程池的關閉操作,較繁瑣,不作贅述。
5、節點、子節點監聽
節點監聽需要用repices包中的NodeCache來完成,代碼如下:
//節點監聽 final NodeCache cache = new NodeCache(client,"/curator/test"); cache.start(); cache.getListenable().addListener(new NodeCacheListener() {//監聽對象 @Override public void nodeChanged() throws Exception {//重寫監聽方法 byte[] ret = cache.getCurrentData().getData(); System.out.println("當前節點內容是:"+ new String(ret)); } });
子節點的監聽需要用PathChildrenCache來完成,跟節點本身不一樣,代碼如下:
//子節點監聽 final PathChildrenCache pccache = new PathChildrenCache(client,"/curator",true);//true指當子節點變化時,獲取子節點內容 pccache.start(); pccache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {//重寫監聽方法 switch (pathChildrenCacheEvent.getType()){//子節點的事件類型 case CHILD_ADDED: System.out.println(pathChildrenCacheEvent.getData());//通過pathChildrenCacheEvent,可以獲取到節點相關的數據 break; case CHILD_REMOVED: System.out.println(pathChildrenCacheEvent.getData().getPath()); break; case CHILD_UPDATED: break; default: break; } } });
6、權限控制部分,略。
只是簡單羅列了curator的一些基本用法,距離具體應用還是有很大差距,稍后將動手寫一下選主程序來練手試試,希望順利。