zookeeper(四)核心watch和watcher


zookeeper有watch事件,是一次性觸發的,當watch監視的數據發生變化時,通知設置了該watch的client,即watcher。

同樣,其watcher是監聽數據發送了某些變化,那就一定會有對應的事件類型,和狀態類型。

事件類型:(znode節點相關的)

EventType.NodeCreated

EventType.NodeDataChanged

EventType.NodeChildrenChanged

EventType.NodeDeleted

狀態類型:(是跟客戶端實例相關的)

KeeperState.Disconnected

KeeperState.SyncConnected

KeeperState.AuthFailed

KeeperState.Expired

 

wather的特性:一次性,客戶端串行執行,輕量。

一次性:對於ZK的watcher,你只需要記住一點:zookeeper有watch事件,是一次性觸發的,當watch監視的數據發生變化時,通知設置了該watch的client,即watcher,由於zookeeper的監控都是一次性的,所以每次都必須監控。

客戶端串行執行:客戶端watcher回調的過程是一個串行同步的過程,這為我們保證了順序,同時需要開發人員注意一點,千萬不用因為一個watcher的處理邏輯影響了整個客戶端的watcher回調。

輕量:WatcherEvent是Zookeeper整個Watcher通知機制的最小通知單元。整個單元結構只包含三部分:通知狀態,事件類型和節點路徑。也就是說Watcher通知非常的簡單,只會告訴客戶端發生了事件而不會告知其具體內容,需要客戶自己去進行獲取,而不會直接提供具體的數據內容。

 

我們通過一個示例,詳細學習下Watcher的概念和其目的。Watcher示例:

【ZookeeperWatcher】

 

  1 package bhz.zookeeper.watcher;
  2 
  3 import java.util.List;
  4 import java.util.concurrent.CountDownLatch;
  5 import java.util.concurrent.atomic.AtomicInteger;
  6 
  7 import org.apache.zookeeper.CreateMode;
  8 import org.apache.zookeeper.WatchedEvent;
  9 import org.apache.zookeeper.Watcher;
 10 import org.apache.zookeeper.Watcher.Event.EventType;
 11 import org.apache.zookeeper.Watcher.Event.KeeperState;
 12 import org.apache.zookeeper.ZooDefs.Ids;
 13 import org.apache.zookeeper.ZooKeeper;
 14 import org.apache.zookeeper.data.Stat;
 15 
 16 /**
 17  * Zookeeper Wathcher 
 18  * 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類)
 19  * @author(alienware)
 20  * @since 2015-6-14
 21  */
 22 public class ZooKeeperWatcherYuCong implements Watcher {
 23 
 24     /** 定義原子變量 */
 25     AtomicInteger seq = new AtomicInteger();
 26     /** 定義session失效時間 */
 27     private static final int SESSION_TIMEOUT = 10000;
 28     /** zookeeper服務器地址 */
 29     private static final String CONNECTION_ADDR = "127.0.0.1:2181";
 30     /** zk父路徑設置 */
 31     private static final String PARENT_PATH = "/p";
 32     /** zk子路徑設置 */
 33     private static final String CHILDREN_PATH = "/p/c";
 34     /** 進入標識 */
 35     private static final String LOG_PREFIX_OF_MAIN = "【Main】";
 36     /** zk變量 */
 37     private ZooKeeper zk = null;
 38     /**用於等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */
 39     private CountDownLatch connectedSemaphore = new CountDownLatch(1);
 40 
 41     /**
 42      * 創建ZK連接
 43      * @param connectAddr ZK服務器地址列表
 44      * @param sessionTimeout Session超時時間
 45      */
 46     public void createConnection(String connectAddr, int sessionTimeout) {
 47         this.releaseConnection();
 48         try {
 49             //this表示把當前對象進行傳遞到其中去(也就是在主函數里實例化的new ZooKeeperWatcher()實例對象)
 50             zk = new ZooKeeper(connectAddr, sessionTimeout, this);
 51             System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務器");
 52             connectedSemaphore.await();
 53         } catch (Exception e) {
 54             e.printStackTrace();
 55         }
 56     }
 57 
 58     /**
 59      * 關閉ZK連接
 60      */
 61     public void releaseConnection() {
 62         if (this.zk != null) {
 63             try {
 64                 this.zk.close();
 65             } catch (InterruptedException e) {
 66                 e.printStackTrace();
 67             }
 68         }
 69     }
 70 
 71     /**
 72      * 創建節點
 73      * @param path 節點路徑
 74      * @param data 數據內容
 75      * @return 
 76      */
 77     public boolean createPath(String path, String data, boolean needWatch) {
 78         try {
 79             //設置監控(由於zookeeper的監控都是一次性的所以 每次必須設置監控)
 80             this.zk.exists(path, needWatch);
 81             System.out.println(LOG_PREFIX_OF_MAIN + "節點創建成功, Path: " + 
 82                                this.zk.create(    /**路徑*/ 
 83                                                    path, 
 84                                                    /**數據*/
 85                                                    data.getBytes(), 
 86                                                    /**所有可見*/
 87                                                    Ids.OPEN_ACL_UNSAFE, 
 88                                                    /**永久存儲*/
 89                                                    CreateMode.PERSISTENT ) +     
 90                                ", content: " + data);
 91         } catch (Exception e) {
 92             e.printStackTrace();
 93             return false;
 94         }
 95         return true;
 96     }
 97 
 98     /**
 99      * 讀取指定節點數據內容
100      * @param path 節點路徑
101      * @return
102      */
103     public String readData(String path, boolean needWatch) {
104         try {
105             System.out.println("讀取數據操作...");
106             return new String(this.zk.getData(path, needWatch, null));
107         } catch (Exception e) {
108             e.printStackTrace();
109             return "";
110         }
111     }
112 
113     /**
114      * 更新指定節點數據內容
115      * @param path 節點路徑
116      * @param data 數據內容
117      * @return
118      */
119     public boolean writeData(String path, String data) {
120         try {
121             System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " +
122                                 this.zk.setData(path, data.getBytes(), -1));
123         } catch (Exception e) {
124             e.printStackTrace();
125             return false;
126         }
127         return true;
128     }
129 
130     /**
131      * 刪除指定節點
132      * 
133      * @param path
134      *            節點path
135      */
136     public void deleteNode(String path) {
137         try {
138             this.zk.delete(path, -1);
139             System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
140         } catch (Exception e) {
141             e.printStackTrace();
142         }
143     }
144 
145     /**
146      * 判斷指定節點是否存在
147      * @param path 節點路徑
148      */
149     public Stat exists(String path, boolean needWatch) {
150         try {
151             return this.zk.exists(path, needWatch);
152         } catch (Exception e) {
153             e.printStackTrace();
154             return null;
155         }
156     }
157 
158     /**
159      * 獲取子節點
160      * @param path 節點路徑
161      */
162     private List<String> getChildren(String path, boolean needWatch) {
163         try {
164             System.out.println("讀取子節點操作...");
165             return this.zk.getChildren(path, needWatch);
166         } catch (Exception e) {
167             e.printStackTrace();
168             return null;
169         }
170     }
171 
172     /**
173      * 刪除所有節點
174      */
175     public void deleteAllTestPath(boolean needWatch) {
176         if(this.exists(CHILDREN_PATH, needWatch) != null){
177             this.deleteNode(CHILDREN_PATH);
178         }
179         if(this.exists(PARENT_PATH, needWatch) != null){
180             this.deleteNode(PARENT_PATH);
181         }        
182     }
183     
184     /**
185      * 收到來自Server的Watcher通知后的處理。
186      */
187     @Override
188     public void process(WatchedEvent event) {
189         
190         System.out.println("進入 process 。。。。。event = " + event);
191         
192         try {
193             Thread.sleep(200);
194         } catch (InterruptedException e) {
195             e.printStackTrace();
196         }
197         
198         if (event == null) {
199             return;
200         }
201         
202         // 連接狀態
203         KeeperState keeperState = event.getState();
204         // 事件類型
205         EventType eventType = event.getType();
206         // 受影響的path
207         String path = event.getPath();
208         //原子對象seq 記錄進入process的次數
209         String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
210 
211         System.out.println(logPrefix + "收到Watcher通知");
212         System.out.println(logPrefix + "連接狀態:\t" + keeperState.toString());
213         System.out.println(logPrefix + "事件類型:\t" + eventType.toString());
214 
215         if (KeeperState.SyncConnected == keeperState) {
216             // 成功連接上ZK服務器
217             if (EventType.None == eventType) {
218                 System.out.println(logPrefix + "成功連接上ZK服務器");
219                 connectedSemaphore.countDown();
220             } 
221             //創建節點
222             else if (EventType.NodeCreated == eventType) {
223                 System.out.println(logPrefix + "節點創建");
224                 try {
225                     Thread.sleep(100);
226                 } catch (InterruptedException e) {
227                     e.printStackTrace();
228                 }
229             } 
230             //更新節點
231             else if (EventType.NodeDataChanged == eventType) {
232                 System.out.println(logPrefix + "節點數據更新");
233                 try {
234                     Thread.sleep(100);
235                 } catch (InterruptedException e) {
236                     e.printStackTrace();
237                 }
238             } 
239             //更新子節點
240             else if (EventType.NodeChildrenChanged == eventType) {
241                 System.out.println(logPrefix + "子節點變更");
242                 try {
243                     Thread.sleep(3000);
244                 } catch (InterruptedException e) {
245                     e.printStackTrace();
246                 }
247             } 
248             //刪除節點
249             else if (EventType.NodeDeleted == eventType) {
250                 System.out.println(logPrefix + "節點 " + path + " 被刪除");
251             }
252             else {
253                 System.out.println(logPrefix + "其他事件:" + eventType);
254             };
255         } 
256         else if (KeeperState.Disconnected == keeperState) {
257             System.out.println(logPrefix + "與ZK服務器斷開連接");
258         } 
259         else if (KeeperState.AuthFailed == keeperState) {
260             System.out.println(logPrefix + "權限檢查失敗");
261         } 
262         else if (KeeperState.Expired == keeperState) {
263             System.out.println(logPrefix + "會話失效");
264         }
265         else {
266             System.out.println(logPrefix + "其他狀態:" + keeperState);
267         };
268 
269         System.out.println("--------------------------------------------");
270 
271     }
272 
273     /**
274      * <B>方法名稱:</B>測試zookeeper監控<BR>
275      * <B>概要說明:</B>主要測試watch功能<BR>
276      * @param args
277      * @throws Exception
278      */
279     public static void main(String[] args) throws Exception {
280 
281         //建立watcher //當前客戶端可以稱為一個watcher 觀察者角色
282         ZooKeeperWatcherYuCong zkWatch = new ZooKeeperWatcherYuCong();
283         //創建連接 
284         zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
285         System.out.println(zkWatch.zk.toString());
286         
287         Thread.sleep(1000);
288         
289         // 清理節點
290         zkWatch.deleteAllTestPath(false);
291         
292         //-----------------第一步: 創建父節點 /p ------------------------//
293         if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", false)) {
294             
295             Thread.sleep(1000);
296             
297             //-----------------第二步: 讀取節點 /p 和    讀取/p節點下的子節點(getChildren)的區別 --------------//
298             // 讀取數據
299             zkWatch.readData(PARENT_PATH, false);
300         
301             // 讀取子節點(監控childNodeChange事件)
302             zkWatch.getChildren(PARENT_PATH, false);
303 
304             // 更新數據
305             zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
306             Thread.sleep(1000);
307             
308             // 創建子節點
309             zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", false);
310             
311             
312             //-----------------第三步: 建立子節點的觸發 --------------//
313 //            zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true);
314 //            zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true);
315             
316             //-----------------第四步: 更新子節點數據的觸發 --------------//
317             //在進行修改之前,我們需要watch一下這個節點:
318             Thread.sleep(1000);
319             zkWatch.readData(CHILDREN_PATH, true);
320             zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
321             
322         }
323         
324         Thread.sleep(10000);
325         // 清理節點
326         zkWatch.deleteAllTestPath(false);
327         
328         
329         Thread.sleep(10000);
330         zkWatch.releaseConnection();
331         
332     }
333 
334 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM