zookeeper主要是為了統一分布式系統中各個節點的工作狀態,在資源沖突的情況下協調提供節點資源搶占,提供給每個節點了解整個集群所處狀態的途徑。這一切的實現都依賴於zookeeper中的事件監聽和通知機制。
zookeeper中的事件和狀態
事件和狀態構成了zookeeper客戶端連接描述的兩個維度。注意,網上很多帖子都是在介紹zookeeper客戶端連接的事件,但是忽略了zookeeper客戶端狀態的變化也是要進行監聽和通知的。這里我們通過下面的兩個表詳細介紹zookeeper中的事件和狀態(zookeeper API中被定義為@Deprecated的事件和狀態就不介紹了)。
zookeeper客戶端與zookeeper server連接的狀態
| 連接狀態 | 狀態含義 |
|---|---|
| KeeperState.Expired | 客戶端和服務器在ticktime的時間周期內,是要發送心跳通知的。這是租約協議的一個實現。客戶端發送request,告訴服務器其上一個租約時間,服務器收到這個請求后,告訴客戶端其下一個租約時間是哪個時間點。當客戶端時間戳達到最后一個租約時間,而沒有收到服務器發來的任何新租約時間,即認為自己下線(此后客戶端會廢棄這次連接,並試圖重新建立連接)。這個過期狀態就是Expired狀態 |
| KeeperState.Disconnected | 就像上面那個狀態所述,當客戶端斷開一個連接(可能是租約期滿,也可能是客戶端主動斷開)這是客戶端和服務器的連接就是Disconnected狀態 |
| KeeperState.SyncConnected | 一旦客戶端和服務器的某一個節點建立連接(注意,雖然集群有多個節點,但是客戶端一次連接到一個節點就行了),並完成一次version、zxid的同步,這時的客戶端和服務器的連接狀態就是SyncConnected |
| KeeperState.AuthFailed | zookeeper客戶端進行連接認證失敗時,發生該狀態 |
需要說明的是,這些狀態在觸發時,所記錄的事件類型都是:EventType.None。
zookeeper中的watch事件(當zookeeper客戶端監聽某個znode節點”/node-x”時)
| zookeeper事件 | 事件含義 |
|---|---|
| EventType.NodeCreated | 當node-x這個節點被創建時,該事件被觸發 |
| EventType.NodeChildrenChanged | 當node-x這個節點的直接子節點被創建、被刪除、子節點數據發生變更時,該事件被觸發。 |
| EventType.NodeDataChanged | 當node-x這個節點的數據發生變更時,該事件被觸發 |
| EventType.NodeDeleted | 當node-x這個節點被刪除時,該事件被觸發。 |
| EventType.None | 當zookeeper客戶端的連接狀態發生變更時,即KeeperState.Expired、KeeperState.Disconnected、KeeperState.SyncConnected、KeeperState.AuthFailed狀態切換時,描述的事件類型為EventType.None |
watch機制
Znode發生變化(Znode本身的增加,刪除,修改,以及子Znode的變化)可以通過Watch機制通知到客戶端。那么要實現Watch,就必須實現org.apache.zookeeper.Watcher接口,並且將實現類的對象傳入到可以Watch的方法中。Zookeeper中所有讀操作(getData(),getChildren(),exists())都可以設置Watch選項。Watch事件具有one-time trigger(一次性觸發)的特性,如果Watch監視的Znode有變化,那么就會通知設置該Watch的客戶端。
在上述說道的所有讀操作中,如果需要Watcher,我們可以自定義Watcher,如果是Boolean型變量,當為true時,則使用系統默認的Watcher,系統默認的Watcher是在Zookeeper的構造函數中定義的Watcher。參數中Watcher為空或者false,表示不啟用Wather。
watch特性1:一次性觸發器
客戶端在Znode設置了Watch時,如果Znode內容發生改變,那么客戶端就會獲得Watch事件。例如:客戶端設置getData("/znode1", true)后,如果/znode1發生改變或者刪除,那么客戶端就會得到一個/znode1的Watch事件,但是/znode1再次發生變化,那客戶端是無法收到Watch事件的,除非客戶端設置了新的Watch。
watch特性2:發送至客戶端
Watch事件是異步發送到Client。Zookeeper可以保證客戶端發送過去的更新順序是有序的。例如:某個Znode沒有設置watcher,那么客戶端對這個Znode設置Watcher發送到集群之前,該客戶端是感知不到該Znode任何的改變情況的。換個角度來解釋:由於Watch有一次性觸發的特點,所以在服務器端沒有Watcher的情況下,Znode的任何變更就不會通知到客戶端。不過,即使某個Znode設置了Watcher,且在Znode有變化的情況下通知到了客戶端,但是在客戶端接收到這個變化事件,但是還沒有再次設置Watcher之前,如果其他客戶端對該Znode做了修改,這種情況下,Znode第二次的變化客戶端是無法收到通知的。這可能是由於網絡延遲或者是其他因素導致,所以我們使用Zookeeper不能期望能夠監控到節點每次的變化。Zookeeper只能保證最終的一致性,而無法保證強一致性。
watch特性3:設置watch的數據內容
Znode改變有很多種方式,例如:節點創建,節點刪除,節點改變,子節點改變等等。Zookeeper維護了兩個Watch列表,一個節點數據Watch列表,另一個是子節點Watch列表。getData()和exists()設置數據Watch,getChildren()設置子節點Watch。兩者選其一,可以讓我們根據不同的返回結果選擇不同的Watch方式,getData()和exists()返回節點的內容,getChildren()返回子節點列表。因此,setData()觸發內容Watch,create()觸發當前節點的內容Watch或者是其父節點的子節點Watch。delete()同時觸發父節點的子節點Watch和內容Watch,以及子節點的內容Watch。
Zookeeper Watcher的運行機制
1,Watch是輕量級的,其實就是本地JVM的Callback,服務器端只是存了是否有設置了Watcher的布爾類型。(源碼見:org.apache.zookeeper.server.FinalRequestProcessor)
2,在服務端,在FinalRequestProcessor處理對應的Znode操作時,會根據客戶端傳遞的watcher變量,添加到對應的ZKDatabase(org.apache.zookeeper.server.ZKDatabase)中進行持久化存儲,同時將自己NIOServerCnxn做為一個Watcher callback,監聽服務端事件變化
3,Leader通過投票通過了某次Znode變化的請求后,然后通知對應的Follower,Follower根據自己內存中的zkDataBase信息,發送notification信息給zookeeper客戶端。
4,Zookeeper客戶端接收到notification信息后,找到對應變化path的watcher列表,挨個進行觸發回調。
流程圖

import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * Zookeeper Wathcher * 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類) * @author(alienware) * @since 2015-6-14 */ public class ZooKeeperWatcher implements Watcher { /** 定義原子變量 */ AtomicInteger seq = new AtomicInteger(); /** 定義session失效時間 */ private static final int SESSION_TIMEOUT = 10000; /** zookeeper服務器地址 */ private static final String CONNECTION_ADDR = "192.168.1.121:2181,192.168.1.122:2181,192.168.1.123:2181"; /** zk父路徑設置 */ private static final String PARENT_PATH = "/p"; /** zk子路徑設置 */ private static final String CHILDREN_PATH = "/p/c"; /** 進入標識 */ private static final String LOG_PREFIX_OF_MAIN = "【Main】"; /** zk變量 */ private ZooKeeper zk = null; /**用於等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 創建ZK連接 * @param connectAddr ZK服務器地址列表 * @param sessionTimeout Session超時時間 */ public void createConnection(String connectAddr, int sessionTimeout) { this.releaseConnection(); try { //this表示把當前對象進行傳遞到其中去(也就是在主函數里實例化的new ZooKeeperWatcher()實例對象) zk = new ZooKeeper(connectAddr, sessionTimeout, this); System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務器"); connectedSemaphore.await(); } catch (Exception e) { e.printStackTrace(); } } /** * 關閉ZK連接 */ public void releaseConnection() { if (this.zk != null) { try { this.zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 創建節點 * @param path 節點路徑 * @param data 數據內容 * @return */ public boolean createPath(String path, String data, boolean needWatch) { try { //設置監控(由於zookeeper的監控都是一次性的所以 每次必須設置監控) this.zk.exists(path, needWatch); System.out.println(LOG_PREFIX_OF_MAIN + "節點創建成功, Path: " + this.zk.create( /**路徑*/ path, /**數據*/ data.getBytes(), /**所有可見*/ Ids.OPEN_ACL_UNSAFE, /**永久存儲*/ CreateMode.PERSISTENT ) + ", content: " + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 讀取指定節點數據內容 * @param path 節點路徑 * @return */ public String readData(String path, boolean needWatch) { try { System.out.println("讀取數據操作..."); return new String(this.zk.getData(path, needWatch, null)); } catch (Exception e) { e.printStackTrace(); return ""; } } /** * 更新指定節點數據內容 * @param path 節點路徑 * @param data 數據內容 * @return */ public boolean writeData(String path, String data) { try { System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 刪除指定節點 * * @param path * 節點path */ public void deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path); } catch (Exception e) { e.printStackTrace(); } } /** * 判斷指定節點是否存在 * @param path 節點路徑 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 獲取子節點 * @param path 節點路徑 */ private List<String> getChildren(String path, boolean needWatch) { try { System.out.println("讀取子節點操作..."); return this.zk.getChildren(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 刪除所有節點 */ public void deleteAllTestPath(boolean needWatch) { if(this.exists(CHILDREN_PATH, needWatch) != null){ this.deleteNode(CHILDREN_PATH); } if(this.exists(PARENT_PATH, needWatch) != null){ this.deleteNode(PARENT_PATH); } } /** * 收到來自Server的Watcher通知后的處理。 */ @Override public void process(WatchedEvent event) { System.out.println("進入 process 。。。。。event = " + event); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } if (event == null) { return; } // 連接狀態 KeeperState keeperState = event.getState(); // 事件類型 EventType eventType = event.getType(); // 受影響的path String path = event.getPath(); //原子對象seq 記錄進入process的次數 String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】"; System.out.println(logPrefix + "收到Watcher通知"); System.out.println(logPrefix + "連接狀態:\t" + keeperState.toString()); System.out.println(logPrefix + "事件類型:\t" + eventType.toString()); if (KeeperState.SyncConnected == keeperState) { // 成功連接上ZK服務器 if (EventType.None == eventType) { System.out.println(logPrefix + "成功連接上ZK服務器"); connectedSemaphore.countDown(); } //創建節點 else if (EventType.NodeCreated == eventType) { System.out.println(logPrefix + "節點創建"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //更新節點 else if (EventType.NodeDataChanged == eventType) { System.out.println(logPrefix + "節點數據更新"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //更新子節點 else if (EventType.NodeChildrenChanged == eventType) { System.out.println(logPrefix + "子節點變更"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } //刪除節點 else if (EventType.NodeDeleted == eventType) { System.out.println(logPrefix + "節點 " + path + " 被刪除"); } else ; } else if (KeeperState.Disconnected == keeperState) { System.out.println(logPrefix + "與ZK服務器斷開連接"); } else if (KeeperState.AuthFailed == keeperState) { System.out.println(logPrefix + "權限檢查失敗"); } else if (KeeperState.Expired == keeperState) { System.out.println(logPrefix + "會話失效"); } else ; System.out.println("--------------------------------------------"); } /** * <B>方法名稱:</B>測試zookeeper監控<BR> * <B>概要說明:</B>主要測試watch功能<BR> * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //建立watcher //當前客戶端可以稱為一個watcher 觀察者角色 ZooKeeperWatcher zkWatch = new ZooKeeperWatcher(); //創建連接 zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT); //System.out.println(zkWatch.zk.toString()); Thread.sleep(1000); // 清理節點 zkWatch.deleteAllTestPath(false); //-----------------第一步: 創建父節點 /p ------------------------// if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) { Thread.sleep(1000); //-----------------第二步: 讀取節點 /p 和 讀取/p節點下的子節點(getChildren)的區別 --------------// // 讀取數據 zkWatch.readData(PARENT_PATH, true); // 讀取子節點(監控childNodeChange事件) zkWatch.getChildren(PARENT_PATH, true); // 更新數據 zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + ""); Thread.sleep(1000); // 創建子節點 zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", true); //-----------------第三步: 建立子節點的觸發 --------------// // zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true); // zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true); //-----------------第四步: 更新子節點數據的觸發 --------------// //在進行修改之前,我們需要watch一下這個節點: Thread.sleep(1000); zkWatch.readData(CHILDREN_PATH, true); zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + ""); } Thread.sleep(10000); // 清理節點 zkWatch.deleteAllTestPath(false); Thread.sleep(10000); zkWatch.releaseConnection(); } }

