2020-02-08 補充本篇博文所描述的watcher回調的流程圖
watcher存在的必要性
舉個特容易懂的例子: 假如我的項目是基於dubbo+zookeeper搭建的分布式項目, 我有三個功能相同的服務提供者,用zookeeper當成注冊中心,我的三個項目得注冊進zookeeper才能對外暴露服務,但是問題來了,寫的java代碼怎么才能注冊進zookeeper呢?當然加入依賴,寫好配置文件再啟動就成了,這時,這三個服務體提供者就是zookeeper的客戶端了,zookeeper的客戶端不止一個,我選擇了哪個依賴,就是哪個客戶端,光有服務提供者不成啊,對外提供服務,我得需要服務消費者啊,於是用同樣的方式,把消費者也注冊進zookeeper,zookeeper中就存在了4個node,也就是4個客戶端,服務消費者訂閱zookeeper,向它拉取服務提供者的address,然后把地址緩存在本地, 進而可以遠程調用服務消費者,那么問題又來了,萬一哪一台服務提供者掛了,怎么辦呢?zookeeper是不是得通知消費者呢? 萬一哪一天服務提供者的address變了,是不是也得通知消費者? 這就是watcher存在的意義,它解決了這件事
watcher的類型
keeperState | EventType | 觸發條件 | 說明 |
---|---|---|---|
SyncConnected | None(-1) | 客戶端與服務端建立連接 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeCreate(1) | watcher監聽的數據節點被創建 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeDeleted(2) | Watcher監聽的數據節點被刪除 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeDataChanged(3) | watcher監聽的node數據內容發生改變 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeChildrenChange(4) | 被監聽的數據節點的節點列表發生變更 | 客戶端與服務端處於連接狀態 |
Disconnect | None(-1) | 客戶端與服務端斷開連接 | 客戶端與服務端斷開連接 |
Expired (-112) | None(-1) | 會話超時 | session過期,收到異常SessionExpiredException |
AuthFailed | None(-1) | 1.使用了錯誤的scheme 2,SALS權限驗證失敗了 | 收到異常AuthFailedException |
實驗場景:
假設我們已經成功啟動了zookeeper的服務端和客戶端,並且預先添加了watcher,然后使用控制台動態的修改下node的data,我們會發現watcher回調的現象
添加的鈎子函數代碼如下:
public class ZookepperClientTest {
public static void main(String[] args) throws Exception {
ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.err.println("連接,觸發");
}
});
Stat stat = new Stat();
// todo 下面添加的事件監聽器可是實現事件的消費訂閱
String content = new String(client.getData("/node1", new Watcher() {
@Override
public void process(WatchedEvent event) {
// todo 任何連接上這個節點的客戶端修改了這個節點的 data數據,都會引起process函數的回調
// todo 特點1: watch只能使用1次
if (event.getType().equals(Event.EventType.NodeDataChanged)){
System.err.println("當前節點數據發生了改變");
}
}
}, stat));
看如上的代碼, 添加了一個自己的watcher
也就是client.getData("/node1", new Watcher() {}
這是個回調的鈎子函數,執行時不會運行,當滿足的某個條件時才會執行, 比如: node1被刪除了, node1的data被修改了
getData做了哪些事情?
源碼如下: getdata,顧名思義,返回服務端的node的data+stat, 當然是當服務端的node發生了變化后調用的
主要主流如下幾個工作
- 創建
WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
- 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象
- 創建一個request,並且初始化這個
request.head=getData=4
- 調用ClientCnxn.submitRequest(...) , 將現存的這些信息進一步封裝
- request.setWatch(watcher != null);說明他並沒有將watcher封裝進去,而是僅僅做了個有沒有watcher的標記
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
// todo 校驗path
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// todo DataWatchRegistration 繼承了 WatchRegistration
// todo DataWatchRegistration 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
// todo 創建一個請求頭
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
// todo 創建了一個GetDataRequest
GetDataRequest request = new GetDataRequest();
// todo 給這個請求初始化,path 是傳遞進來的path,但是 watcher不是!!! 如果我們給定了watcher , 這里面的條件就是 true
request.setPath(serverPath);
request.setWatch(watcher != null); // todo 可看看看服務端接收到請求是怎么辦的
GetDataResponse response = new GetDataResponse();
// todo 同樣由 clientCnxn 上下文進行提交請求, 這個操作應該同樣是阻塞的
// todo EventThread 和 SendThread 同時使用一份 clientCnxn的 submitRequest()
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
的源碼我寫在下面, 這里來到這個方法中,一眼能看到,它依然是阻塞的式的,並且requet被進一步封裝進packet
更重要的是 queuePacket()
方法的最后一個參數,存在我們剛剛創建的path+watcher的封裝類
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// todo 來到這個 queuePacket() 方法在下面, 這個方法就是將 用戶輸入-> string ->>> request ->>> packet 的過程
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
// todo 使用同步代碼塊,在下面的進行 同步阻塞等待, 直到有了Response響應才會跳出這個循環, 這個finished狀態就是在客戶端接受到服務端的
// todo 的響應后, 將服務端的響應解析出來,然后放置到 pendingqueue里時,設置上去的
synchronized (packet) {
while (!packet.finished) {
// todo 這個等待是需要喚醒的
packet.wait();
}
}
// todo 直到上面的代碼塊被喚醒,才會這個方法才會返回
return r;
}
同樣,在queuePacket()方法中將packet提交到outgoingQueue中,最終被seadThread消費發送到服務端
服務端如何處理watchRegistration不為空的packet
后續我准備用一整篇博客詳解單機模式下服務端處理請求的流程,所以這篇博客只說結論
在服務端,用戶的請求最終會按順序流向三個Processor
,分別是
- PrepRequestProcessor
- 負責進行一些狀態的修改
- SyncRequestProcessor
- 將事務日志同步到磁盤
- FinalRequestProcessor
- 相應用戶的請求
我們直接去看FinalRequestProcessor
的 public void processRequest(Request request) {}方法
,看他針對getData()
方式的請求做出了哪些動作.下面來了個小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
跟進watcher的有無給服務端添加不同的Watcher
真的得划重點了,當我發現這一點時,我的心情是超級激動的,就像發現了新大陸一樣
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// todo 這里的操作 getDataRequest.getWatch() ? cnxn : null 對應可客戶端的 跟進watcher有沒有而決定往服務端傳遞 true 還是false 相關
// todo 跟進去 getData()
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
//todo cnxn的Processor()被回調, 往客戶端發送數據 , 什么時候觸發呢? 就是上面的 處理事務時的回調 第127行
// todo 構建了一個 rsp ,在本類的最后面將rsp 響應給client
rsp = new GetDataResponse(b, stat);
break;
}
繼續跟進這個getData()
在服務端維護了一份path+watcher的map
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
// todo 將path 和 watcher 綁定在一起
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
客戶端打開命令行,修改服務端node的狀態
書接上回,當客戶單的代碼去創建ClientCnxn
時,有下面的邏輯 , 它開啟了兩條守護線程, sendThread負責向服務端發送心跳,已經和服務端進行用戶相關的IO交流, EventThread就負責和txn事務相關的處理邏輯,級別上升到針對node
// todo start就是啟動了在構造方法中創建的線程
public void start() {
sendThread.start();
eventThread.start();
}
到目前為止,客戶端就有如下三條線程了
- 負責處理用戶在控制台輸入命令的主線程
- 守護線程1: seadThread
- 守護線程2: eventThread
跟進主線程的處理用戶輸入部分的邏輯代碼如下:
下面的代碼的主要邏輯就是處理用戶輸入的命令,當通過if-else選擇分支判斷用戶到底輸入的啥命令
按照我們的假定的場景,用戶輸入的命令是這樣的 set /path newValue
所以,毫無疑問,經過解析后代碼會去執行下面的stat = zk.setData(path, args[2].getBytes(),
部分
// todo zookeeper客戶端, 處理用戶輸入命令的具體邏輯
// todo 用大白話講,下面其實就是把 從控制台獲取的用戶的輸入信息轉換成指定的字符, 然后發送到服務端
// todo MyCommandOptions 是處理命令行選項和shell腳本的工具類
protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
// todo 在這個方法中可以看到很多的命令行所支持的命令
Stat stat = new Stat();
// todo 獲取命令行輸入中 0 1 2 3 ... 位置的內容, 比如 0 位置是命令 1 2 3 位置可能就是不同的參數
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
return false;
}
if (!commandMap.containsKey(cmd)) {
usage();
return false;
}
boolean watch = args.length > 2;
String path = null;
List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
LOG.debug("Processing " + cmd);
if (cmd.equals("quit")) {
System.out.println("Quitting...");
zk.close();
System.exit(0);
} else if (cmd.equals("set") && args.length >= 3) {
path = args[1];
stat = zk.setData(path, args[2].getBytes(),
args.length > 3 ? Integer.parseInt(args[3]) : -1);
printStat(stat);
繼續跟進stat = zk.setData(path, args[2].getBytes(),
下面的邏輯也很簡單,就是將用戶的輸入封裝進來request中,通過ClientCnxn
類的submit方法提交到一個隊列中,等待着sendThread去消費
這次有目的的看一下submitRequest的最后一個參數為null, 這個參數是WatchRegistration的位置,一開始置為null
public Stat setData(final String path, byte data[], int version)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setData);
SetDataRequest request = new SetDataRequest();
request.setPath(serverPath);
request.setData(data);
request.setVersion(version);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat();
}
跟進這個submitRequest()方法, 源碼如下,不處所料的是,它同樣被阻塞住了,直到服務端給了它響應
當前代碼的主要邏輯就是將request封裝進packet,然后將packet添加到ClintCnxn維護的outgoingQueue隊列中等待sendThread的消費
這次來到這個方法是因為我們在控制台輸入的set 命令而觸發的,比較重要的是本次packet攜帶的WatchRegistration==null, 毫無疑問,這次服務端在FinalRequestProcessor中再處理時取出的watcher==null, 也就不會將path+watcher保存進maptable中
重要:發送事務消息
在FinalRequestProcessor
的public void processRequest(Request request) {}
方法中,有如下代碼
//todo 請求頭不為空
if (request.hdr != null) {
// 獲取請求頭
TxnHeader hdr = request.hdr;
// 獲取事務
Record txn = request.txn;
// todo 跟進這個方法-----<--!!!!!!-----處理事務的邏輯,在這里面有向客戶端發送事件的邏輯, 回調客戶端的watcher----!!!!!!-->
rc = zks.processTxn(hdr, txn);
}
繼續跟進去
// todo 處理事物日志
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
// todo 繼續跟進去!!!!!!!!!
// todo 跟進 processTxn(hdr, txn)
rc = getZKDatabase().processTxn(hdr, txn);
跟進ZkDatabase.java
中的processTxn(hdr, txn)方法
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
// todo 跟進 processTxn
return dataTree.processTxn(hdr, txn);
}
跟進到DataTree.java
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) { // todo 根據客客戶端發送過來的type進行switch,
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
// todo 跟進這個創建節點的方法
createNode(
createTxn.getPath(),
根據請求頭的值,進而判斷出走到那個switch的分支,當前我們在控制台觸發,進入到setData分支如下:跟進這個方法中可以看到它主要做了如下幾件事
- 使用傳遞進來的新值替代舊data
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
**觸發指定的事件watch,什么事件呢? NodeDataChange, 觸發了哪個watcher呢? 跟進去查看 **
//todo setData
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
// todo 修改內存的數據
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix;
if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
// todo 終於 看到了 服務端 關於觸發NodeDataChanged的事件
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
補充Watch & EventType 類圖
跟進去 dataWatches.triggerWatch(path, EventType.NodeDataChanged);
,源碼如下, 主要的邏輯就是取出存放在服務端的watch,然后逐個回調他們的processor函數,問題來了,到底是哪些watcher呢? 其實就是我們在客戶端啟動時添加getData()時存進去的wather,也就是ServerCnxn
// todo 跟進去服務端的 觸發事件, 但是吧, 很納悶. 就是沒有往客戶端發送數據的邏輯
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// todo 繼續跟進去, 看它如何回調的
w.process(e);
}
return watchers;
}
懷着激動的心情去看看ServerCnxn的process()方法做了什么事?
來到ServerCnxn的實現類NIOServerCnxn, 確實很激動,看到了服務端在往客戶端發送事務型消息, 並且new ReplyHeader(-1, -1L, 0)第一個位置上的參數是-1, 這一點很重要,因為客戶端在接受到這個xid=-1的標記后,就會將這條響應交給EventThread處理
@Override
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
// todo 往服務端發送了 e event類型消息
sendResponse(h, e, "notification");
}
處理回調回調watch使用的響應
進入到SendThread
的讀就緒源碼部分,如下: 它根據header.xid=-1就知道了這是事務類型的響應
// todo 服務端拋出來的事件, 客戶端將把他存在EventThread的 watingEvents 隊列中
// todo 它的實現邏輯也是這樣, 會有另外一個線程不斷的消費這個隊列
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
// todo 創建watcherEvent 並將服務端發送回來的數據,反序列化進這個對象中
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
// todo 將server path 反轉成 client path
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
//todo 跟進去
eventThread.queueEvent(we);
return;
}
}
在這個方法的最后,將這個相應添加進EventThread
消費的隊列中,跟進 eventThread.queueEvent(we);
// todo
public void queueEvent(WatchedEvent event) {
// todo 如果事件的類型是 none, 或者sessionState = 直接返回
/**
* todo 事件的類型被設計成 watcher 接口的枚舉
* None (-1),
* NodeCreated (1),
* NodeDeleted (2),
* NodeDataChanged (3),
* NodeChildrenChanged (4);
*/
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
// todo 根據事件的具體類型,將觀察者具體化, 跟進去
// todo 這個類是ClientCnxn的輔助類,作用就是將watcher 和它觀察的事件封裝在一起
WatcherSetEventPair pair = new WatcherSetEventPair(
//todo 跟進這個 materialize方法. 其實就是從map中取出了和當前client關聯的全部 watcher set
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
// todo 將watch集合 和 event 進行排隊(按順序添加到隊列里了), 以便后續處理 , 怎么處理呢? 就在EventThread的run循環中消費
// todo watingEvent ==> LinkedBlockingQueue<Object>
waitingEvents.add(pair);
}
上面的代碼主要做了如下幾件事:
- 從map中取出和當前事件相關的全部watcher
- 將watcher set 添加進 waitingEvents隊列中,等待EventThead的消費
跟進 watcher.materialize(event.getState(), event.getType(),
會追到下面的代碼
case NodeDataChanged: // todo node中的data改變和 nodeCreate 都會來到下面的分支
case NodeCreated:
synchronized (dataWatches) {
// todo dataWatches 就是剛才存放 path : watcher 的map
// todo dataWatches.remove(clientPath) 移除並返回clientPath對應的watcher , 放入 result 中
addTo(dataWatches.remove(clientPath), result);
}
上面的dataWatches 就是保存path+watcher set的map, 上面的操作是移除並返回指定的watcher,這也說明了,為什么zk原生客戶端添加的watcher僅僅會回調一次
EventThread是如何消費waitingEvents的
EventThread是一條守護線程, 因此它擁有自己的不斷在運行的run方法,它就是在這個run方法中對這個隊列進行消費的
@Override
public void run() {
try {
isRunning = true;
// todo 同樣是無限的循環
while (true) {
// todo 從watingEvnets 中取出一個 WatcherSetEventPair
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// todo 本類方法,處理這個事件,繼續進入,方法就在下面
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catc
繼續跟進它的processEvent(event)
,最終會在這個方法中調用下面的代碼,這里的watcher就是我在本篇博客的開始位置添加進去的watcher,至此打完收工
watcher.process(pair.event);
總結:
當客戶端啟動時添加watcher對某一個特定path上的node進行監聽時 , 客戶端的watcher被封裝進WatcherRegistion中再進一步發送的服務端
watcher不為空的packet達到服務端后會被巧妙的處理,將ServerCnxn當成watcher注冊添加到服務端維護的那份watcher map table中
當watcher關聯的node發生了NodeCreate,NodeDeleted ,NodeDataChannged,NodeChildrenChannged時,在最后一個處理器就會觸發發送事務類型事件的動作,其實就是回調ServerCnxn的process()方法
事務類型的響應返回到客戶端,跟進xid區分出到底是哪種響應,如-1是NodeDataChanged,最終會把這個事務事件提交到EventThread消費的waitingEvents等待EventThread消費它,回調客戶端的watcher的process()方法
如果覺得對您有幫助,歡迎點個推薦, 如果有錯誤,歡迎指出