一、監控數據變化,並且只監控一次
1、java代碼
public class ZkDemo {
public static Logger logger = Logger.getLogger(ZkDemo.class);
private static String connectString = "192.168.229.129:2181";
private static int SESSION_TIME_OUT = 60 * 1000;
private ZooKeeper zk = null;
private String newValue = null;
public String getOldValue() {
return oldValue;
}
public void setOldValue(String oldValue) {
this.oldValue = oldValue;
}
private String oldValue = null;
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public String getNewValue() {
return newValue;
}
public void setNewValue(String newValue) {
this.newValue = newValue;
}
/**
* 啟動並連接 Zookeeper
*/
public ZooKeeper start() throws IOException {
return new ZooKeeper(connectString, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
/**
* 創建節點
*/
public void createNode(String path, String data) throws IOException, KeeperException, InterruptedException {
// 返回當前節點的名稱
String currentNode = zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
logger.info("create zookeeper node success,currentNode******" + currentNode);
}
/**
* 獲取當前節點的數據,並且設置觀察者
*/
public String getNodeData(String path) throws IOException, KeeperException, InterruptedException {
byte[] oldData = zk.getData(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
// 該節點的值是否發生變化的標識,true:發生變化,false:未發生變化
Boolean flag = triggerWathcher(path);
logger.info("*******flag:" + flag);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
// 獲取當前節點的值,如果當前節點的值發生變化,將新值覆蓋原先的值
if (newValue != null && !oldValue.equals(newValue)) {
oldValue = newValue;
}
oldValue = new String(oldData, "UTF-8");
return oldValue;
}
/**
* 觸發觀察者
*/
public Boolean triggerWathcher(String path) throws IOException, KeeperException, InterruptedException {
byte[] newData = zk.getData(path, false, new Stat());
newValue = new String(newData, "UTF-8");
if (oldValue.equals(newValue)) {
logger.info("******this node value is no change");
return false;
} else {
logger.info("******oldValue:" + oldValue + "******" + "newValue:" + newValue);
return true;
}
}
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
ZkDemo zkDemo = new ZkDemo();
zkDemo.setZk(zkDemo.start());
// 如果不存在 /xiaomao 這個節點,那么就創建該節點
if (zkDemo.getZk().exists("/xiaomao", false) == null) {
zkDemo.createNode("/xiaomao", "123");
}
zkDemo.getNodeData("/xiaomao");
Thread.sleep(Long.MAX_VALUE);
}
}
2、執行完 java 代碼 zookeeper 客戶端的變化情況
// 查看根節點下所有的子節點,可以看到 /xiaomao 這個子節點已經創建出來了
[zk: localhost:2181(CONNECTED) 75] ls /
[zookeeper, xiaomao]
// 獲取節點 /xiaomao 的值,可以看到它的值就是我們設置的 123
[zk: localhost:2181(CONNECTED) 76] get /xiaomao
123
cZxid = 0x143
ctime = Mon Oct 12 08:00:54 CST 2020
mZxid = 0x143
mtime = Mon Oct 12 08:00:54 CST 2020
pZxid = 0x143
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
3、手動在客戶端變更節點 /xiaomao 的值
// 手動將節點 /xiaomao 的值修改為456
[zk: localhost:2181(CONNECTED) 77] set /xiaomao 456
cZxid = 0x143
ctime = Mon Oct 12 08:00:54 CST 2020
mZxid = 0x144
mtime = Mon Oct 12 08:01:18 CST 2020
pZxid = 0x143
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
// 再一次手動將節點 /xiaomao 的值修改為789
[zk: localhost:2181(CONNECTED) 78] set /xiaomao 789
cZxid = 0x143
ctime = Mon Oct 12 08:00:54 CST 2020
mZxid = 0x145
mtime = Mon Oct 12 08:01:23 CST 2020
pZxid = 0x143
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
// 獲取節點 /xiaomao 現在的值,可以看到值為789
[zk: localhost:2181(CONNECTED) 79] get /xiaomao
789
cZxid = 0x143
ctime = Mon Oct 12 08:00:54 CST 2020
mZxid = 0x145
mtime = Mon Oct 12 08:01:23 CST 2020
pZxid = 0x143
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
4、控制台變化
// 當前節點的名稱
2020-10-12 00:00:56,199 INFO [ZkDemo] - create zookeeper node success,currentNode******/xiaomao
2020-10-12 00:00:56,203 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40014, packet::
2020-10-12 00:01:09,538 DEBUG [org.apache.zookeeper.ClientCnxn] - Got ping response for sessionid: 0x100000365a40014 after 2ms
2020-10-12 00:01:20,361 DEBUG [org.apache.zookeeper.ClientCnxn] - Got notification sessionid:0x100000365a40014
2020-10-12 00:01:20,362 DEBUG [org.apache.zookeeper.ClientCnxn] - Got WatchedEvent state:SyncConnected
2020-10-12 00:01:20,363 DEBUG [org.apache.zookeeper.ClientCnxn] - Got ping response for sessionid: 0x100000365a40014 after 2ms
2020-10-12 00:01:20,364 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40014, packet::
// 雖然我們在 Zookeeper 的控制台設置了兩次值,一次是設置為 456 ,另外一次設置為 789 ,但是由於我們是一次性的監控,所以控制台只有一次變化的值
2020-10-12 00:01:20,364 INFO [ZkDemo] - ******oldValue:123******newValue:456
2020-10-12 00:01:20,364 INFO [ZkDemo] - *******flag:true
二、監控數據變化,動態監控(多次監控)
1、java代碼
public class ZkDemo {
public static Logger logger = Logger.getLogger(ZkDemo.class);
private static String connectString = "192.168.229.129:2181";
private static int SESSION_TIME_OUT = 60 * 1000;
private ZooKeeper zk = null;
private String newValue = null;
public String getOldValue() {
return oldValue;
}
public void setOldValue(String oldValue) {
this.oldValue = oldValue;
}
private String oldValue = null;
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public String getNewValue() {
return newValue;
}
public void setNewValue(String newValue) {
this.newValue = newValue;
}
/**
* 啟動並連接 Zookeeper
*/
public ZooKeeper start() throws IOException {
return new ZooKeeper(connectString, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
/**
* 創建節點
*/
public void createNode(String path, String data) throws IOException, KeeperException, InterruptedException {
// 返回當前節點的名稱
String currentNode = zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
logger.info("create zookeeper node success,currentNode******" + currentNode);
}
/**
* 獲取當前節點的數據,並且設置觀察者
*/
public String getNodeData(String path) throws IOException, KeeperException, InterruptedException {
byte[] oldData = zk.getData(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
// 該節點的值是否發生變化的標識,true:發生變化,false:未發生變化
Boolean flag = triggerWathcher(path);
logger.info("*******flag:" + flag);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
oldValue = new String(oldData, "UTF-8");
return oldValue;
}
/**
* 觸發觀察者
*/
public Boolean triggerWathcher(String path) throws IOException, KeeperException, InterruptedException {
byte[] newData = zk.getData(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
// 回調自身,類似於遞歸,可以實現實時監控
triggerWathcher(path);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
newValue = new String(newData, "UTF-8");
if (oldValue.equals(newValue)) {
logger.info("******this node value is no change");
return false;
} else {
logger.info("******oldValue:" + oldValue + "******" + "newValue:" + newValue);
// 如果值發生了變化,將新值覆蓋原來的值
oldValue = newValue;
return true;
}
}
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
ZkDemo zkDemo = new ZkDemo();
zkDemo.setZk(zkDemo.start());
// 如果不存在 /xiaomao 這個節點,那么就創建該節點
if (zkDemo.getZk().exists("/xiaomaomao", false) == null) {
zkDemo.createNode("/xiaomaomao", "123");
}
zkDemo.getNodeData("/xiaomaomao");
Thread.sleep(Long.MAX_VALUE);
}
}
2、Zookeeper 客戶端操作
// 查看根節點下所有的子節點,然後獲取節點 /xiaomaomao 的值
[zk: localhost:2181(CONNECTED) 82] ls /
[xiaomaomao, zookeeper]
[zk: localhost:2181(CONNECTED) 83] get /xiaomaomao
123
cZxid = 0x149
ctime = Mon Oct 12 08:10:14 CST 2020
mZxid = 0x149
mtime = Mon Oct 12 08:10:14 CST 2020
pZxid = 0x149
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
// 修改節點 /xiaomaomao 的值(原先是 123 ,設置一個相同的值,看一下效果)
[zk: localhost:2181(CONNECTED) 84] set /xiaomaomao 123
cZxid = 0x149
ctime = Mon Oct 12 08:10:14 CST 2020
mZxid = 0x14a
mtime = Mon Oct 12 08:10:32 CST 2020
pZxid = 0x149
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
// 修改節點 /xiaomaomao 的值為 456
[zk: localhost:2181(CONNECTED) 85] set /xiaomaomao 456
cZxid = 0x149
ctime = Mon Oct 12 08:10:14 CST 2020
mZxid = 0x14b
mtime = Mon Oct 12 08:10:38 CST 2020
pZxid = 0x149
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
// 修改節點 /xiaomaomao 的值為 789
[zk: localhost:2181(CONNECTED) 86] set /xiaomaomao 789
cZxid = 0x149
ctime = Mon Oct 12 08:10:14 CST 2020
mZxid = 0x14c
mtime = Mon Oct 12 08:10:42 CST 2020
pZxid = 0x149
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
3、控制台變化
// 設置一個相同的值時,控制台的變化情況
2020-10-12 00:10:33,553 INFO [ZkDemo] - ******this node value is no change
2020-10-12 00:10:33,553 INFO [ZkDemo] - *******flag:false
2020-10-12 00:10:39,593 DEBUG [org.apache.zookeeper.ClientCnxn] - Got notification sessionid:0x100000365a40015
2020-10-12 00:10:39,594 DEBUG [org.apache.zookeeper.ClientCnxn] - Got WatchedEvent state:SyncConnected
2020-10-12 00:10:39,595 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40015,
// 將節點的值修改為 456
2020-10-12 00:10:39,595 INFO [ZkDemo] - ******oldValue:123******newValue:456
2020-10-12 00:10:43,993 DEBUG [org.apache.zookeeper.ClientCnxn] - Got notification sessionid:0x100000365a40015
2020-10-12 00:10:43,993 DEBUG [org.apache.zookeeper.ClientCnxn] - Got WatchedEvent state:SyncConnected type:
2020-10-12 00:10:43,995 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40015,
// 將節點的值修改為 789
2020-10-12 00:10:43,995 INFO [ZkDemo] - ******oldValue:456******newValue:789
