ZooKeeper Source Code Analysis (1): An Introduction to ZooKeeper's Interfaces


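1. Server Roles

While running, each server is in one of the following states:

  ① LOOKING: the server does not know who the leader is and is searching for one.

  ② LEADING: the server is the elected leader.

  ③ FOLLOWING: a leader has been elected and the server is synchronizing with it.

  ④ OBSERVING: the server synchronizes with the leader as an observer and does not vote.

QuorumPeer defines the server types. ServerState represents the server's state, and LearnerType indicates whether a server that follows the leader does so as a participant or as an observer. The former is called a follower, the latter an observer.

The code is as follows: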

    public class QuorumPeer extends Thread implements QuorumStats.Provider {
        // current state of this server
        public enum ServerState {
            LOOKING, FOLLOWING, LEADING, OBSERVING;
        }
        // whether a learner votes (PARTICIPANT) or not (OBSERVER)
        public enum LearnerType {
            PARTICIPANT, OBSERVER;
        }
    }

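2. Znode Types

CreateMode defines four node types:

  PERSISTENT: a persistent node

  EPHEMERAL: an ephemeral node

  PERSISTENT_SEQUENTIAL: a persistent node with a sequence number appended to its name

  EPHEMERAL_SEQUENTIAL: an ephemeral node with a sequence number appended to its name

The code is as follows: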

    public enum CreateMode {
        // arguments: (flag, ephemeral, sequential)
        PERSISTENT (0, false, false),
        PERSISTENT_SEQUENTIAL (2, false, true),
        EPHEMERAL (1, true, false),
        EPHEMERAL_SEQUENTIAL (3, true, true);
    }
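The listing above omits the fields and constructor behind the three arguments. The following is a minimal sketch of how such an enum could carry them; SketchCreateMode, its field names, and fromFlag are hypothetical names chosen for illustration and are not taken from the ZooKeeper source. With the values shown in the listing, bit 0 of the flag coincides with "ephemeral" and bit 1 with "sequential".

```java
// Hypothetical stand-in for CreateMode; field and method names are assumptions.
enum SketchCreateMode {
    PERSISTENT(0, false, false),
    PERSISTENT_SEQUENTIAL(2, false, true),
    EPHEMERAL(1, true, false),
    EPHEMERAL_SEQUENTIAL(3, true, true);

    final int flag;           // numeric value of the mode
    final boolean ephemeral;  // node disappears with its session
    final boolean sequential; // a sequence number is appended to the name

    SketchCreateMode(int flag, boolean ephemeral, boolean sequential) {
        this.flag = flag;
        this.ephemeral = ephemeral;
        this.sequential = sequential;
    }

    // Look a mode up by its numeric flag.
    static SketchCreateMode fromFlag(int flag) {
        for (SketchCreateMode mode : values()) {
            if (mode.flag == flag) {
                return mode;
            }
        }
        throw new IllegalArgumentException("unknown flag " + flag);
    }
}
```

For example, SketchCreateMode.fromFlag(3) yields EPHEMERAL_SEQUENTIAL, whose ephemeral and sequential fields are both true.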

3. The Stat Class

The Stat class defines the metadata of a znode. Its main member variables are:

    public class Stat implements Record {
        private long czxid;          // zxid at creation
        private long mzxid;          // zxid of the latest modification
        private long ctime;          // creation time
        private long mtime;          // modification time
        private int version;         // version number of the znode data
        private int cversion;        // version number of the child znodes
        private int aversion;        // version number of the ACL
        private long ephemeralOwner; // client session id that owns an ephemeral node, 0 by default
        private int dataLength;      // length of the znode data
        private int numChildren;     // number of child znodes
        private long pzxid;          // zxid of the latest change to the children (createNode and deleteNode set it on the parent)
    }

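Note:

The three classes Stat, StatPersisted, and StatPersistedV1 have largely the same member variables and logic. StatPersisted lacks the dataLength and numChildren attributes, and StatPersistedV1 lacks dataLength, numChildren, and pzxid. Where each class is used remains to be analyzed.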

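3. The DataNode Class

The DataNode class records all information about a znode, including its parent node, child nodes, data content, ACL information, and stat metadata. Its main member variables are: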

    public class DataNode implements Record {
        DataNode parent;                      // parent node
        byte data[];                          // data content
        Long acl;                             // key of the ACL list
        public StatPersisted stat;            // persisted metadata
        private Set<String> children = null;  // relative names of the children
    }

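Two member variables deserve attention: acl and children.

acl is a Long value that acts as an ACL key. The actual ACL information is stored in the longKeyMap and aclKeyMap of DataTree. The former holds the ACL information of every node in the tree; its type is Map<Long, List<ACL>>, so the ACL list of a node can be looked up by its ACL key. The latter is the reverse mapping.

children records the child nodes of this node. It does not hold DataNode objects, only the last component of each child's path. For example, if this node is "/biglog" and its child is "/biglog/test1", children holds the relative name "test1". The source comment on this field adds: "This should be synchronized on except deserializing (for speed up issues)".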
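The two-way ACL mapping can be sketched as below. This is an illustration under assumptions, not the DataTree implementation: the class AclInternSketch and the method convertLong are hypothetical names, an ACL entry is modelled as a plain String, and returning -1 for a null list is assumed here only because the system nodes later in this article are constructed with an acl of -1L.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: identical ACL lists share one Long key.
class AclInternSketch {
    private final Map<Long, List<String>> longKeyMap = new HashMap<>();
    private final Map<List<String>, Long> aclKeyMap = new HashMap<>();
    private long aclIndex = 0;

    // Return the key already assigned to this ACL list, or allocate a new one.
    synchronized Long convertAcls(List<String> acls) {
        if (acls == null) {
            return -1L; // assumption: -1 stands for "no ACL list"
        }
        Long key = aclKeyMap.get(acls);
        if (key == null) {
            key = ++aclIndex;
            List<String> copy = new ArrayList<>(acls); // defensive copy as map key
            longKeyMap.put(key, copy);
            aclKeyMap.put(copy, key);
        }
        return key;
    }

    // Reverse lookup: from the key stored in a node back to the ACL list.
    synchronized List<String> convertLong(Long key) {
        return longKeyMap.get(key);
    }
}
```

The point of the design is that each DataNode stores one Long instead of its own copy of the ACL list, so nodes with identical ACLs share a single entry.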

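3. The DataTree Class

The DataTree class maintains the whole directory tree. ConcurrentHashMap<String, DataNode> nodes maps each full path to its DataNode, while the Set<String> children in DataNode stores the parent-child relationship as the relative names of the children. From a DataNode you can obtain the relative name of any child, assemble the full path, and then look it up in the nodes map of DataTree. All access to a node by path goes through nodes. The main member variables are: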

(1) The DataTree class:

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
    private final WatchManager dataWatches = new WatchManager();
    private final WatchManager childWatches = new WatchManager();

    // the root of the ZooKeeper tree
    private static final String rootZookeeper = "/";

    // the ZooKeeper node that acts as the management and status node
    private static final String procZookeeper = Quotas.procZookeeper;
    // the name of that node as a child of the root: "zookeeper"
    private static final String procChildZookeeper = procZookeeper.substring(1);
    // the ZooKeeper quota node that acts as the quota management node
    private static final String quotaZookeeper = Quotas.quotaZookeeper;
    // the name of the quota node as a child of /zookeeper: "quota"
    private static final String quotaChildZookeeper = quotaZookeeper.substring(procZookeeper.length() + 1);

    // the path trie that keeps track of the quota nodes in this DataTree
    private final PathTrie pTrie = new PathTrie();

    // this hashtable lists the paths of the ephemeral nodes of a session
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
    // this is a map from longs to acls. It saves acls being stored for each datanode.
    public final Map<Long, List<ACL>> longKeyMap = new HashMap<Long, List<ACL>>();
    // this is a map from acls to longs.
    public final Map<List<ACL>, Long> aclKeyMap = new HashMap<List<ACL>, Long>();

    // the number of acls in the DataTree
    protected long aclIndex = 0;
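The lookup scheme described above (a flat map from full path to node, plus relative child names inside each node) can be illustrated with a small sketch. PathLookupSketch, Node, add, and childPaths are hypothetical names; the real DataNode and DataTree carry far more state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "flat path map + relative child names".
class PathLookupSketch {
    static class Node {
        final Set<String> children = new HashSet<>(); // last path components only
    }

    final Map<String, Node> nodes = new ConcurrentHashMap<>();

    PathLookupSketch() {
        Node root = new Node();
        nodes.put("", root);  // alias, so the parent of "/x" resolves to the root
        nodes.put("/", root);
    }

    // Register a node; its parent must already exist.
    void add(String path) {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        nodes.get(parentName).children.add(childName);
        nodes.put(path, new Node());
    }

    // Rebuild the full paths of the children from their relative names.
    List<String> childPaths(String path) {
        Node node = nodes.get(path);
        String prefix = path.equals("/") ? "" : path;
        List<String> result = new ArrayList<>();
        for (String child : node.children) {
            result.add(prefix + "/" + child);
        }
        Collections.sort(result);
        return result;
    }
}
```

After add("/biglog") and add("/biglog/test1"), the node for "/biglog" holds only "test1", and childPaths("/biglog") reassembles "/biglog/test1", which can then be looked up in nodes.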

(2) The Quotas class:

    public class Quotas {

        // the zookeeper nodes that acts as the management and status node
        public static final String procZookeeper = "/zookeeper";

        // the zookeeper quota node that acts as the quota management node for zookeeper
        public static final String quotaZookeeper = "/zookeeper/quota";

        // the limit node that has the limit of a subtree
        public static final String limitNode = "zookeeper_limits";

        // the stat node that monitors the limit of a subtree.
        public static final String statNode = "zookeeper_stats";

        /**
         * return the quota path associated with this
         * prefix
         * @param path the actual path in zookeeper.
         * @return the limit quota path
         */
        public static String quotaPath(String path) {
            return quotaZookeeper + path + "/" + limitNode;
        }

        /**
         * return the stat quota path associated with this
         * prefix.
         * @param path the actual path in zookeeper
         * @return the stat quota path
         */
        public static String statPath(String path) {
            return quotaZookeeper + path + "/" + statNode;
        }
    }
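To make the path layout concrete, here is a small self-contained sketch that repeats the two string concatenations from the listing above. QuotaPathSketch and trackedPath are hypothetical names; trackedPath mirrors the parentName.substring(quotaZookeeper.length()) expression that appears in createNode later in this article.

```java
// Hypothetical sketch reproducing the Quotas path arithmetic.
class QuotaPathSketch {
    static final String QUOTA_ZOOKEEPER = "/zookeeper/quota";
    static final String LIMIT_NODE = "zookeeper_limits";
    static final String STAT_NODE = "zookeeper_stats";

    // Path of the limit node for a given application path.
    static String quotaPath(String path) {
        return QUOTA_ZOOKEEPER + path + "/" + LIMIT_NODE;
    }

    // Path of the stat node for a given application path.
    static String statPath(String path) {
        return QUOTA_ZOOKEEPER + path + "/" + STAT_NODE;
    }

    // Inverse direction: strip the quota prefix from a quota-side parent path.
    static String trackedPath(String quotaParentPath) {
        return quotaParentPath.substring(QUOTA_ZOOKEEPER.length());
    }
}
```

For the application path "/app", quotaPath gives "/zookeeper/quota/app/zookeeper_limits" and statPath gives "/zookeeper/quota/app/zookeeper_stats"; trackedPath("/zookeeper/quota/app") gives back "/app".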

(3) The StatsTrack class:

    // a class that represents the stats associated with quotas
    public class StatsTrack {
        private int count;
        private long bytes;
        private String countStr = "count";
        private String byteStr = "bytes";

        public StatsTrack() {
            this(null);
        }

        /**
         * the stat string should be of the form count=int,bytes=long
         * if stats is called with null the count and bytes are initialized
         * to -1.
         * @param stats the stat string to be initialized with
         */
        public StatsTrack(String stats) {
            if (stats == null) {
                stats = "count=-1,bytes=-1";
            }
            String[] split = stats.split(",");
            if (split.length != 2) {
                throw new IllegalArgumentException("invalid string " + stats);
            }
            count = Integer.parseInt(split[0].split("=")[1]);
            bytes = Long.parseLong(split[1].split("=")[1]);
        }

        /**
         * get the count of nodes allowed as part of quota
         *
         * @return the count as part of this string
         */
        public int getCount() {
            return this.count;
        }

        /**
         * set the count for this stat tracker.
         *
         * @param count
         *            the count to set with
         */
        public void setCount(int count) {
            this.count = count;
        }

        /**
         * get the count of bytes allowed as part of quota
         *
         * @return the bytes as part of this string
         */
        public long getBytes() {
            return this.bytes;
        }

        /**
         * set the bytes for this stat tracker.
         *
         * @param bytes
         *            the bytes to set with
         */
        public void setBytes(long bytes) {
            this.bytes = bytes;
        }

        /**
         * returns the string that maps to this stat tracking.
         */
        @Override
        public String toString() {
            return countStr + "=" + count + "," + byteStr + "=" + bytes;
        }
    }
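The "count=int,bytes=long" string format lends itself to a read-modify-write cycle. The sketch below shows one such cycle using the same parsing steps as the constructor above. StatStringSketch and applyDelta are hypothetical names, and the article does not show how DataTree's updateCount and updateBytes actually use StatsTrack, so treat this only as an illustration of the format.

```java
// Hypothetical sketch: parse a stat string, apply deltas, format it again.
class StatStringSketch {
    static String applyDelta(String stats, int countDelta, long bytesDelta) {
        String[] split = stats.split(",");
        if (split.length != 2) {
            throw new IllegalArgumentException("invalid string " + stats);
        }
        // Each part looks like "name=value"; take the value after '='.
        int count = Integer.parseInt(split[0].split("=")[1]) + countDelta;
        long bytes = Long.parseLong(split[1].split("=")[1]) + bytesDelta;
        return "count=" + count + ",bytes=" + bytes;
    }
}
```

For instance, applyDelta("count=3,bytes=120", 1, 30) returns "count=4,bytes=150".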

4. DataTree Initialization

Initializing DataTree means creating the system nodes, namely the three znodes /, /zookeeper, and /zookeeper/quota.

First, the DataNode constructor:

    public DataNode(DataNode parent, byte data[], Long acl, StatPersisted stat) {
        this.parent = parent;
        this.data = data;
        this.acl = acl;
        this.stat = stat;
    }

DataTree initialization:

    /**
     * This is a pointer to the root of the DataTree. It is the source of truth,
     * but we usually use the nodes hashmap to find nodes in the tree.
     */
    private DataNode root = new DataNode(null, new byte[0], -1L, new StatPersisted());

    // create a /zookeeper filesystem that is the proc filesystem of zookeeper
    private DataNode procDataNode = new DataNode(root, new byte[0], -1L, new StatPersisted());

    // create a /zookeeper/quota node for maintaining quota properties for zookeeper
    private DataNode quotaDataNode = new DataNode(procDataNode, new byte[0], -1L, new StatPersisted());

    public DataTree() {
        // Rather than fight it, let root have an alias
        nodes.put("", root);
        nodes.put(rootZookeeper, root);

        // add the proc node and quota node
        root.addChild(procChildZookeeper);
        nodes.put(procZookeeper, procDataNode);

        procDataNode.addChild(quotaChildZookeeper);
        nodes.put(quotaZookeeper, quotaDataNode);
    }

The resulting structure is:

|---rootZookeeper = "/"
|---procZookeeper = "/zookeeper"
    |---procChildZookeeper = "zookeeper"
|---quotaZookeeper = "/zookeeper/quota"
    |---quotaChildZookeeper = "quota"

limitNode = "zookeeper_limits"
statNode = "zookeeper_stats"

|---DataNode root ("/")
    |---root.children Set<String>
        |---"zookeeper"
|---DataNode procDataNode ("/zookeeper")
    |---procDataNode.children Set<String>
        |---"quota"
|---DataNode quotaDataNode ("/zookeeper/quota")
    |---quotaDataNode.children Set<String>
        |---null (no children yet)

|---nodes<String, DataNode>
    |---<"", root>
    |---<rootZookeeper, root>
    |---<procZookeeper, procDataNode>
    |---<quotaZookeeper, quotaDataNode>
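The two child names in the diagram come from plain substring arithmetic on the path constants. The sketch below repeats those expressions in a standalone class; SystemPathsSketch is a hypothetical name, while the constant names and expressions are the ones in the DataTree listing.

```java
// Sketch: how the relative child names are derived from the full paths.
class SystemPathsSketch {
    static final String procZookeeper = "/zookeeper";
    static final String quotaZookeeper = "/zookeeper/quota";

    // Drop the leading "/" of "/zookeeper".
    static final String procChildZookeeper = procZookeeper.substring(1);

    // Drop "/zookeeper" plus the following "/" from "/zookeeper/quota".
    static final String quotaChildZookeeper =
            quotaZookeeper.substring(procZookeeper.length() + 1);
}
```

Here procChildZookeeper evaluates to "zookeeper" and quotaChildZookeeper to "quota", which are exactly the strings added to the children sets of root and procDataNode.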

5. Node Operations

5.1 The createNode Process

    /**
     * @param path
     * @param data
     * @param acl
     * @param ephemeralOwner
     *            the session id that owns this node. -1 indicates this is not
     *            an ephemeral node.
     * @param zxid
     * @param time
     * @return the path of the created node
     * @throws KeeperException
     */
    public String createNode(String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time)

Full code:

  public String createNode(String path, byte data[], List<ACL> acl,
            long ephemeralOwner, int parentCVersion, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = new StatPersisted();
        stat.setCtime(time);
        stat.setMtime(time);
        stat.setCzxid(zxid);
        stat.setMzxid(zxid);
        stat.setPzxid(zxid);
        stat.setVersion(0);
        stat.setAversion(0);
        stat.setEphemeralOwner(ephemeralOwner);
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            Set<String> children = parent.getChildren();
            if (children != null) {
                if (children.contains(childName)) {
                    throw new KeeperException.NodeExistsException();
                }
            }
            
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }    
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
            Long longval = convertAcls(acl);
            DataNode child = new DataNode(parent, data, longval, stat);
            parent.addChild(childName);
            nodes.put(path, child);
            if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
        }
        // now check if its one of the zookeeper node child
        if (parentName.startsWith(quotaZookeeper)) {
            // now check if its the limit node
            if (Quotas.limitNode.equals(childName)) {
                // this is the limit node
                // get the parent and add it to the trie
                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(parentName
                        .substring(quotaZookeeper.length()));
            }
        }
        // also check to update the quotas for this node
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
            // ok we have some match and need to update
            updateCount(lastPrefix, 1);
            updateBytes(lastPrefix, data == null ? 0 : data.length);
        }
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    }

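The creation process in detail:

  ① Split path into parentName and childName, create the StatPersisted stat metadata, and set its member variables (ctime, mtime, czxid, mzxid, pzxid, version, aversion, ephemeralOwner);

  ② Look up the parent with DataNode parent = nodes.get(parentName); throw NoNodeException if it does not exist, and NodeExistsException if the parent already has a child named childName;

  ③ While holding the lock on parent, update the parent's cversion and pzxid (when parentCVersion is -1, the parent's current cversion plus one is used);

  ④ Convert the ACL list into a Long key with convertAcls(acl), create the DataNode child, then put it into the parent's children set and into the nodes map of DataTree: parent.addChild(childName); nodes.put(path, child);

  ⑤ If it is an ephemeral node (ephemeralOwner != 0), also record the path in the ephemerals map of DataTree, keyed by the session id of the owner;

  ⑥ If the parent path lies under /zookeeper/quota, check the child name: for zookeeper_limits, add the tracked path (the parent path with the /zookeeper/quota prefix removed) to pTrie; for zookeeper_stats, call updateQuotaForPath() on that path;

  ⑦ If the new path falls under a prefix that has a quota (getMaxPrefixWithQuota(path) is not null), update the quota statistics with updateCount(lastPrefix, 1) and updateBytes(lastPrefix, data length), that is, the content of the corresponding zookeeper_stats node;

  ⑧ Call dataWatches.triggerWatch() to fire Event.EventType.NodeCreated for the path;

  ⑨ Call childWatches.triggerWatch() to fire Event.EventType.NodeChildrenChanged for the parent path.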
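The core bookkeeping of createNode (path splitting, parent lookup, parent version update, and the ephemerals map) can be condensed into the sketch below. It is a simplification under assumptions: CreateNodeSketch and Node are hypothetical names, plain IllegalStateException stands in for the KeeperException subclasses, and ACLs, quotas, and watches are left out entirely.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified model of the createNode bookkeeping.
class CreateNodeSketch {
    static class Node {
        byte[] data;
        int cversion;         // bumped when a child is added
        long pzxid;           // zxid of the last change to the children
        long ephemeralOwner;  // 0 means "not ephemeral"
        final Set<String> children = new HashSet<>();

        Node(byte[] data, long ephemeralOwner) {
            this.data = data;
            this.ephemeralOwner = ephemeralOwner;
        }
    }

    final Map<String, Node> nodes = new ConcurrentHashMap<>();
    final Map<Long, Set<String>> ephemerals = new ConcurrentHashMap<>();

    CreateNodeSketch() {
        Node root = new Node(new byte[0], 0);
        nodes.put("", root);
        nodes.put("/", root);
    }

    String createNode(String path, byte[] data, long ephemeralOwner, long zxid) {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        Node parent = nodes.get(parentName);
        if (parent == null) {
            throw new IllegalStateException("no node: " + parentName);
        }
        synchronized (parent) {
            if (parent.children.contains(childName)) {
                throw new IllegalStateException("node exists: " + path);
            }
            parent.cversion++;
            parent.pzxid = zxid;
            parent.children.add(childName);
            nodes.put(path, new Node(data, ephemeralOwner));
            if (ephemeralOwner != 0) {
                // Remember which paths belong to this session.
                ephemerals.computeIfAbsent(ephemeralOwner,
                        k -> new HashSet<String>()).add(path);
            }
        }
        return path;
    }
}
```

Creating "/a" and then "/a/b" with owner session 7 leaves the root with cversion 1, "/a" with the child "b", and ephemerals mapping 7 to the set containing "/a/b".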

5.2 The deleteNode Process

    /**
     * remove the path from the datatree
     *
     * @param path
     *            the path of the node to be deleted
     * @param zxid
     *            the current zxid
     * @throws KeeperException.NoNodeException
     */
    public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException

Full code:

  public void deleteNode(String path, long zxid)
            throws KeeperException.NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        DataNode node = nodes.get(path);
        if (node == null) {
            throw new KeeperException.NoNodeException();
        }
        nodes.remove(path);
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            parent.removeChild(childName);
            parent.stat.setPzxid(zxid);
            long eowner = node.stat.getEphemeralOwner();
            if (eowner != 0) {
                HashSet<String> nodes = ephemerals.get(eowner);
                if (nodes != null) {
                    synchronized (nodes) {
                        nodes.remove(path);
                    }
                }
            }
            node.parent = null;
        }
        if (parentName.startsWith(procZookeeper)) {
            // delete the node in the trie.
            if (Quotas.limitNode.equals(childName)) {
                // we need to update the trie
                // as well
                pTrie.deletePath(parentName.substring(quotaZookeeper.length()));
            }
        }

        // also check to update the quotas for this node
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
            // ok we have some match and need to update
            updateCount(lastPrefix, -1);
            int bytes = 0;
            synchronized (node) {
                bytes = (node.data == null ? 0 : -(node.data.length));
            }
            updateBytes(lastPrefix, bytes);
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                    "dataWatches.triggerWatch " + path);
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                    "childWatches.triggerWatch " + parentName);
        }
        Set<Watcher> processed = dataWatches.triggerWatch(path,
                EventType.NodeDeleted);
        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                EventType.NodeChildrenChanged);
    }

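The deleteNode process in detail:

  ① Get the node with DataNode node = nodes.get(path); throw NoNodeException if it does not exist, otherwise remove it from nodes;

  ② Get its parent with DataNode parent = nodes.get(parentName);

  ③ While holding the lock on parent, remove the child from the parent's children set and update the parent's pzxid; if the node is ephemeral (ephemeralOwner != 0), also remove its path from the ephemerals map of DataTree;

  ④ If the parent path lies under /zookeeper and the deleted node is a zookeeper_limits node, remove the tracked path from pTrie;

  ⑤ If the path falls under a prefix that has a quota (getMaxPrefixWithQuota(path) is not null), update the quota statistics with updateCount(lastPrefix, -1) and updateBytes(lastPrefix, negative data length), that is, the content of the corresponding zookeeper_stats node;

  ⑥ Call dataWatches.triggerWatch() to fire Event.EventType.NodeDeleted for the path, then pass the returned set of watchers to childWatches.triggerWatch(path, EventType.NodeDeleted, processed);

  ⑦ Call childWatches.triggerWatch() to fire Event.EventType.NodeChildrenChanged for the parent path.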
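Both createNode and deleteNode call getMaxPrefixWithQuota(path) to find out whether the path lies inside a subtree that has a quota. The article does not show that method or PathTrie, so the sketch below only illustrates the idea of a longest-prefix search. QuotaPrefixSketch is a hypothetical name, a plain Set<String> replaces the trie, and treating the path itself as a candidate prefix is an assumption.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the trie: longest quota prefix of a path.
class QuotaPrefixSketch {
    private final Set<String> quotaPaths = new HashSet<>();

    void addPath(String path) {
        quotaPaths.add(path);
    }

    void deletePath(String path) {
        quotaPaths.remove(path);
    }

    // Walk from the full path up towards the root, one component at a time,
    // and return the first (that is, longest) path that carries a quota.
    String findMaxPrefix(String path) {
        String candidate = path;
        while (!candidate.isEmpty()) {
            if (quotaPaths.contains(candidate)) {
                return candidate;
            }
            int lastSlash = candidate.lastIndexOf('/');
            if (lastSlash < 0) {
                break; // not an absolute path; nothing more to strip
            }
            candidate = candidate.substring(0, lastSlash);
        }
        return null; // no enclosing quota
    }
}
```

With a quota registered on "/app", the path "/app/a/b" resolves to "/app", while "/other/x" resolves to null and triggers no quota update.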

5.3 The setData Process

    public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException

Full code:

  public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

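The setData process in detail:

  ① Get the node with DataNode n = nodes.get(path); throw NoNodeException if it does not exist;

  ② While holding the lock on n, update its data, mtime, mzxid, and version, and copy its stat into the Stat object that is returned;

  ③ If the path falls under a prefix that has a quota, call updateBytes of DataTree with the difference between the new and the old data length;

  ④ Call dataWatches.triggerWatch() to fire Event.EventType.NodeDataChanged for the node path.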
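The byte delta passed to updateBytes in step ③ is simply the new length minus the old length, with null treated as zero bytes. The sketch below isolates that calculation together with the field updates of step ②. SetDataSketch and Node are hypothetical names, and returning the delta from setData is a choice made for this illustration; the real method returns a Stat.

```java
// Hypothetical sketch of the setData field updates and quota byte delta.
class SetDataSketch {
    static class Node {
        byte[] data;
        int version;
        long mtime;
        long mzxid;
    }

    // Replace the data and return how many bytes the quota usage changes by.
    static long setData(Node n, byte[] data, int version, long zxid, long time) {
        byte[] lastData;
        synchronized (n) {
            lastData = n.data;
            n.data = data;
            n.mtime = time;
            n.mzxid = zxid;
            n.version = version;
        }
        return (data == null ? 0 : data.length)
                - (lastData == null ? 0 : lastData.length);
    }
}
```

Replacing 5 bytes of data with 2 bytes yields a delta of -3, so the byte usage of the enclosing quota shrinks by 3.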

5.4 The getData Process

    public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException

Full code:

    public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);
            }
            return n.data;
        }
    }

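The getData process in detail:

  ① Get the node with DataNode n = nodes.get(path); throw NoNodeException if it does not exist;

  ② While holding the lock on n, copy its stat into the stat argument and, if the watcher argument is not null, call dataWatches.addWatch() to register the watcher;

  ③ Return the data of n.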
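The interplay between addWatch in getData and triggerWatch in setData, createNode, and deleteNode can be sketched as a table from path to watchers. WatchManager itself is not shown in this article, so OneShotWatchSketch and its Watcher interface are hypothetical. Removing the watchers when they fire follows ZooKeeper's documented behaviour that a watch is a one-time trigger; everything else is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a one-shot watch table keyed by path.
class OneShotWatchSketch {
    interface Watcher {
        void process(String path, String eventType);
    }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();

    // Register a watcher for a path (what getData does when watcher != null).
    synchronized void addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<Watcher>()).add(watcher);
    }

    // Notify and forget all watchers of the path; return those notified.
    Set<Watcher> triggerWatch(String path, String eventType) {
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // one-shot: drop before notifying
        }
        if (watchers == null) {
            return Collections.emptySet();
        }
        for (Watcher watcher : watchers) {
            watcher.process(path, eventType);
        }
        return watchers;
    }
}
```

A client that wants to keep observing a node therefore has to register a new watch, for example by calling getData with a watcher again after each notification.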

5.5 The statNode Process

    public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException

5.6 The getChildren Process

    public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException

5.7 The getCounts Process

    /**
     * this method gets the count of nodes and the bytes under a subtree
     *
     * @param path
     *            the path to be used
     * @param counts
     *            the int count
     */
    private void getCounts(String path, Counts counts)

