Curator提供兩種Watcher來監聽節點的變化。
后文中的ct:
CuratorFramework ct;
@Before
public void before(){
ct = CuratorFrameworkFactory.builder()
//ip:端口
.connectString("192.168.10.132:2181,192.168.10.133:2181,192.168.10.135:2181")
//超時時間
.sessionTimeoutMs(5000)
//連接斷開5秒后,會進行一次重連
.retryPolicy(new RetryOneTime(5000))
//命名空間,該命名空間作為父節點
.namespace("ct").build();
//打開連接
ct.start();
}
@After
public void after(){
ct.close();
}
1.Node Cache
只是監聽某一個特定的節點,監聽節點的新增和修改
//監視某個節點的數據變化
final NodeCache nodeCache = new NodeCache(ct, "/node1");
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
//節點變化時的回調方法
public void nodeChanged() throws Exception {
System.out.println(nodeCache.getCurrentData().getPath());
System.out.println(new String(nodeCache.getCurrentData().getData()));
}
});
Thread.sleep(100000);
nodeCache.close();
2.PathChildren Cache
監控一個ZNode的子節點,當一個子節點增加更新刪除時。Path Cache會改變它的狀態,會包含最新的子節點、數據和狀態。
//第三個參數:事件中是否可以獲取節點的數據
PathChildrenCache cache = new PathChildrenCache(ct, "/node1", true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
//節點變化時的回調方法
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println(event.getType());
System.out.println(event.getData().getPath());
System.out.println(new String(event.getData().getData()));
}
});
Thread.sleep(100000);
cache.close();
3.事務
一系列操作要么全部成功,要么全部失敗
//開啟事務
ct.inTransaction()
.create().forPath("/w","w".getBytes())
.and()
.delete().forPath("/zxc")
//提交事務
.and().commit();
