聲明:本文轉載自http://shift-alt-ctrl.iteye.com/blog/1847320,轉載請務必聲明。
Watcher機制:目的是為ZK客戶端操作提供一種類似於異步獲得數據的操作.
1)在創建Zookeeper實例時,允許接收一個watcher參數,此參數將會賦值給watchMnanger.defaultWatcher,成為當前客戶端的默認Watcher.需要注意此watcher和其他watcher不同,此wather主要是響應"與鏈接狀態轉換"有關的事件(比如,"建立鏈接","鏈接關閉"等,參見KeeperState).此默認watcher有zk client本地持有且生命周期伴隨整個zookeeper實例,而不是"一次觸發即消亡",當Client收到EventType,NONE類型的消息時,則會觸發這個"默認wather"被執行..(參見:消息類型)
2)ZKWatchManager是客戶端watcher管理器,負責跟蹤多種watcher,watcher被分為dataWatches,existWatches,childWatches.每種類型的watcher將會被存在各自的Map中(key為path,value為Set<Watcher>,由此可見,在一個path上一種類型操作重復注冊同一個watcher對象,事實上只會生效一次,不同的watcher對象是可以的).記住:這些watcher只是一些存根,由ZKWatchManager負責管理,並不會隨請求發送給server,而只會發給server此請求類型是否注冊了watch(源碼:request.setWatch(boolean))
3)對於setData,exist,getChildren操作,都可以接收boolean類型的watcher標識和Watcher對象,boolean類型告知請求使用defaultWatcher對象注冊事件.
4)在ZKDatabase中,包括一個DataTree,此dataTree持有對nodes以及相關的watcher的數據.在server端,WatcherManager是管理client注冊的watcher,它只管理dataWatches和childWatches,沒有對exist類型的watch.其數據結構為HashSet<path,Set<Watcher>>,和ZKWatchManager一致.(對於exist類型的請求,sever端將其watch加入dataWatches中,這個很好理解)
5)請求到達server之后,在FinalRequestProcessor中,將會處理各種請求,如果檢測到request.getWatch()為true,即請求要求注冊watch,那么將會把ServerCnxn和path關聯起來,加入到WatherManager相應的列表中.
6)客戶端的請求響應之后,由SendThread.readResponse()處理響應,如果響應code為成功且此請求中注冊了watch,那么將會把此wath添加到響應的watch列表中。
7)ServerCnxn(抽象類)實現了Watcher接口,每個client在server端都對應一個ServerCnxn,此類(子類)是client請求/響應的處理器,不過所有的請求最終還是由一個線程負責通信。在ServerCnxn處理請求時出現異常或者client關閉,將會導致ServerCnxn調用close()方法,此方法中有個分支操作就是從DataTree中的兩種watches列表中刪除其關聯的watch。
8)WatcherManager是server端watch管理器,此類包含2個不同的數據結構用來存儲watch以方便查詢,其中一個是watch2path為HashMap<Watcher, HashSet<String>>;另一個是watchTable為HashMap<String, HashSet<Watcher>>。其實這2個map保存的數據一樣,只是查詢的場景不同;這2個map將會被同時操作。
9)DataTree持有2個WatchManager對象,分別為dataWatches用於管理注冊data操作的watch,childWatches用於管理注冊child操作的watch。
10)WatchManager中還有一個很重要的操作,trigerWatch(String path,EvenType type),當server接受到例如createNode/deleteNode/setData等操作時,將會操作ZKDatabase來操作DataTree中的數據,當然dataTree的數據改動,將會觸發相應patch(節點)上的watch(有可能一個操作會導致多種watch被觸發),trigerWatch就是在這些時機下被調用。此操作中就是從watchManager中將相應path下注冊的watch移除,並依次調用watch.process()。此process()做了一件事情,就是向client發送一個nofication消息,此消息中包含一個WatchEvent對象,此對象封裝了事件的類型/path等。
11)客戶端接受到nofication,並反序獲取WatchEvent,然后和server端的watcherManager一樣,ZKWatcherManager根據event類型,從相應的一個或多個watches列表中分別移除相應path的watch,並將這些“移除”的watches再次封裝成一個WatcherSetEventPair,此對象持有event和watches集合。最后將此pair加入event隊列。
12)client的EventThread將會不斷輪詢,從event隊列中獲取pair,並遍歷pair中關聯的watcher,依次調用watcher的process()方法。。當然此watcher的process方法是client用戶自己實現的,因為watcher對象是client用戶在實例化zookeeper時包括各種操作時交付給zookeeper的。所以用戶應該根據自己的需要,在client受到event時做自己的處理。
F1.Watch生命周期
- Zookeeper提供了如下幾種可以"注冊watch"的操作:exist,getChildren,getData;而對於create,setData,delete是有可能觸發"watcher"的操作.
- 客戶端並不會把用戶創建的watcher對象傳遞給Server,而是傳遞給server一個標記(boolean值)告知server此請求所涉及到的patch上是否有watcher..
- 對於client端請求是隊列話的,即一個操作阻塞直到server端響應.(異步操作稍后介紹,它不阻塞)
- Server對Client的每個請求的響應體中,都會明確告知此次響應的類型(是正常操作響應還是"事件",操作對應的xid,結果類型,錯誤信息等等);如果響應體中沒有錯誤信息且其他校驗正常的話,我們認為此次請求被正確的執行了.
- 可能考慮到在Client與Server端傳遞wath對象所帶來的程序復雜度,ZK采取了"分制"的方式,在Client端和Server端分別采取了不同的技巧來保存Watch列表;(參見上述)
- Server在接收Client請求時,會檢測此次request體中是否持有watcher信息,如果有,則會導致Server端的watcher列表中新增一個此path關聯的watch,只有exist/getChildren/getData會導致此操作.記住watcher信息將會被保存在ZKDatabase中(內存中,而非持久,ZKDatabase會持久Session/ACL/Data).
- 那么對於create/setData/delete請求,將會觸發watcher列表的檢測,比如create操作,創建一個path,在實際的數據存儲結束后,將會在watch列表中遍歷是否有此path所關聯的watches,如果有,則依次觸發.
- 觸發watch其實很簡單,對於server端而言,它持有了每個path所關聯的watch列表,而且每個watch實例正是一個ServerCnxn對象(每個Client與Server的連接處理器,就是一個ServerCnxn對象),因為觸發一個watcher將是便捷性將是顯而易見的,直接將此watcher事件所對應的path/類型直接通過IO的方式發送出去;因此哪個Client注冊了事件,將會被響應的ServerCnxn處理;集群中每個Server幾乎會在同一時間向Client交付事件消息.可能因為網絡的問題,不可確保他們能夠在極短的時間差內都獲得事件.
- "插隊",是因為對於watcher事件,將不再和其他Client操作放在同一隊列中,而是直接通過IO發送,因為ServerCnxn處理client響應是同步的(方法是同步方法),即事件信息將會在當前packet發送之后被立即發送.
- 事件一旦被server觸發,將會在watcher列表中刪除,因此watcher是一次性的(同一個path下的同一類型watcher).我們不能依賴wathcer來全權檢測數據的變更,因為網絡斷開可能會導致事件通知的丟失;當事件被觸發之后,server端將刪除事件,即使client端再次注冊watcher,那么"上一次事件"和"重新注冊事件"這段事件內,仍然有可能數據已經變更.(備注:Watcher watch = watchTable.remove(path);watch.process();首先從watchtable中移除watch,然后再將watch信息發送給client端,即使在發送時網絡異常,watch也不會再次put到watchTable中,事實上此時watch已經被消費.)
- Client接收到Event響應結果之后,將會把此消息體放在eventQueue中,等待EventThread去remove並觸發.
- EventThread將event隊列中的事件,逐個移除並處理,每移除一個event,都會導致Client本地維護的watcher列表刪除相應的watcher(根據path和event類型決定),移除之后並獲取到Client維護的watcher對象(此對象就是先前的操作中注冊的watcher),watcher對象明確了回調方法,此時將會執行watcher.process(),那么調用者的業務方法將會在此刻被執行.[對於業務方法被執行,從整個周期中,我們可以認為是異步的].
- 對於節點的create操作,將會觸發先前注冊的"exist""getChildren"事件被觸發;對於節點的delete操作,將會觸發先前注冊的"exsit""getChildren"事件被觸發;對於節點的setData操作,將會觸發先前注冊的"getData"事件被觸發......每個觸發的事件都會包含事件的類型(比如:nodeCreate,nodeDelete等),對於用戶自定義的watch.process()方法中可以根據事件類型做特定的處理.
- 對於Server端遇到session關閉,連接關閉等異常時,都會觸發和此連接(ServerCnxn)關聯的watch列表.
- 不過對於Client端卻做了"彌補";"zookeeper.disableAutoWatchReset"這個系統參數的意義就是"是否關閉watch自動重置";如果此參數為false(即為開啟"自動重置"),那么在Client端遇到連接異常(比如重連操作)時,都會將本地已有的watcher列表全部發送給Server(此操作稱為"setWatches"),如果連接成功,那么新的server仍然會持有watcher列表,接下來事件將會被如期觸發,就像網絡異常根本就沒發生一樣..那么為什么ZK沒有默認開啟此參數呢?可能考慮到這是個雙刃劍,Client有可能在網絡異常時會做其他的操作(因為網絡異常,最終也會觸發一個本地的Event,Client可以在此Event中做自定義操作);也有可能在網絡異常期間,Cluster中的數據已經被改變,極有可能這些事件中的部分事件已經被錯過,即使接下來被觸發,也將不能正確的反應目前的現狀.如果你期望獲得正確的結果,要么重新注冊watcher,要么檢測現有的數據是否已經改變.
Zookeeper客戶端不僅提供了同步操作,還有異步操作,對於create/delete/exist/setData等,ZK分別提供了同步和異步方法,我們上述了解到的,都是同步操作,簡單做如下列舉:
public Stat exists(String path,Watcher watcher):同步方法,檢測path是否存在,如果存在則返回節點的全信息,否則返回null.如果此后此path被創建或者刪除,則觸發watcher.
public void exist(String path,Watcher watcher,StatCallback cb,Object ctx):這個方法就是異步的,它需要指定一個StatCallback實例,以便在請求被處理之后,異步的執行callback操作.
我相信你一定知道如何將調用過程設計為"異步"[提示:異步即為操作隊列話 + callback調用].
在Zookeeper中,同步方法樣例:
- public ReplyHeader submitRequest(RequestHeader h, Record request,
- Record response, WatchRegistration watchRegistration)
- throws InterruptedException {
- ReplyHeader r = new ReplyHeader();
- //將請求加入隊列,此隊列將會被SendThread操作,並依此發送請求.
- Packet packet = queuePacket(h, r, request, response, null, null, null,
- null, watchRegistration);
- //直接阻塞當前請求
- synchronized (packet) {
- while (!packet.finished) {
- packet.wait();//此處阻塞,直到響應,響應被接受后,會對此packet.notify()調用.
- }
- }
- return r;//返回處理的結果
- }
那么對於異步操作,只調用queuePacket(....)將請求添加到隊列,然后exist方法就直接返回了.不過在響應被成功接收后,會額外的檢測此packet是否有callback,如果有,就立即執行:
- private void finishPacket(Packet p) {
- if (p.watchRegistration != null) {
- p.watchRegistration.register(p.replyHeader.getErr());
- }
- //此處就是檢測callback
- if (p.cb == null) {
- synchronized (p) {
- p.finished = true;
- p.notifyAll();
- }
- } else {
- p.finished = true;
- eventThread.queuePacket(p);//將異步調用packet添加到事件隊列,依此被處理.
- }
- }
到目前為止,watcher機制我們已經走到"頭"了...