zookeeper有watch事件,是一次性觸發的,當watch監視的數據發生變化時,通知設置了該watch的client,即watcher。
同樣,其watcher是監聽數據發送了某些變化,那就一定會有對應的事件類型,和狀態類型。
事件類型:(znode節點相關的)
EventType.NodeCreated
EventType.NodeDataChanged
EventType.NodeChildrenChanged
EventType.NodeDeleted
狀態類型:(是跟客戶端實例相關的)
KeeperState.Disconnected
KeeperState.SyncConnected
KeeperState.AuthFailed
KeeperState.Expired
wather的特性:一次性,客戶端串行執行,輕量。
一次性:對於ZK的watcher,你只需要記住一點:zookeeper有watch事件,是一次性觸發的,當watch監視的數據發生變化時,通知設置了該watch的client,即watcher,由於zookeeper的監控都是一次性的,所以每次都必須監控。
客戶端串行執行:客戶端watcher回調的過程是一個串行同步的過程,這為我們保證了順序,同時需要開發人員注意一點,千萬不用因為一個watcher的處理邏輯影響了整個客戶端的watcher回調。
輕量:WatcherEvent是Zookeeper整個Watcher通知機制的最小通知單元。整個單元結構只包含三部分:通知狀態,事件類型和節點路徑。也就是說Watcher通知非常的簡單,只會告訴客戶端發生了事件而不會告知其具體內容,需要客戶自己去進行獲取,而不會直接提供具體的數據內容。
我們通過一個示例,詳細學習下Watcher的概念和其目的。Watcher示例:
【ZookeeperWatcher】
1 package bhz.zookeeper.watcher; 2 3 import java.util.List; 4 import java.util.concurrent.CountDownLatch; 5 import java.util.concurrent.atomic.AtomicInteger; 6 7 import org.apache.zookeeper.CreateMode; 8 import org.apache.zookeeper.WatchedEvent; 9 import org.apache.zookeeper.Watcher; 10 import org.apache.zookeeper.Watcher.Event.EventType; 11 import org.apache.zookeeper.Watcher.Event.KeeperState; 12 import org.apache.zookeeper.ZooDefs.Ids; 13 import org.apache.zookeeper.ZooKeeper; 14 import org.apache.zookeeper.data.Stat; 15 16 /** 17 * Zookeeper Wathcher 18 * 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類) 19 * @author(alienware) 20 * @since 2015-6-14 21 */ 22 public class ZooKeeperWatcherYuCong implements Watcher { 23 24 /** 定義原子變量 */ 25 AtomicInteger seq = new AtomicInteger(); 26 /** 定義session失效時間 */ 27 private static final int SESSION_TIMEOUT = 10000; 28 /** zookeeper服務器地址 */ 29 private static final String CONNECTION_ADDR = "127.0.0.1:2181"; 30 /** zk父路徑設置 */ 31 private static final String PARENT_PATH = "/p"; 32 /** zk子路徑設置 */ 33 private static final String CHILDREN_PATH = "/p/c"; 34 /** 進入標識 */ 35 private static final String LOG_PREFIX_OF_MAIN = "【Main】"; 36 /** zk變量 */ 37 private ZooKeeper zk = null; 38 /**用於等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */ 39 private CountDownLatch connectedSemaphore = new CountDownLatch(1); 40 41 /** 42 * 創建ZK連接 43 * @param connectAddr ZK服務器地址列表 44 * @param sessionTimeout Session超時時間 45 */ 46 public void createConnection(String connectAddr, int sessionTimeout) { 47 this.releaseConnection(); 48 try { 49 //this表示把當前對象進行傳遞到其中去(也就是在主函數里實例化的new ZooKeeperWatcher()實例對象) 50 zk = new ZooKeeper(connectAddr, sessionTimeout, this); 51 System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務器"); 52 connectedSemaphore.await(); 53 } catch (Exception e) { 54 e.printStackTrace(); 55 } 56 } 57 58 /** 59 * 關閉ZK連接 60 */ 61 public void releaseConnection() { 62 if (this.zk != null) { 63 try { 64 this.zk.close(); 65 } catch (InterruptedException e) { 66 e.printStackTrace(); 67 } 68 } 69 } 70 71 /** 72 * 創建節點 73 * @param path 節點路徑 74 * @param data 數據內容 75 * @return 76 */ 77 public boolean createPath(String path, String data, boolean needWatch) { 78 try { 79 //設置監控(由於zookeeper的監控都是一次性的所以 每次必須設置監控) 80 this.zk.exists(path, needWatch); 81 System.out.println(LOG_PREFIX_OF_MAIN + "節點創建成功, Path: " + 82 this.zk.create( /**路徑*/ 83 path, 84 /**數據*/ 85 data.getBytes(), 86 /**所有可見*/ 87 Ids.OPEN_ACL_UNSAFE, 88 /**永久存儲*/ 89 CreateMode.PERSISTENT ) + 90 ", content: " + data); 91 } catch (Exception e) { 92 e.printStackTrace(); 93 return false; 94 } 95 return true; 96 } 97 98 /** 99 * 讀取指定節點數據內容 100 * @param path 節點路徑 101 * @return 102 */ 103 public String readData(String path, boolean needWatch) { 104 try { 105 System.out.println("讀取數據操作..."); 106 return new String(this.zk.getData(path, needWatch, null)); 107 } catch (Exception e) { 108 e.printStackTrace(); 109 return ""; 110 } 111 } 112 113 /** 114 * 更新指定節點數據內容 115 * @param path 節點路徑 116 * @param data 數據內容 117 * @return 118 */ 119 public boolean writeData(String path, String data) { 120 try { 121 System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " + 122 this.zk.setData(path, data.getBytes(), -1)); 123 } catch (Exception e) { 124 e.printStackTrace(); 125 return false; 126 } 127 return true; 128 } 129 130 /** 131 * 刪除指定節點 132 * 133 * @param path 134 * 節點path 135 */ 136 public void deleteNode(String path) { 137 try { 138 this.zk.delete(path, -1); 139 System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path); 140 } catch (Exception e) { 141 e.printStackTrace(); 142 } 143 } 144 145 /** 146 * 判斷指定節點是否存在 147 * @param path 節點路徑 148 */ 149 public Stat exists(String path, boolean needWatch) { 150 try { 151 return this.zk.exists(path, needWatch); 152 } catch (Exception e) { 153 e.printStackTrace(); 154 return null; 155 } 156 } 157 158 /** 159 * 獲取子節點 160 * @param path 節點路徑 161 */ 162 private List<String> getChildren(String path, boolean needWatch) { 163 try { 164 System.out.println("讀取子節點操作..."); 165 return this.zk.getChildren(path, needWatch); 166 } catch (Exception e) { 167 e.printStackTrace(); 168 return null; 169 } 170 } 171 172 /** 173 * 刪除所有節點 174 */ 175 public void deleteAllTestPath(boolean needWatch) { 176 if(this.exists(CHILDREN_PATH, needWatch) != null){ 177 this.deleteNode(CHILDREN_PATH); 178 } 179 if(this.exists(PARENT_PATH, needWatch) != null){ 180 this.deleteNode(PARENT_PATH); 181 } 182 } 183 184 /** 185 * 收到來自Server的Watcher通知后的處理。 186 */ 187 @Override 188 public void process(WatchedEvent event) { 189 190 System.out.println("進入 process 。。。。。event = " + event); 191 192 try { 193 Thread.sleep(200); 194 } catch (InterruptedException e) { 195 e.printStackTrace(); 196 } 197 198 if (event == null) { 199 return; 200 } 201 202 // 連接狀態 203 KeeperState keeperState = event.getState(); 204 // 事件類型 205 EventType eventType = event.getType(); 206 // 受影響的path 207 String path = event.getPath(); 208 //原子對象seq 記錄進入process的次數 209 String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】"; 210 211 System.out.println(logPrefix + "收到Watcher通知"); 212 System.out.println(logPrefix + "連接狀態:\t" + keeperState.toString()); 213 System.out.println(logPrefix + "事件類型:\t" + eventType.toString()); 214 215 if (KeeperState.SyncConnected == keeperState) { 216 // 成功連接上ZK服務器 217 if (EventType.None == eventType) { 218 System.out.println(logPrefix + "成功連接上ZK服務器"); 219 connectedSemaphore.countDown(); 220 } 221 //創建節點 222 else if (EventType.NodeCreated == eventType) { 223 System.out.println(logPrefix + "節點創建"); 224 try { 225 Thread.sleep(100); 226 } catch (InterruptedException e) { 227 e.printStackTrace(); 228 } 229 } 230 //更新節點 231 else if (EventType.NodeDataChanged == eventType) { 232 System.out.println(logPrefix + "節點數據更新"); 233 try { 234 Thread.sleep(100); 235 } catch (InterruptedException e) { 236 e.printStackTrace(); 237 } 238 } 239 //更新子節點 240 else if (EventType.NodeChildrenChanged == eventType) { 241 System.out.println(logPrefix + "子節點變更"); 242 try { 243 Thread.sleep(3000); 244 } catch (InterruptedException e) { 245 e.printStackTrace(); 246 } 247 } 248 //刪除節點 249 else if (EventType.NodeDeleted == eventType) { 250 System.out.println(logPrefix + "節點 " + path + " 被刪除"); 251 } 252 else { 253 System.out.println(logPrefix + "其他事件:" + eventType); 254 }; 255 } 256 else if (KeeperState.Disconnected == keeperState) { 257 System.out.println(logPrefix + "與ZK服務器斷開連接"); 258 } 259 else if (KeeperState.AuthFailed == keeperState) { 260 System.out.println(logPrefix + "權限檢查失敗"); 261 } 262 else if (KeeperState.Expired == keeperState) { 263 System.out.println(logPrefix + "會話失效"); 264 } 265 else { 266 System.out.println(logPrefix + "其他狀態:" + keeperState); 267 }; 268 269 System.out.println("--------------------------------------------"); 270 271 } 272 273 /** 274 * <B>方法名稱:</B>測試zookeeper監控<BR> 275 * <B>概要說明:</B>主要測試watch功能<BR> 276 * @param args 277 * @throws Exception 278 */ 279 public static void main(String[] args) throws Exception { 280 281 //建立watcher //當前客戶端可以稱為一個watcher 觀察者角色 282 ZooKeeperWatcherYuCong zkWatch = new ZooKeeperWatcherYuCong(); 283 //創建連接 284 zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT); 285 System.out.println(zkWatch.zk.toString()); 286 287 Thread.sleep(1000); 288 289 // 清理節點 290 zkWatch.deleteAllTestPath(false); 291 292 //-----------------第一步: 創建父節點 /p ------------------------// 293 if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", false)) { 294 295 Thread.sleep(1000); 296 297 //-----------------第二步: 讀取節點 /p 和 讀取/p節點下的子節點(getChildren)的區別 --------------// 298 // 讀取數據 299 zkWatch.readData(PARENT_PATH, false); 300 301 // 讀取子節點(監控childNodeChange事件) 302 zkWatch.getChildren(PARENT_PATH, false); 303 304 // 更新數據 305 zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + ""); 306 Thread.sleep(1000); 307 308 // 創建子節點 309 zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", false); 310 311 312 //-----------------第三步: 建立子節點的觸發 --------------// 313 // zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true); 314 // zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true); 315 316 //-----------------第四步: 更新子節點數據的觸發 --------------// 317 //在進行修改之前,我們需要watch一下這個節點: 318 Thread.sleep(1000); 319 zkWatch.readData(CHILDREN_PATH, true); 320 zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + ""); 321 322 } 323 324 Thread.sleep(10000); 325 // 清理節點 326 zkWatch.deleteAllTestPath(false); 327 328 329 Thread.sleep(10000); 330 zkWatch.releaseConnection(); 331 332 } 333 334 }