轉載:https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/
http://blog.csdn.net/tycoon1988/article/details/38405101
client在一個節點上設置watch,隨后節點內容改變,client將獲取事件。當節點內容再次改變,client不會獲取這個事件,除非它又執行了一次讀操作並設置watch
watch事件異步發送至觀察者。比如說client執行一次寫操作,節點數據內容發生變化,操作返回后,而watch事件可能還在發往client的路上。這種情況下,zookeeper提供有序保證:client不會得知數據變化,直到它獲取watch事件。網絡延遲或其他因素可能導致不同client在不同時刻獲取watch事件和操作返回值。
分布式系統從根本上來說就是不同節點上的進程並發執行,並且相互之間對進程的行為進行協調處理的過程。不同節點上的進程互相協調行為的過程叫做分布式同步。許多分布式系統需要一個進程作為任務的協調者,執行一些其他進程並不執行的特殊的操作,一般情況下哪個進程擔當任務的協調者都無所謂,但是必須有一個進程作為協調者,自動選舉出一個協調者的過程就是分布式選舉。ZooKeeper 正是為了解決這一系列問題而生的。這一講我們來談談 Watcher 機制,首先介紹一個監控示例,然后我們再來聊聊 Watcher 機制原理。
ZooKeeper Watcher 機制
集群狀態監控示例
為了確保集群能夠正常運行,ZooKeeper 可以被用來監視集群狀態,這樣就可以提供集群高可用性。使用 ZooKeeper 的瞬時(ephemeral)節點概念可以設計一個集群機器狀態檢測機制:
1. 每一個運行了 ZooKeeper 客戶端的生產環境機器都是一個終端進程,我們可以在它們連接到 ZooKeeper 服務端后在 ZooKeeper 服務端創建一系列對應的瞬時節點,可以用/hostname 來進行區分。
2. 這里還是采用監聽(Watcher)方式來完成對節點狀態的監視,通過對/hostname 節點的 NodeChildrenChanged 事件的監聽來完成這一目標。監聽進程是作為一個獨立的服務或者進程運行的,它覆蓋了 process 方法來實現應急措施。
3. 由於是一個瞬時節點,所以每次客戶端斷開時 znode 會立即消失,這樣我們就可以監聽到集群節點異常。
4.NodeChildrenChanged 事件觸發后我們可以調用 getChildren 方法來知道哪台機器發生了異常。
清單 1.ClusterMonitor 類
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterMonitor implements Runnable{
private static String membershipRoot = "/Members";
private final Watcher connectionWatcher;
private final Watcher childrenWatcher;
private ZooKeeper zk;
boolean alive = true;
public ClusterMonitor(String HostPort) throws IOException,InterruptedException,KeeperException{
connectionWatcher = new Watcher(){
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
if(event.getType() == Watcher.Event.EventType.None &&
event.getState() == Watcher.Event.KeeperState.SyncConnected){
System.out.println("\nconnectionWatcher Event Received:%s"+event.toString());
}
}
};
childrenWatcher = new Watcher(){
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
System.out.println("\nchildrenWatcher Event Received:%s"+event.toString());
if(event.getType()==Event.EventType.NodeChildrenChanged){
try{
//Get current list of child znode and reset the watch
List<
String
> children = zk.getChildren(membershipRoot, this);
System.out.println("Cluster Membership change,Members: "+children);
}catch(KeeperException ex){
throw new RuntimeException(ex);
}catch(InterruptedException ex){
Thread.currentThread().interrupt();
alive = false;
throw new RuntimeException(ex);
}
}
}
};
zk = new ZooKeeper(HostPort,2000,connectionWatcher);
//Ensure the parent znode exists
if(zk.exists(membershipRoot, false) == null){
zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//Set a watch on the parent znode
List<
String
> children = zk.getChildren(membershipRoot, childrenWatcher);
System.err.println("Members:"+children);
}
public synchronized void close(){
try{
zk.close();
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
@Override
public void run() {
// TODO Auto-generated method stub
try{
synchronized(this){
while(alive){
wait();
}
}
}catch(InterruptedException ex){
ex.printStackTrace();
Thread.currentThread().interrupt();
}finally{
this.close();
}
}
public static void main(String[] args) throws IOException,InterruptedException,KeeperException{
if(args.length != 1){
System.err.println("Usage:ClusterMonitor<
Host:Port
>");
System.exit(0);
}
String hostPort = args[0];
new ClusterMonitor(hostPort).run();
}
}
|
清單 2.ClusterClient 類
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterClient implements Watcher,Runnable{
private static String membershipRoot = "/Members";
ZooKeeper zk;
public ClusterClient(String hostPort,Long pid){
String processId = pid.toString();
try{
zk = new ZooKeeper(hostPort,2000,this);
}catch(IOException ex){
ex.printStackTrace();
}
if(zk!=null){
try{
zk.create(membershipRoot+'/'+processId, processId.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}catch(KeeperException | InterruptedException ex){
ex.printStackTrace();
}
}
}
public synchronized void close(){
try{
zk.close();
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
System.out.println("\nEvent Received:%s"+event.toString());
}
@Override
public void run() {
// TODO Auto-generated method stub
try{
synchronized(this){
while(true){
wait();
}
}
}catch(InterruptedException ex){
ex.printStackTrace();
Thread.currentThread().interrupt();
}finally{
this.close();
}
}
public static void main(String[] args){
if(args.length!=1){
System.err.println("Usage:ClusterClient<
Host:Port
>");
System.exit(0);
}
String hostPort=args[0];
//Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0,index));
new ClusterClient(hostPort,processId).run();
}
}
|
清單 3.Eclipse 運行輸出
|
1
2
3
|
childrenWatcher Event Received:%sWatchedEvent state:SyncConnected type:NodeChildrenChanged path:/Members
Cluster Membership change,Members: [dweref0000000009, test100000000003, dsdawqeqw0000000008,
test111110000000004, test22220000000005, dsda32130000000007, dsda0000000006, test10000000002]
|
我們通過 zkCli 方式對被監聽的/Members 這個 ZNODE 操作,增加一個子節點,您會在 zkCli 里看到如清單 4 所示輸出。
清單 4.ZKCli 創建 ZNode 子節點
|
1
2
|
[zk: localhost:2181(CONNECTED) 0] create -s /Members/dweref rew23rf
Created /Members/dweref0000000009 [zk: localhost:2181(CONNECTED) 4]
|
上面的示例我們演示了如何發起對於一個 ZNODE 的監聽,當該 ZNODE 被改變后,我們會觸發對應的方法進行處理,這類方式可以被用在數據監聽、集群狀態監聽等用途。
回調函數
由於 Watcher 機制涉及到回調函數,所以我們先來介紹一下回調函數的基礎知識。
打個比方,有一家旅館提供叫醒服務,但是要求旅客自己決定叫醒的方法。可以是打客房電話,也可以是派服務員去敲門,睡得死怕耽誤事的,還可以要求往自己頭上澆盆水。這里,“叫醒”這個行為是旅館提供的,相當於庫函數,但是叫醒的方式是由旅客決定並告訴旅館的,也就是回調函數。而旅客告訴旅館怎么叫醒自己的動作,也就是把回調函數傳入庫函數的動作,稱為登記回調函數(to register a callback function)。
乍看起來,回調似乎只是函數間的調用,但仔細一琢磨,可以發現兩者之間的一個關鍵的不同:在回調中,我們利用某種方式,把回調函數像參數一樣傳入中間函數。可以這么理解,在傳入一個回調函數之前,中間函數是不完整的。換句話說,程序可以在運行時,通過登記不同的回調函數,來決定、改變中間函數的行為。這就比簡單的函數調用要靈活太多了。
回調實際上有兩種:阻塞式回調和延遲式回調。兩者的區別在於:阻塞式回調里,回調函數的調用一定發生在起始函數返回之前;而延遲式回調里,回調函數的調用有可能是在起始函數返回之后。我們來看一個簡單的示例。
清單 5.Caller 類
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public class Caller
{
public MyCallInterface mc;
public void setCallfuc(MyCallInterface mc)
{
this.mc= mc;
}
public void call(){
this.mc.method();
}
}
|
清單 6.MyCallInterface 接口
|
1
2
3
|
public interface MyCallInterface {
public void method();
}
|
清單 7.CallbackClass 類
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public class CallbackClass implements MyCallInterface{
public void method()
{
System.out.println("回調函數");
}
public static void main(String args[])
{
Caller call = new Caller();
call.setCallfuc(new CallbackClass());
call.call();
}
}
|
清單 8. 運行結果
|
1
|
回調函數
|
原理及源代碼解釋
實現原理
ZooKeeper 允許客戶端向服務端注冊一個 Watcher 監聽,當服務端的一些指定事件觸發了這個 Watcher,那么就會向指定客戶端發送一個事件通知來實現分布式的通知功能。
ZooKeeper 的 Watcher 機制主要包括客戶端線程、客戶端 WatchManager 和 ZooKeeper 服務器三部分。在具體工作流程上,簡單地講,客戶端在向 ZooKeeper 服務器注冊 Watcher 的同時,會將 Watcher 對象存儲在客戶端的 WatchManager 中。當 ZooKeeper 服務器端觸發 Watcher 事件后,會向客戶端發送通知,客戶端線程從 WatchManager 中取出對應的 Watcher 對象來執行回調邏輯。如清單 9 所示,WatchManager 創建了一個 HashMap,這個 HashMap 被用來存放 Watcher 對象。
清單 9.WatchManager 類
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
private final HashMap<
String
, HashSet<Watcher>> watchTable =
new HashMap<
String
, HashSet<Watcher>>();
public synchronized void addWatch(String path, Watcher watcher) {
HashSet<
Watcher
> list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<
Watcher
>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<
String
> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<
String
>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
|
整個 Watcher 注冊和通知流程如圖 1 所示。
圖 1.Watcher 注冊和通知流程圖

Watcher 接口
Watcher 的理念是啟動一個客戶端去接收從 ZooKeeper 服務端發過來的消息並且同步地處理這些信息。ZooKeeper 的 Java API 提供了公共接口 Watcher,具體操作類通過實現這個接口相關的方法來實現從所連接的 ZooKeeper 服務端接收數據。如果要處理這個消息,需要為客戶端注冊一個 CallBack(回調)對象。Watcher 接口定義在 org.apache.zookeeper 包里面,代碼如清單 10 所示。
清單 10.Watcher 接口
|
1
2
3
|
public interface Watcher {
abstract public void process(WatchedEvent event);
}
|
在 Watcher 接口里面,除了回調函數 process 以外,還包含 KeeperState 和 EventType 兩個枚舉類,分別代表了通知狀態和事件類型,如圖 2 所示。
圖 2.Watcher通知狀態和事件類型表

process 方法是 Watcher 接口中的一個回調方法,當 ZooKeeper 向客戶端發送一個 Watcher 事件通知時,客戶端就會對相應的 process 方法進行回調,從而實現對事件的處理。
process 方法包含 WatcherEvent 類型的參數,WatchedEvent 包含了每一個事件的三個基本屬性:通知狀態(KeeperState)、事件類型(EventType)和節點路徑(Path),ZooKeeper 使用 WatchedEvent 對象來封裝服務端事件並傳遞給 Watcher,從而方便回調方法 process 對服務端事件進行處理。
WatchedEvent 和 WatcherEvent 都表示的是同一個事物,都是對一個服務端事件的封裝。不同的是,WatchedEvent 是一個邏輯事件,用於服務端和客戶端程序執行過程中所需的邏輯對象,而 WatcherEvent 因為實現了序列化接口,因此可以用於網絡傳輸。
服務端在線程 WatchedEvent 事件之后,會調用 getWrapper 方法將自己包裝成一個可序列化的 WatcherEvent 事件,如清單 7 所示,以便通過網絡傳輸到客戶端。客戶端在接收到服務端的這個事件對象后,首先會將 WatcherEvent 事件還原成一個 WatchedEvent 事件,並傳遞給 process 方法處理,回調方法 process 根據入參就能夠解析出完整的服務端事件了。
清單 11. 可序列化的事件
|
1
2
3
4
5
|
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(),
keeperState.getIntValue(),
path);
}
|
客戶端注冊 Watcher 流程
清單 1 所示代碼中采用了 ZooKeeper 構造函數來傳入一個 Watcher,如代碼 zk = new ZooKeeper(HostPort,2000,connectionWatcher);在這行代碼里,第三個參數是連接到 ZooKeeper 服務端的 connectionWatcher 事件監聽,這個 Watcher 將作為整個 ZooKeeper 會話期間的默認 Watcher,會一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 里面。
客戶端的請求基本都是在 ClientCnxn 里面進行操作,當收到請求后,客戶端會對當前客戶端請求進行標記,將其設置為使用 Watcher 監聽,同時會封裝一個 Watcher 的注冊信息 WatchRegistration 對象,用於暫時保存數據節點的路徑和 Watcher 的對應關系。
清單 12.getChildren 方法添加 watch 事件
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
|
在 ZooKeeper 中,Packet 是一個最小的通信協議單元,即數據包。Pakcet 用於進行客戶端與服務端之間的網絡傳輸,任何需要傳輸的對象都需要包裝成一個 Packet 對象。在 ClientCnxn 中 WatchRegistration 也會被封裝到 Pakcet 中,然后由 SendThread 線程調用 queuePacke 方法把 Packet 放入發送隊列中等待客戶端發送,這又是一個異步過程,分布式系統采用異步通信是一個普遍認同的觀念。隨后,SendThread 線程會通過 readResponse 方法接收來自服務端的響應,異步地調用 finishPacket 方法從 Packet 中取出對應的 Watcher 並注冊到 ZKWatchManager 中去,如清單 13 所示。
清單 13.getChildren 方法添加 watch 事件
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
|
除了上面介紹的方式以外,ZooKeeper 客戶端也可以通過 getData、getChildren 和 exist 三個接口來向 ZooKeeper 服務器注冊 Watcher,無論使用哪種方式,注冊 Watcher 的工作原理是一致的。如清單 14 所示,getChildren 方法調用了 WatchManager 類的 addWatch 方法添加了 watcher 事件。
清單 14.getChildren 方法添加 watcher 事件
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public ArrayList<
String
> getChildren(String path, Stat stat,
Watcher watcher) throws KeeperException.NoNodeException {
DataNodeV1 n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
ArrayList<
String
> children = new ArrayList<
String
>();
children.addAll(n.children);
if (watcher != null) {
childWatches.addWatch(path, watcher);
}
return children;
}
}
|
如清單 15 所示,現在需要從這個封裝對象中再次提取出 Watcher 對象來,在 register 方法里面,客戶端將 Watcher 對象轉交給 ZKWatchManager,並最終保存在一個 Map 類型的數據結構 dataWatches 里面,用於將數據節點的路徑和 Watcher 對象進行一一映射后管理起來。
注意,WatcherRegistation 除了 Header 和 request 兩個屬性被傳遞到了服務端,其他都沒有到服務端,否則服務端就容易出現內存緊張甚至溢出的危險,因為數據量太大了。這就是 ZooKeeper 為什么適用於分布式環境的原因,它在網絡中傳輸的是消息,而不是數據包實體。
清單 15.processRequest 代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<
String
, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<
Watcher
> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<
Watcher
>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
|
服務端處理 Watcher 流程
如圖 3 所示是服務端處理 Watcher 的一個完整序列圖。
圖 3. 服務端處理 Watcher 序列圖

注意,以下所有代碼均為精簡版,去除了日志、判斷分支,只在源碼上保留了主線代碼。
FinalRequestProcessor 類接收到客戶端請求后,會調用 processRequest 方法進行處理,會進一步轉向 ZooKeeperServer 的 processRequest 進行進一步處理,處理結由 ZKDatabase 類返回,如清單 16-18 所示。
清單 16.processRequest 代碼
|
1
2
3
4
5
6
7
|
public void processRequest(Request request) {
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
rc = zks.processTxn(hdr, txn);
}
|
清單 17.ZooKeeperServer 代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
rc = getZKDatabase().processTxn(hdr, txn);
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addSession(sessionId, cst
.getTimeOut());
} else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
|
清單 18.ZKDatabase 代碼
|
1
2
3
4
5
6
7
8
9
10
|
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
switch (header.getType()) {
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
|
對於注冊 Watcher 請求,FinalRequestProcessor 的 ProcessRequest 方法會判斷當前請求是否需要注冊 Watcher,如果為 true,就會將當前的 ServerCnxn 對象和數據節點路徑傳入 getData 方法中去。ServerCnxn 是一個 ZooKeeper 客戶端和服務器之間的連接接口,代表了一個客戶端和服務器的連接,我們后面講到的 process 回調方法,實際上也是從這里回調的,所以可以把 ServerCnxn 看作是一個 Watcher 對象。數據節點的節點路徑和 ServerCnxn 最終會被存儲在 WatchManager 的 watchTable 和 watch2Paths 中。
清單 19. 判斷是否注冊 Watcher 代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
|
如前所述,WatchManager 負責 Watcher 事件的觸發,它是一個統稱,在服務端 DataTree 會托管兩個 WatchManager,分別是 dataWatches 和 childWatches,分別對應數據變更 Watcher 和子節點變更 Watcher。
清單 20.WatchManger 兩個隊列
|
1
2
3
4
5
|
private final HashMap<
String
, HashSet<Watcher>> watchTable =
new HashMap<
String
, HashSet<Watcher>>();
private final HashMap<
Watcher
, HashSet<String>> watch2Paths =
new HashMap<
Watcher
, HashSet<String>>();
|
回到主題,如清單 21 到 23 所示,當發生 Create、Delete、NodeChange(數據變更)這樣的事件后,DataTree 會調用相應方法去觸發 WatchManager 的 triggerWatch 方法,該方法返回 ZNODE 的信息,自此進入到回調本地 process 的序列。
清單 21.processTxn 代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
switch (header.getType()) {
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
|
清單 22.setData 代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNodeV1 n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
|
清單 23.triggerWatch 代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public Set<
Watcher
> triggerWatch(String path, EventType type, Set<
Watcher
> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
//將事件類型(EventType)、通知狀態(WatchedEvent)、節點路徑封裝成一個 WatchedEvent 對象
HashSet<
Watcher
> watchers;
synchronized (this) {
//根據數據節點的節點路徑從 watchTable 里面取出對應的 Watcher。如果沒有找到 Watcher 對象,
說明沒有任何客戶端在該數據節點上注冊過 Watcher,直接退出。如果找打了 Watcher 就將其提取出來,
同時會直接從 watchTable 和 watch2Paths 里刪除 Watcher,即 Watcher 是一次性的,觸發一次就失效了。
watchers = watchTable.remove(path);
for (Watcher w : watchers) {
HashSet<
String
> paths = watch2Paths.get(w);
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//對於需要注冊 Watcher 的請求,ZooKeeper 會把請求對應的惡 ServerCnxn 作為一個 Watcher 存儲,
所以這里調用的 process 方法實質上是 ServerCnxn 的對應方法
w.process(e);
}
return watchers;
}
|
從上面的代碼我們可以總結出,如果想要處理一個 Watcher,需要執行的步驟如下所示:
1. 將事件類型(EventType)、通知狀態(WatchedEvent)、節點路徑封裝成一個 WatchedEvent 對象。
2. 根據數據節點的節點路徑從 watchTable 里面取出對應的 Watcher。如果沒有找到 Watcher 對象,說明沒有任何客戶端在該數據節點上注冊過 Watcher,直接退出。如果找到了 Watcher 就將其提取出來,同時會直接從 watchTable 和 watch2Paths 里刪除 Watcher,即 Watcher 是一次性的,觸發一次就失效了。
3. 對於需要注冊 Watcher 的請求,ZooKeeper 會把請求對應的 ServerCnxn 作為一個 Watcher 存儲,所以這里調用的 process 方法實質上是 ServerCnxn 的對應方法,如清單 24 所示,在請求頭標記“-1”表示當前是一個通知,將 WatchedEvent 包裝成 WatcherEvent 用於網絡傳輸序列化,向客戶端發送通知,真正的回調方法在客戶端,就是我們清單 10 里面定義的 process() 方法。
清單 24.ServerCnxn 類代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
|
如清單 24 所示,客戶端收到消息后,會調用 ClientCnxn 的 SendThread.readResponse 方法來進行統一處理,如清單所示。如果響應頭 replyHdr 中標識的 Xid 為 02,表示是 ping,如果為-4,表示是驗證包,如果是-1,表示這是一個通知類型的響應,然后進行反序列化、處理 chrootPath、還原 WatchedEvent、回調 Watcher 等步驟,其中回調 Watcher 步驟將 WacthedEvent 對象交給 EventThread 線程,在下一個輪詢周期中進行 Watcher 回調。
清單 25.SendThread 線程代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class SendThread extends ZooKeeperThread {
private long lastPingSentNs;
private final ClientCnxnSocket clientCnxnSocket;
private Random r = new Random(System.nanoTime());
private boolean isFirstConnect = true;
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
|
如清單 25 所示,SendThread 接收到服務端的通知事件后,會通過調用 EventThread 類的 queueEvent 方法將事件傳給 EventThread 線程,queueEvent 方法根據該通知事件,從 ZKWatchManager 中取出所有相關的 Watcher,如清單 26 所示。
清單 26.EventThread 線程代碼
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
class EventThread extends ZooKeeperThread {
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
|
客戶端在識別出事件類型 EventType 之后,會從相應的 Watcher 存儲中刪除對應的 Watcher,獲取到相關的 Watcher 之后,會將其放入 waitingEvents 隊列,該隊列從字面上就能理解是一個待處理隊列,線程的 run 方法會不斷對該該隊列進行處理,這就是一種異步處理思維的實現。
清單 27.ZKWatchManager 取出 Watcher
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public Set<
Watcher
> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<
Watcher
> result = new HashSet<
Watcher
>();
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
|
清單 28.EventThread 線程的 run 方法
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
|
ZooKeeper Watcher 特性總結
1. 注冊只能確保一次消費
無論是服務端還是客戶端,一旦一個 Watcher 被觸發,ZooKeeper 都會將其從相應的存儲中移除。因此,開發人員在 Watcher 的使用上要記住的一點是需要反復注冊。這樣的設計有效地減輕了服務端的壓力。如果注冊一個 Watcher 之后一直有效,那么針對那些更新非常頻繁的節點,服務端會不斷地向客戶端發送事件通知,這無論對於網絡還是服務端性能的影響都非常大。
2. 客戶端串行執行
客戶端 Watcher 回調的過程是一個串行同步的過程,這為我們保證了順序,同時,需要開發人員注意的一點是,千萬不要因為一個 Watcher 的處理邏輯影響了整個客戶端的 Watcher 回調。
3. 輕量級設計
WatchedEvent 是 ZooKeeper 整個 Watcher 通知機制的最小通知單元,這個數據結構中只包含三部分的內容:通知狀態、事件類型和節點路徑。也就是說,Watcher 通知非常簡單,只會告訴客戶端發生了事件,而不會說明事件的具體內容。例如針對 NodeDataChanged 事件,ZooKeeper 的 Watcher 只會通知客戶指定數據節點的數據內容發生了變更,而對於原始數據以及變更后的新數據都無法從這個事件中直接獲取到,而是需要客戶端主動重新去獲取數據,這也是 ZooKeeper 的 Watcher 機制的一個非常重要的特性。另外,客戶端向服務端注冊 Watcher 的時候,並不會把客戶端真實的 Watcher 對象傳遞到服務端,僅僅只是在客戶端請求中使用 boolean 類型屬性進行了標記,同時服務端也僅僅只是保存了當前連接的 ServerCnxn 對象。這樣輕量級的 Watcher 機制設計,在網絡開銷和服務端內存開銷上都是非常廉價的。
結束語
本文首先介紹了一個簡單的監聽示例代碼,通過監聽 ZNode 的變化,觸發回調函數來實現觸發后的業務處理,接下來簡單介紹了一點回調函數的基本知識,然后我們開始討論 Watcher 機制的實現原理,從 Watcher 接口開始聊,引申出 WatcherEvent 類型,再到添加 watcher 事件以及回調函數基本原理介紹,最后對 Watcher 機制的設計原理進行了三點總結。
