參考:《從Paxos到Zookeeper:分布式一致性原理與實踐》
使用的 ZooKeeper 依賴是 CDH 5.16.2 附帶的 3.4.5 版本(3.4.5-cdh5.16.2)
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5-cdh5.16.2</version>
</dependency>
代碼:exists 函數用來檢查節點是否存在,同時會注冊一個 watch
package com.bigdata.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * Small ZooKeeper demo: connects to a server, then creates, updates and deletes an
 * ephemeral node while re-registering a watch via {@code exists} before each change.
 *
 * <p>The class itself is the {@link Watcher} passed to the client, so every event
 * delivered by the client is printed by {@link #process(WatchedEvent)}.
 */
public class ZkExample implements Watcher {

    /** Released by {@link #process(WatchedEvent)} once the SyncConnected session event arrives. */
    public static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /** Client handle shared between {@code main} and the watcher callback. */
    private static ZooKeeper zk;

    /**
     * Runs the demo against the server at {@code master:2181}.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        zk = new ZooKeeper("master:2181", 5000, new ZkExample());
        System.out.println(zk.getState());

        boolean interrupted = false;
        try {
            // Block until the watcher has seen the connection event.
            connectedSemaphore.await();

            String path = "/app6";

            // Register a watch before the node exists. exists() returns null for a
            // missing node, so its result must not be used as a Stat here.
            zk.exists(path, true);
            zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            // version = -1 matches any version; keep the returned Stat so the next
            // update can use the node's real data version.
            Stat stat = zk.setData(path, "555".getBytes(), -1);
            // Watches are one-shot: register again before the next change.
            zk.exists(path, true);
            System.out.println(stat.getVersion());

            // Conditional update using the version returned by the previous setData.
            Stat stat2 = zk.setData(path, "55555".getBytes(), stat.getVersion());
            zk.exists(path, true);
            System.out.println(stat2.getVersion());

            // Conditional delete using the latest data version.
            zk.delete(path, stat2.getVersion());

            // Keep the process alive for 10 seconds so pending watch events get printed.
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            interrupted = true;
            System.out.println("Interrupted while running the ZooKeeper demo: " + e);
        } finally {
            // Always release the session; this also removes the ephemeral node if it is still there.
            zk.close();
            if (interrupted) {
                // Restore the flag only after close(), which itself may block.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Prints every event. On the initial connection event it releases
     * {@link #connectedSemaphore}; on child or data changes it lists the node's children
     * and registers a child watch on that node.
     *
     * @param watchedEvent the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println(watchedEvent);
        if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
            return;
        }

        Event.EventType type = watchedEvent.getType();
        if (Event.EventType.None == type && null == watchedEvent.getPath()) {
            // Session-level event with no path: the connection is established.
            connectedSemaphore.countDown();
            return;
        }

        // NodeDeleted is deliberately not queried: the node is gone, so getChildren
        // on its path would only fail with NoNode. The event was already printed above.
        if (type == Event.EventType.NodeChildrenChanged || type == Event.EventType.NodeDataChanged) {
            try {
                System.out.println(zk.getChildren(watchedEvent.getPath(), true));
            } catch (Exception e) {
                // Report and continue; a failure here must not kill the event thread.
                System.out.println(e);
            }
        }
    }
}
輸出

