zookeeper(四):核心原理(Watcher、事件和狀態)


zookeeper主要是為了統一分布式系統中各個節點的工作狀態,在資源沖突的情況下協調提供節點資源搶占,提供給每個節點了解整個集群所處狀態的途徑這一切的實現都依賴於zookeeper中的事件監聽和通知機制

zookeeper中的事件和狀態

事件狀態構成了zookeeper客戶端連接描述的兩個維度。注意,網上很多帖子都是在介紹zookeeper客戶端連接的事件,但是忽略了zookeeper客戶端狀態的變化也是要進行監聽和通知的。這里我們通過下面的兩個表詳細介紹zookeeper中的事件和狀態(zookeeper API中被定義為@Deprecated的事件和狀態就不介紹了)。

zookeeper客戶端與zookeeper server連接的狀態

連接狀態 狀態含義
KeeperState.Expired 客戶端和服務器在ticktime的時間周期內,是要發送心跳通知的。這是租約協議的一個實現。客戶端發送request,告訴服務器其上一個租約時間,服務器收到這個請求后,告訴客戶端其下一個租約時間是哪個時間點。當客戶端時間戳達到最后一個租約時間,而沒有收到服務器發來的任何新租約時間,即認為自己下線(此后客戶端會廢棄這次連接,並試圖重新建立連接)。這個過期狀態就是Expired狀態
KeeperState.Disconnected 就像上面那個狀態所述,當客戶端斷開一個連接(可能是租約期滿,也可能是客戶端主動斷開)這是客戶端和服務器的連接就是Disconnected狀態
KeeperState.SyncConnected 一旦客戶端和服務器的某一個節點建立連接(注意,雖然集群有多個節點,但是客戶端一次連接到一個節點就行了),並完成一次version、zxid的同步,這時的客戶端和服務器的連接狀態就是SyncConnected
KeeperState.AuthFailed zookeeper客戶端進行連接認證失敗時,發生該狀態

需要說明的是,這些狀態在觸發時,所記錄的事件類型都是:EventType.None

zookeeper中的watch事件(當zookeeper客戶端監聽某個znode節點”/node-x”時)

zookeeper事件 事件含義
EventType.NodeCreated 當node-x這個節點被創建時,該事件被觸發
EventType.NodeChildrenChanged 當node-x這個節點的直接子節點被創建、被刪除、子節點數據發生變更時,該事件被觸發。
EventType.NodeDataChanged 當node-x這個節點的數據發生變更時,該事件被觸發
EventType.NodeDeleted 當node-x這個節點被刪除時,該事件被觸發。
EventType.None 當zookeeper客戶端的連接狀態發生變更時,即KeeperState.Expired、KeeperState.Disconnected、KeeperState.SyncConnected、KeeperState.AuthFailed狀態切換時,描述的事件類型為EventType.None

 

watch機制

Znode發生變化(Znode本身的增加,刪除,修改,以及子Znode的變化)可以通過Watch機制通知到客戶端。那么要實現Watch,就必須實現org.apache.zookeeper.Watcher接口,並且將實現類的對象傳入到可以Watch的方法中。Zookeeper中所有讀操作(getData(),getChildren(),exists())都可以設置Watch選項。Watch事件具有one-time trigger(一次性觸發)的特性,如果Watch監視的Znode有變化,那么就會通知設置該Watch的客戶端。

在上述說道的所有讀操作中,如果需要Watcher,我們可以自定義Watcher,如果是Boolean型變量,當為true時,則使用系統默認的Watcher,系統默認的Watcher是在Zookeeper的構造函數中定義的Watcher。參數中Watcher為空或者false,表示不啟用Wather。

watch特性1:一次性觸發器

客戶端在Znode設置了Watch時,如果Znode內容發生改變,那么客戶端就會獲得Watch事件。例如:客戶端設置getData("/znode1", true)后,如果/znode1發生改變或者刪除,那么客戶端就會得到一個/znode1的Watch事件,但是/znode1再次發生變化,那客戶端是無法收到Watch事件的,除非客戶端設置了新的Watch。

watch特性2:發送至客戶端

