zookeeper的原生api相對來說比較繁瑣,比如:對節點添加監聽事件,當監聽觸發后,我們需要再次手動添加監聽,否則監聽只生效一次;再比如,斷線重連也需要我們手動代碼來判斷處理等等。對於curator的介紹,從網上百度了一段:Curator是Netflix開源的一套zookeeper客戶端框架,用它來操作zookeeper更加方便,按Curator官方所比喻的,guava to JAVA,curator to zookeeper,Curator采用了fluent風格的代碼,非常簡潔。
---------------正-----文-----------------------------------------------------------------------------------------------------------------------------------------------
Curator包含6部分,均可提供單獨jar包,每個包簡單介紹如下:
client:zk-client的替代品,提供一些底層處理跟工具類;
framework:高級封裝,大大簡化了zk的客戶端編程,包含對zk的連接管理,重試機制等;
repices:提供了一些常用的操作,比如持續監聽,鎖,選舉等;
utilities:各種工具類;
errors:curator對異常跟錯誤的處理;
extendsion:擴展包;
基本api代碼解釋:
1、建立連接
建立連接需要指定zk地址以及重試策略等,先上代碼再解釋:
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);//重試5次,每次間隔時間指數增長(有具體增長公式) RetryPolicy retry1 = new RetryNTimes(5, 5000);//重試5次,每次間隔5秒 RetryPolicy retry2 = new RetryUntilElapsed(60000 * 2, 5000);//重試2分鍾,每次間隔5秒
//普通創建 CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 5000, retry); //fluent風格創建 CuratorFramework client1 = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .connectionTimeoutMs(5000) //連接超時時間 .sessionTimeoutMs(3000) //會話超時時間 .retryPolicy(retry) .build(); //建立連接 client.start();
如注釋,創建客戶端連接我們通常需要指定重試策略,curator提供了3種重試機制,分別如上;對於fluent風格,就是每個操作都返回了一個對象,我們可以一直通過[.方法名]的方式書寫代碼;client創建了之后,需要調用start方法才能真正去建立連接。會話超時時間是指當連接發生故障時,由於zk的心跳機制檢測,服務端認為會話超時的時間,會清除session;
2、創建、刪除、更新節點
連接建立之后,我們可以在服務器上進行創建節點的操作,代碼如下:
//創建節點
String path = client.create()
.creatingParentsIfNeeded() //對節點路徑上沒有的節點進行創建
.withMode(CreateMode.EPHEMERAL) //臨時節點
.forPath("/curator/test", "123".getBytes()); //節點路徑,節點的值
//刪除節點
client.delete()
.guaranteed() //刪除失敗,則客戶端持續刪除,直到節點刪除為止
.deletingChildrenIfNeeded() //刪除相關子節點
.withVersion(-1) //無視版本,直接刪除
.forPath("/curator/mytest");
//更新節點信息
Stat stat2 = new Stat();
byte[] theValue2 = client.getData().storingStatIn(stat).forPath("/curator/test");
client.setData()
.withVersion(stat2.getVersion()) //版本校驗,與當前版本不一致則更新失敗,-1則無視版本信息進行更新
.forPath("/curator/test", "456".getBytes());
//判斷節點是否存在(存在返回節點信息,不存在則返回null)
Stat s = client.checkExists().forPath("/curator/test");
持久節點是persistent,我們創建的節點可能有好幾層,如果服務器不存在父節點則會報錯並創建失敗,createingParentsIfNodeed()的作用是在父節點不存在的時候進行創建。刪除操作可能由於網絡抖動等情況導致刪除失敗,由於節點數據操作一般對業務影響較大,故多數都會帶持續刪除的動作來確保正確刪除;節點更新刪除等操作若考慮版本校驗,則采用代碼所示方式,在獲取節點數據的時候對節點狀態進行賦值,然后通過節點狀態可以獲得版本信息。判斷節點是否存在,一般通過節點信息判斷,若不存在,則節點信息為null。
3、獲取字節點列表
只有一行代碼,返回string類型的list
//獲取子節點列表
List<String> paths = client.getChildren().forPath("/curator");
4、異步操作
異步操作不會阻塞代碼執行,對於操作完成后的業務處理,需要設定回調函數來完成。以判斷節點是否存在為例:
ExecutorService es = Executors.newFixedThreadPool(5);//異步操作線程池,
//異步判斷操作
Stat s1 = client.checkExists().inBackground().forPath("/curator/test"); //無回調
client.checkExists().inBackground(new BackgroundCallback() { //有回調
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
CuratorEventType c = curatorEvent.getType();//事件類型,可在CuratorEventType看到具體種類
int r = curatorEvent.getResultCode();//0,執行成功,其它,執行失敗
Object o = curatorEvent.getContext();//事件上下文,一般是由調用方法傳入,供回調函數使用的參數
String p = curatorEvent.getPath();//節點路徑
List<String> li = curatorEvent.getChildren();//子節點列表
byte[] datas = curatorEvent.getData();//節點數據
//一些其它操作
}
},es).forPath("/curator/test");
異步操作實際是在后台另起一個線程來完成該操作,若線程較多勢必會影響服務器性能,所以要用線程池來盡量降低對服務器的消耗。需要考慮線程池的關閉操作,較繁瑣,不作贅述。
5、節點、子節點監聽
節點監聽需要用repices包中的NodeCache來完成,代碼如下:
//節點監聽
final NodeCache cache = new NodeCache(client,"/curator/test");
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {//監聽對象
@Override
public void nodeChanged() throws Exception {//重寫監聽方法
byte[] ret = cache.getCurrentData().getData();
System.out.println("當前節點內容是:"+ new String(ret));
}
});
子節點的監聽需要用PathChildrenCache來完成,跟節點本身不一樣,代碼如下:
//子節點監聽
final PathChildrenCache pccache = new PathChildrenCache(client,"/curator",true);//true指當子節點變化時,獲取子節點內容
pccache.start();
pccache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {//重寫監聽方法
switch (pathChildrenCacheEvent.getType()){//子節點的事件類型
case CHILD_ADDED:
System.out.println(pathChildrenCacheEvent.getData());//通過pathChildrenCacheEvent,可以獲取到節點相關的數據
break;
case CHILD_REMOVED:
System.out.println(pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_UPDATED:
break;
default:
break;
}
}
});
6、權限控制部分,略。
只是簡單羅列了curator的一些基本用法,距離具體應用還是有很大差距,稍后將動手寫一下選主程序來練手試試,希望順利。
