【Zookeeper】源碼分析之Watcher機制(一)


一、前言

  前面已經分析了Zookeeper持久話相關的類,下面接着分析Zookeeper中的Watcher機制所涉及到的類。

二、總體框圖

  對於Watcher機制而言,主要涉及的類主要如下。

  

  說明:

  Watcher,接口類型,其定義了process方法,需子類實現。

  Event,接口類型,Watcher的內部類,無任何方法。

  KeeperState,枚舉類型,Event的內部類,表示Zookeeper所處的狀態。

  EventType,枚舉類型,Event的內部類,表示Zookeeper中發生的事件類型。

  WatchedEvent,表示對ZooKeeper上發生變化后的反饋,包含了KeeperState和EventType。

  ClientWatchManager,接口類型,表示客戶端的Watcher管理者,其定義了materialized方法,需子類實現。

  ZKWatchManager,Zookeeper的內部類,繼承ClientWatchManager。

  MyWatcher,ZooKeeperMain的內部類,繼承Watcher。

  ServerCnxn,接口類型,繼承Watcher,表示客戶端與服務端的一個連接。

  WatchManager,管理Watcher。

三、Watcher源碼分析

  3.1 內部類

  Event,接口類型,表示事件代表的狀態,除去其內部類,其源碼結構如下  

public interface Event {}

  說明:可以看到,Event接口並沒有定義任何屬性和方法,但是其包含了KeeperState和EventType兩個內部枚舉類。

  3.2 接口方法  

abstract public void process(WatchedEvent event);

  說明:其代表了實現Watcher接口時必須實現的的方法,即定義進行處理,WatchedEvent表示觀察的事件。

四、Event源碼分析

  3.1 內部類

  1. KeeperState  

        public enum KeeperState { // 事件發生時Zookeeper的狀態
            /** Unused, this state is never generated by the server */
            @Deprecated
            // 未知狀態,不再使用,服務器不會產生此狀態
            Unknown (-1), 

            /** The client is in the disconnected state - it is not connected
             * to any server in the ensemble. */
            // 斷開
            Disconnected (0),

            /** Unused, this state is never generated by the server */
            @Deprecated
            // 未同步連接,不再使用,服務器不會產生此狀態
            NoSyncConnected (1),

            /** The client is in the connected state - it is connected
             * to a server in the ensemble (one of the servers specified
             * in the host connection parameter during ZooKeeper client
             * creation). */
            // 同步連接狀態
            SyncConnected (3),

            /**
             * Auth failed state
             */
            // 認證失敗狀態
            AuthFailed (4),

            /**
             * The client is connected to a read-only server, that is the
             * server which is not currently connected to the majority.
             * The only operations allowed after receiving this state is
             * read operations.
             * This state is generated for read-only clients only since
             * read/write clients aren't allowed to connect to r/o servers.
             */
            // 只讀連接狀態
            ConnectedReadOnly (5),

            /**
              * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
              * so that they can perform Zookeeper actions with their SASL-authorized permissions.
              */
            // SASL認證通過狀態
            SaslAuthenticated(6),

            /** The serving cluster has expired this session. The ZooKeeper
             * client connection (the session) is no longer valid. You must
             * create a new client connection (instantiate a new ZooKeeper
             * instance) if you with to access the ensemble. */
            // 過期狀態
            Expired (-112);

            // 代表狀態的整形值
            private final int intValue;     // Integer representation of value
                                            // for sending over wire

                                            
            // 構造函數
            KeeperState(int intValue) {
                this.intValue = intValue;
            }

            // 返回整形值
            public int getIntValue() {
                return intValue;
            }

            // 從整形值構造相應的狀態
            public static KeeperState fromInt(int intValue) {
                switch(intValue) {
                    case   -1: return KeeperState.Unknown;
                    case    0: return KeeperState.Disconnected;
                    case    1: return KeeperState.NoSyncConnected;
                    case    3: return KeeperState.SyncConnected;
                    case    4: return KeeperState.AuthFailed;
                    case    5: return KeeperState.ConnectedReadOnly;
                    case    6: return KeeperState.SaslAuthenticated;
                    case -112: return KeeperState.Expired;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }

  說明:KeeperState是一個枚舉類,其定義了在事件發生時Zookeeper所處的各種狀態,其還定義了一個從整形值返回對應狀態的方法fromInt。

  2. EventType 

        public enum EventType { // 事件類型
            //
            None (-1),
            // 結點創建
            NodeCreated (1),
            // 結點刪除
            NodeDeleted (2),
            // 結點數據變化
            NodeDataChanged (3),
            // 結點子節點變化
            NodeChildrenChanged (4);

            // 代表事件類型的整形 
            private final int intValue;     // Integer representation of value
                                            // for sending over wire

            // 構造函數
            EventType(int intValue) {
                this.intValue = intValue;
            }

            // 返回整形
            public int getIntValue() {
                return intValue;
            }

            // 從整形構造相應的事件
            public static EventType fromInt(int intValue) {
                switch(intValue) {
                    case -1: return EventType.None;
                    case  1: return EventType.NodeCreated;
                    case  2: return EventType.NodeDeleted;
                    case  3: return EventType.NodeDataChanged;
                    case  4: return EventType.NodeChildrenChanged;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to EventType");
                }
            }           
        }
    }

  說明:EventType是一個枚舉類,其定義了事件的類型(如創建節點、刪除節點等事件),同時,其還定義了一個從整形值返回對應事件類型的方法fromInt。

五、WatchedEvent

  5.1 類的屬性  

public class WatchedEvent {
    // Zookeeper的狀態
    final private KeeperState keeperState;
    // 事件類型
    final private EventType eventType;
    // 事件所涉及節點的路徑
    private String path;
}

  說明:WatchedEvent類包含了三個屬性,分別代表事件發生時Zookeeper的狀態、事件類型和發生事件所涉及的節點路徑。

  5.2 構造函數

  1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型構造函數 

    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
        // 初始化屬性
        this.keeperState = keeperState;
        this.eventType = eventType;
        this.path = path;
    }

  說明:構造函數傳入了三個參數,然后分別對屬性進行賦值操作。

  2. public WatchedEvent(WatcherEvent eventMessage)型構造函數  

    public WatchedEvent(WatcherEvent eventMessage) {
        // 從eventMessage中取出相應屬性進行賦值
        keeperState = KeeperState.fromInt(eventMessage.getState());
        eventType = EventType.fromInt(eventMessage.getType());
        path = eventMessage.getPath();
    }

  說明:構造函數傳入了WatcherEvent參數,之后直接從該參數中取出相應屬性進行賦值操作。

  對於WatchedEvent類的方法而言,相對簡單,包含了幾個getXXX方法,用於獲取相應的屬性值。

六、ClientWatchManager

  6.1 接口方法 

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

  說明:該方法表示事件發生時,返回需要被通知的Watcher集合,可能為空集合。

七、ZKWatchManager

  7.1 類的屬性 

    private static class ZKWatchManager implements ClientWatchManager {
        // 數據變化的Watchers
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        // 節點存在與否的Watchers
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        // 子節點變化的Watchers
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();
    }

  說明:ZKWatchManager實現了ClientWatchManager,並定義了三個Map鍵值對,鍵為節點路徑,值為Watcher。分別對應數據變化的Watcher、節點是否存在的Watcher、子節點變化的Watcher。

  7.2 核心方法分析

  1. materialize方法

        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            // 新生成結果Watcher集合
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) { // 確定事件類型
            case None: // 無類型
                // 添加默認Watcher
                result.add(defaultWatcher);
                // 是否需要清空(提取對zookeeper.disableAutoWatchReset字段進行配置的值、Zookeeper的狀態是否為同步連接)
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;

                synchronized(dataWatches) { // 同步塊
                    for(Set<Watcher> ws: dataWatches.values()) {
                        // 添加至結果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) { // 同步塊 
                    for(Set<Watcher> ws: existWatches.values()) {
                        // 添加至結果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) { // 同步塊
                    for(Set<Watcher> ws: childWatches.values()) {
                        // 添加至結果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        childWatches.clear();
                    }
                }
                // 返回結果
                return result;
            case NodeDataChanged: // 節點數據變化
            case NodeCreated: // 創建節點
                synchronized (dataWatches) { // 同步塊
                    // 移除clientPath對應的Watcher后全部添加至結果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) { 
                    // 移除clientPath對應的Watcher后全部添加至結果集合
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged: // 節點子節點變化
                synchronized (childWatches) {
                    // 移除clientPath對應的Watcher后全部添加至結果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted: // 刪除節點
                synchronized (dataWatches) { 
                    // 移除clientPath對應的Watcher后全部添加至結果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    // 移除clientPath對應的Watcher
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        // 移除clientPath對應的Watcher后全部添加至結果集合
                        addTo(existWatches.remove(clientPath), result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    // 移除clientPath對應的Watcher后全部添加至結果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default: // 缺省處理
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            // 返回結果集合
            return result;
        }
    }

  說明:該方法在事件發生后,返回需要被通知的Watcher集合。在該方法中,首先會根據EventType類型確定相應的事件類型,然后根據事件類型的不同做出相應的操作,如針對None類型,即無任何事件,則首先會從三個鍵值對中刪除clientPath對應的Watcher,然后將剩余的Watcher集合添加至結果集合;針對NodeDataChanged和NodeCreated事件而言,其會從dataWatches和existWatches中刪除clientPath對應的Watcher,然后將剩余的Watcher集合添加至結果集合。

八、總結

  針對Watcher機制的第一部分的源碼分析就已經完成,可以看到此部分的源碼相對簡單,之后會分析org.apache.zookeeper.server下的WatchManager和ClientWatchManager所在外部類ZooKeeper,也謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM