作者:fredalxin
地址:https://fredal.xin/zookeeper-watcher
我們可以使用 zookeeper 作為注冊中心來實現服務的注冊與發現,curator 框架提供了 curator-x-discovery 擴展實現了開箱即用的服務注冊發現,但更多時候我們還是選擇自己去實現,那這個時候我們需要額外關注 zookeeper 的 1 個特性,即 wathcer。
在微服務場景中,watcher 機制主要提供了服務通知功能,比如 Instance1 這個實例在 Service1 服務節點下注冊了一個 emphemeral 子節點后,它的某個服務消費者根據依賴配置在 Service1 節點上注冊了一個子節點 watcher,就如圖中的紅鑰匙。
子節點類型的 watcher 會觀測 Service1 的子節點,即 InstanceX 節點,但不會觀測孫子節點 config1。那么當 Instance1 節點掛掉之后,watcher 可以做到通知給注冊自身的那個服務消費者,通知完一次后 wacther 也就被銷毀了。
wacther 原理框架
zookeeper 的 watcher 主要由 client、server 以及 watchManager 之間配合完成,包括 watcher 注冊以及觸發 2 個階段。
在 client 端注冊表為 ZkWatchManager,其中包括了 dataWatches、existWatches 以及 childWatches。在 server 端的注冊表在 DataTree 類中,封裝了 2 類 WatchManager,即 dataWatches 和 existWatches。dataWatches 代表當前節點的數據監聽,childWathes 代表子節點監聽,與 client 比少的 existWatches 也很容易理解,因為節點是否存在需要客戶端去判斷。
注冊階段客戶端的 getData 和 exists 請求可以注冊 dataWatches,getChilden 可以注冊 childWatches。而觸發階段,setData 請求會觸發當前節點 dataWatches,create 請求會觸發當前節點 dataWatches 以及父節點的 childWatches,delete 請求則會觸發當前節點、父節點、子節點的 dataWatches,以及父節點的 childWatches。
watchManager包含兩個非常重要的數據結構:watchTable和watch2Paths。前者表示path-set
請求階段的傳輸數據(包括 watcher 信息)會封裝在 request 和 response 中,比如 getData 請求會封裝 getDataRequest/getDataResponse。而觸發階段的 watcher 通知則通過事件 event 進行通信,server 端會發送一個 watcherEvent,而 client 端則會將其轉換成 watchedEvent 再進行處理。
每個客戶端都會維護 2 個線程,SendThread 負責處理客戶端與服務端的請求通信,比如發送 getDataRequest,而 EventThread 則負責處理服務端的事件通知,即 watcher 的事件。
watcher 注冊源碼
我們來看看 watcher 注冊的部分源碼。首先是在客戶端,以 Zookeeper 中 getData 方法為例,會入隊一個 watch 為 true 的 packet。
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException {
...
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
...
}
可以看到這邊封裝了 GetDataRequest 以及 GetDataResponse,而 request 中設置了 watch 參數為 true,最后將其進行 submitRequest,submitRequest 干的事兒其實就是將這些放入事件隊列等待 sendThread 調度發送。
接着這個請求會被服務端所接收到,所有請求的服務端處理都在 FinalRequestProcessor#processRequest 方法中進行。
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
...
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
...
}
這邊會通過一些 case 來判斷請求類型,還是以 getData 為例,最終會調用到 DataTree 的 getData 方法,我們之前講到 DataTree 里包含了 2 種 watcher,那這邊除了獲取數據外,自然是注冊 dataWatchers 了。
public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
DataNode n = (DataNode)this.nodes.get(path);
if (n == null) {
throw new NoNodeException();
} else {
synchronized(n) {
n.copyStat(stat);
if (watcher != null) {
this.dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
}
addWatch 方法主要是將數據節點的路徑以及 ServerCnxn(遠程通信信息) 信息存儲到 WatchManager 的 watchTable 和 watch2Paths 中。至此服務端已經接受到了 watcher 並注冊到了 watchManager 中了。
我們將客戶端自己也會保存一個 watchManager,這里其實是在接收到 getData 響應后進行的,在 ClientCnxn$SendThread 類的 readResponse->finishPacket 方法中。
private void finishPacket(ClientCnxn.Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized(p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
this.eventThread.queuePacket(p);
}
}
可以看到這邊調用了 watchRegistration 的 register 方法,而它就是根據請求類型來裝入對應的 watchManager 中了(dataWatches、existWatches、childWatches)。
整個大致的時序圖可以參考下面:
watcher 觸發源碼
wathcer 觸發部分,我們還以 服務端 DataTree 類處理 setData 請求 為例。
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
可以看到在處理完數據后調用了 triggerWatch,它干的事兒是從之前的 watchManager 中獲得 watchers,然后一個個調用 process 方法。
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
獲取了需要本次觸發的監聽后,在 watchTable 和 watch2Paths 中還移除了自身,所以 watcher 是單次的。這里封裝好了 watchedEvent 后塞入到了 Watcher的process 方法中,process 方法其實就是發送通知,以 Watcher的一個實現類NioServerCnxn 為例就是調用了其 sendResponse 方法將通知事件發送到客戶端,發送前會將 watchedEvent 轉換成 watcherEvent 進行發送。
那么客戶端首先接收到請求的仍然是 ClientCnxn$sendThread 的 readResponse 方法,這里講 watcherEvent 轉換為 watchedEvent 后入列 eventThread 的事件隊列 等待后續進行處理。
...
WatchedEvent we = new WatchedEvent(event);
if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
}
ClientCnxn.this.eventThread.queueEvent(we);
...
我們直接看下 EventThread 的 run 方法吧,方法很簡單,就是不斷從 waitingEvents 事件隊列中取通知事件。然后調用 processEvent 方法處理事件。
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
...省略
}
這里就是簡單地取出本次事件需要通知的 watcher 集合,然后循環調用每個 watcher 的 process 方法了。那么在自己實現服務注冊發現的場景里,顯然 watcher 的 process 方法是我們自定義的啦。
整個 watcher 觸發的時序圖可以參考下面:
至此,zookeeper 的整個 watcher 交互邏輯就已經結束了。
近期熱文推薦:
1.600+ 道 Java面試題及答案整理(2021最新版)
2.終於靠開源項目弄到 IntelliJ IDEA 激活碼了,真香!
3.阿里 Mock 工具正式開源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式發布,全新顛覆性版本!
覺得不錯,別忘了隨手點贊+轉發哦!