Watch事件是異步發送到Client。Zookeeper可以保證客戶端發送過去的更新順序是有序的。例如:某個Znode沒有設置watcher,那么客戶端對這個Znode設置Watcher發送到集群之前,該客戶端是感知不到該Znode任何的改變情況的。換個角度來解釋:由於Watch有一次性觸發的特點,所以在服務器端沒有Watcher的情況下,Znode的任何變更就不會通知到客戶端。不過,即使某個Znode設置了Watcher,且在Znode有變化的情況下通知到了客戶端,但是在客戶端接收到這個變化事件,但是還沒有再次設置Watcher之前,如果其他客戶端對該Znode做了修改,這種情況下,Znode第二次的變化客戶端是無法收到通知的。這可能是由於網絡延遲或者是其他因素導致,所以我們使用Zookeeper不能期望能夠監控到節點每次的變化。Zookeeper只能保證最終的一致性,而無法保證強一致性。

watch特性3:設置watch的數據內容

Znode改變有很多種方式,例如:節點創建,節點刪除,節點改變,子節點改變等等。Zookeeper維護了兩個Watch列表,一個節點數據Watch列表,另一個是子節點Watch列表。getData()和exists()設置數據Watch,getChildren()設置子節點Watch。兩者選其一,可以讓我們根據不同的返回結果選擇不同的Watch方式,getData()和exists()返回節點的內容,getChildren()返回子節點列表。因此,setData()觸發內容Watch,create()觸發當前節點的內容Watch或者是其父節點的子節點Watch。delete()同時觸發父節點的子節點Watch和內容Watch,以及子節點的內容Watch。

Zookeeper Watcher的運行機制

1,Watch是輕量級的,其實就是本地JVM的Callback,服務器端只是存了是否有設置了Watcher的布爾類型。(源碼見:org.apache.zookeeper.server.FinalRequestProcessor)
2,在服務端,在FinalRequestProcessor處理對應的Znode操作時,會根據客戶端傳遞的watcher變量,添加到對應的ZKDatabase(org.apache.zookeeper.server.ZKDatabase)中進行持久化存儲,同時將自己NIOServerCnxn做為一個Watcher callback,監聽服務端事件變化
3,Leader通過投票通過了某次Znode變化的請求后,然后通知對應的Follower,Follower根據自己內存中的zkDataBase信息,發送notification信息給zookeeper客戶端。
4,Zookeeper客戶端接收到notification信息后,找到對應變化path的watcher列表,挨個進行觸發回調。

流程圖

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * Zookeeper Wathcher 
 * 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類)
 * @author(alienware)
 * @since 2015-6-14
 */
public class ZooKeeperWatcher implements Watcher {

    /** 定義原子變量 */
    AtomicInteger seq = new AtomicInteger();
    /** 定義session失效時間 */
    private static final int SESSION_TIMEOUT = 10000;
    /** zookeeper服務器地址 */
    private static final String CONNECTION_ADDR = "192.168.1.121:2181,192.168.1.122:2181,192.168.1.123:2181";
    /** zk父路徑設置 */
    private static final String PARENT_PATH = "/p";
    /** zk子路徑設置 */
    private static final String CHILDREN_PATH = "/p/c";
    /** 進入標識 */
    private static final String LOG_PREFIX_OF_MAIN = "【Main】";
    /** zk變量 */
    private ZooKeeper zk = null;
    /**用於等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 創建ZK連接
     * @param connectAddr ZK服務器地址列表
     * @param sessionTimeout Session超時時間
     */
    public void createConnection(String connectAddr, int sessionTimeout) {
        this.releaseConnection();
        try {
            //this表示把當前對象進行傳遞到其中去(也就是在主函數里實例化的new ZooKeeperWatcher()實例對象)
            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
            System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務器");
            connectedSemaphore.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 關閉ZK連接
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 創建節點
     * @param path 節點路徑
     * @param data 數據內容
     * @return 
     */
    public boolean createPath(String path, String data, boolean needWatch) {
        try {
            //設置監控(由於zookeeper的監控都是一次性的所以 每次必須設置監控)
            this.zk.exists(path, needWatch);
            System.out.println(LOG_PREFIX_OF_MAIN + "節點創建成功, Path: " + 
                               this.zk.create(    /**路徑*/ 
                                                   path, 
                                                   /**數據*/
                                                   data.getBytes(), 
                                                   /**所有可見*/
                                                   Ids.OPEN_ACL_UNSAFE, 
                                                   /**永久存儲*/
                                                   CreateMode.PERSISTENT ) +     
                               ", content: " + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 讀取指定節點數據內容
     * @param path 節點路徑
     * @return
     */
    public String readData(String path, boolean needWatch) {
        try {
            System.out.println("讀取數據操作...");
            return new String(this.zk.getData(path, needWatch, null));
        } catch (Exception e) {
            e.printStackTrace();
            return "";
        }
    }

    /**
     * 更新指定節點數據內容
     * @param path 節點路徑
     * @param data 數據內容
     * @return
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " +
                                this.zk.setData(path, data.getBytes(), -1));
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 刪除指定節點
     * 
     * @param path
     *            節點path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 判斷指定節點是否存在
     * @param path 節點路徑
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 獲取子節點
     * @param path 節點路徑
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            System.out.println("讀取子節點操作...");
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 刪除所有節點
     */
    public void deleteAllTestPath(boolean needWatch) {
        if(this.exists(CHILDREN_PATH, needWatch) != null){
            this.deleteNode(CHILDREN_PATH);
        }
        if(this.exists(PARENT_PATH, needWatch) != null){
            this.deleteNode(PARENT_PATH);
        }        
    }
    
    /**
     * 收到來自Server的Watcher通知后的處理。
     */
    @Override
    public void process(WatchedEvent event) {
        
        System.out.println("進入 process 。。。。。event = " + event);
        
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        if (event == null) {
            return;
        }
        
        // 連接狀態
        KeeperState keeperState = event.getState();
        // 事件類型
        EventType eventType = event.getType();
        // 受影響的path
        String path = event.getPath();
        //原子對象seq 記錄進入process的次數
        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

        System.out.println(logPrefix + "收到Watcher通知");
        System.out.println(logPrefix + "連接狀態:\t" + keeperState.toString());
        System.out.println(logPrefix + "事件類型:\t" + eventType.toString());

        if (KeeperState.SyncConnected == keeperState) {
            // 成功連接上ZK服務器
            if (EventType.None == eventType) {
                System.out.println(logPrefix + "成功連接上ZK服務器");
                connectedSemaphore.countDown();
            } 
            //創建節點
            else if (EventType.NodeCreated == eventType) {
                System.out.println(logPrefix + "節點創建");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } 
            //更新節點
            else if (EventType.NodeDataChanged == eventType) {
                System.out.println(logPrefix + "節點數據更新");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } 
            //更新子節點
            else if (EventType.NodeChildrenChanged == eventType) {
                System.out.println(logPrefix + "子節點變更");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } 
            //刪除節點
            else if (EventType.NodeDeleted == eventType) {
                System.out.println(logPrefix + "節點 " + path + " 被刪除");
            }
            else ;
        } 
        else if (KeeperState.Disconnected == keeperState) {
            System.out.println(logPrefix + "與ZK服務器斷開連接");
        } 
        else if (KeeperState.AuthFailed == keeperState) {
            System.out.println(logPrefix + "權限檢查失敗");
        } 
        else if (KeeperState.Expired == keeperState) {
            System.out.println(logPrefix + "會話失效");
        }
        else ;

        System.out.println("--------------------------------------------");

    }

    /**
     * <B>方法名稱:</B>測試zookeeper監控<BR>
     * <B>概要說明:</B>主要測試watch功能<BR>
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        //建立watcher //當前客戶端可以稱為一個watcher 觀察者角色
        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
        //創建連接 
        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
        //System.out.println(zkWatch.zk.toString());
        
        Thread.sleep(1000);
        
        // 清理節點
        zkWatch.deleteAllTestPath(false);
        
        //-----------------第一步: 創建父節點 /p ------------------------//
        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {
            
            Thread.sleep(1000);
            
            //-----------------第二步: 讀取節點 /p 和    讀取/p節點下的子節點(getChildren)的區別 --------------//
            // 讀取數據
            zkWatch.readData(PARENT_PATH, true);
        
            // 讀取子節點(監控childNodeChange事件)
            zkWatch.getChildren(PARENT_PATH, true);

            // 更新數據
            zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
            
            Thread.sleep(1000);
            // 創建子節點
            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", true);
            
            
            //-----------------第三步: 建立子節點的觸發 --------------//
//            zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true);
//            zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true);
            
            //-----------------第四步: 更新子節點數據的觸發 --------------//
            //在進行修改之前,我們需要watch一下這個節點:
            Thread.sleep(1000);
            zkWatch.readData(CHILDREN_PATH, true);
            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
            
        }
        
        Thread.sleep(10000);
        // 清理節點
        zkWatch.deleteAllTestPath(false);
        
        
        Thread.sleep(10000);
        zkWatch.releaseConnection();
        
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM