Curator提供兩種Watcher來監聽節點的變化。
后文中的ct:
CuratorFramework ct; @Before public void before(){ ct = CuratorFrameworkFactory.builder() //ip:端口 .connectString("192.168.10.132:2181,192.168.10.133:2181,192.168.10.135:2181") //超時時間 .sessionTimeoutMs(5000) //連接斷開5秒后,會進行一次重連 .retryPolicy(new RetryOneTime(5000)) //命名空間,該命名空間作為父節點 .namespace("ct").build(); //打開連接 ct.start(); } @After public void after(){ ct.close(); }
1.Node Cache
只是監聽某一個特定的節點,監聽節點的新增和修改
//監視某個節點的數據變化 final NodeCache nodeCache = new NodeCache(ct, "/node1"); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { //節點變化時的回調方法 public void nodeChanged() throws Exception { System.out.println(nodeCache.getCurrentData().getPath()); System.out.println(new String(nodeCache.getCurrentData().getData())); } }); Thread.sleep(100000); nodeCache.close();
2.PathChildren Cache
監控一個ZNode的子節點,當一個子節點增加更新刪除時。Path Cache會改變它的狀態,會包含最新的子節點、數據和狀態。
//第三個參數:事件中是否可以獲取節點的數據 PathChildrenCache cache = new PathChildrenCache(ct, "/node1", true); cache.start(); cache.getListenable().addListener(new PathChildrenCacheListener() { //節點變化時的回調方法 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println(event.getType()); System.out.println(event.getData().getPath()); System.out.println(new String(event.getData().getData())); } }); Thread.sleep(100000); cache.close();
3.事務
一系列操作要么全部成功,要么全部失敗
//開啟事務 ct.inTransaction() .create().forPath("/w","w".getBytes()) .and() .delete().forPath("/zxc") //提交事務 .and().commit();