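1. Server Roles
Each server is in one of the following states while it runs:
① LOOKING: the server does not know who the leader is and is searching for one.
② LEADING: the server is the elected leader.
③ FOLLOWING: a leader has been elected and the server is synchronizing with it.
The ServerState enum in the code also defines a fourth state, OBSERVING, which is used by observers.
QuorumPeer defines the server types: ServerState represents the server's current state, and LearnerType indicates whether a server that learns from the leader is a voting participant or an observer. The former is called a follower, the latter an observer.
The code is as follows: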
public class QuorumPeer extends Thread implements QuorumStats.Provider {
    public enum ServerState {
        LOOKING, FOLLOWING, LEADING, OBSERVING;
    }
    public enum LearnerType {
        PARTICIPANT, OBSERVER;
    }
}
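2. Znode Types
CreateMode defines four node types:
PERSISTENT: persistent node
EPHEMERAL: ephemeral node
PERSISTENT_SEQUENTIAL: persistent node with a sequence number appended to its name
EPHEMERAL_SEQUENTIAL: ephemeral node with a sequence number appended to its name
The code is as follows: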
public enum CreateMode {
    // arguments: (flag, ephemeral, sequential); the constructor and fields are omitted here
    PERSISTENT (0, false, false),
    PERSISTENT_SEQUENTIAL (2, false, true),
    EPHEMERAL (1, true, false),
    EPHEMERAL_SEQUENTIAL (3, true, true);
}
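To make the three constructor arguments concrete, here is a minimal self-contained sketch. NodeMode, its accessors and fromFlag are hypothetical stand-ins modelled on the four constants above; they are not the actual CreateMode source. In these four constants the flag happens to carry "ephemeral" in bit 0 and "sequential" in bit 1.

```java
// Hypothetical stand-in for CreateMode: each constant is (flag, ephemeral, sequential).
enum NodeMode {
    PERSISTENT(0, false, false),
    PERSISTENT_SEQUENTIAL(2, false, true),
    EPHEMERAL(1, true, false),
    EPHEMERAL_SEQUENTIAL(3, true, true);

    private final int flag;
    private final boolean ephemeral;
    private final boolean sequential;

    NodeMode(int flag, boolean ephemeral, boolean sequential) {
        this.flag = flag;
        this.ephemeral = ephemeral;
        this.sequential = sequential;
    }

    int flag() { return flag; }
    boolean isEphemeral() { return ephemeral; }
    boolean isSequential() { return sequential; }

    // Looks a mode up by its integer flag and rejects unknown flags.
    static NodeMode fromFlag(int flag) {
        for (NodeMode m : values()) {
            if (m.flag == flag) {
                return m;
            }
        }
        throw new IllegalArgumentException("unknown flag " + flag);
    }
}

class NodeModeDemo {
    public static void main(String[] args) {
        for (NodeMode m : NodeMode.values()) {
            System.out.println(m.flag() + " " + m
                    + " ephemeral=" + m.isEphemeral()
                    + " sequential=" + m.isSequential());
        }
    }
}
```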
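3. The Stat Class
The Stat class holds the metadata of a znode. Its main member variables are:
public class Stat implements Record {
    private long czxid;          // zxid of the transaction that created the znode
    private long mzxid;          // zxid of the last modification of the znode
    private long ctime;          // creation time
    private long mtime;          // last modification time
    private int version;         // version number of the znode data
    private int cversion;        // version number of the znode's children
    private int aversion;        // version number of the znode's ACL
    private long ephemeralOwner; // session id of the owning client for an ephemeral node; 0 otherwise
    private int dataLength;      // length of the znode data
    private int numChildren;     // number of child znodes
    private long pzxid;          // zxid of the last change to the children (set on the parent in createNode and deleteNode), so it differs from mzxid
}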
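Note: the three classes Stat, StatPersisted and StatPersistedV1 have largely the same member variables and logic, but StatPersisted lacks the dataLength and numChildren fields, and StatPersistedV1 lacks dataLength, numChildren and pzxid. Exactly where each class is used is left for further analysis; DataNode, for one, stores a StatPersisted.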
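One plausible reading of the difference is that dataLength and numChildren can be recomputed from the node itself and therefore need not be persisted. The sketch below illustrates that idea only. PersistedMeta, FullMeta and toFullMeta are hypothetical names with a reduced set of fields, and the derivation is an assumption for illustration, not the ZooKeeper implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration: derive the two non-persisted fields from the node's own state.
final class StatSketch {
    // Reduced stand-in for the persisted metadata.
    static final class PersistedMeta {
        long czxid;
        long mzxid;
        long pzxid;
        int version;
    }

    // Reduced stand-in for the full metadata returned to clients.
    static final class FullMeta {
        long czxid;
        long mzxid;
        long pzxid;
        int version;
        int dataLength;
        int numChildren;
    }

    // Copies the persisted fields and computes dataLength and numChildren on the fly.
    static FullMeta toFullMeta(PersistedMeta p, byte[] data, Set<String> children) {
        FullMeta f = new FullMeta();
        f.czxid = p.czxid;
        f.mzxid = p.mzxid;
        f.pzxid = p.pzxid;
        f.version = p.version;
        f.dataLength = (data == null) ? 0 : data.length;
        f.numChildren = (children == null) ? 0 : children.size();
        return f;
    }

    public static void main(String[] args) {
        PersistedMeta p = new PersistedMeta();
        p.czxid = 10;
        p.mzxid = 12;
        Set<String> children = new HashSet<>();
        children.add("test1");
        FullMeta f = toFullMeta(p, new byte[] {1, 2, 3}, children);
        System.out.println(f.dataLength + " " + f.numChildren);
    }
}
```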
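3. The DataNode Class
The DataNode class records all the information about a znode, including its parent, its children, its data, its ACL and its stat metadata. The main member variables are: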
public class DataNode implements Record {
    DataNode parent;
    byte data[];
    Long acl;
    public StatPersisted stat;
    private Set<String> children = null;
}
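Two member variables deserve attention: acl and children.
acl is a Long that serves as an ACL key. The actual ACL information is stored in DataTree's longKeyMap and aclKeyMap. The former, of type Map<Long, List<ACL>>, holds the ACL lists used by all nodes in the tree, so a node's ACL list can be retrieved by its key; the latter is the reverse mapping.
children records the node's children, but it does not hold DataNode objects. It holds only the last component of each child's path: if this node is "/biglog" and its child is "/biglog/test1", children contains the relative name "test1". The source comment on this field adds: "This should be synchronized on except deserializing (for speed up issues)", i.e. access to children must be synchronized on the node except during deserialization.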
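The two-map arrangement for ACLs amounts to interning: identical ACL lists share one key, and each node stores only that key. Below is a minimal sketch of the idea. AclCacheSketch is hypothetical, ACL entries are represented as plain strings, and returning -1 for a null list is an assumption of this sketch; the real convertAcls may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of interning ACL lists behind Long keys.
final class AclCacheSketch {
    private final Map<Long, List<String>> longKeyMap = new HashMap<>();
    private final Map<List<String>, Long> aclKeyMap = new HashMap<>();
    private long aclIndex = 0;

    // Returns the key for an ACL list, allocating a new key the first time the list is seen.
    synchronized Long convertAcls(List<String> acls) {
        if (acls == null) {
            return -1L; // assumption of this sketch: no ACL list maps to -1
        }
        Long key = aclKeyMap.get(acls);
        if (key == null) {
            key = ++aclIndex;
            // Store a defensive, read-only copy so later changes by the caller do not leak in.
            List<String> copy = Collections.unmodifiableList(new ArrayList<>(acls));
            longKeyMap.put(key, copy);
            aclKeyMap.put(copy, key);
        }
        return key;
    }

    // Reverse lookup: from key back to the ACL list, or null if the key is unknown.
    synchronized List<String> convertLong(Long key) {
        return longKeyMap.get(key);
    }

    public static void main(String[] args) {
        AclCacheSketch cache = new AclCacheSketch();
        Long a = cache.convertAcls(Arrays.asList("world:anyone:r"));
        Long b = cache.convertAcls(Arrays.asList("world:anyone:r"));
        System.out.println(a.equals(b)); // both nodes would share one stored list
    }
}
```

With this layout a tree in which thousands of nodes carry the same ACL list stores that list once.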
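3. The DataTree Class
The DataTree class maintains the whole directory tree. ConcurrentHashMap<String, DataNode> nodes maps each full path to its DataNode, while the Set<String> children inside each DataNode records the parent-child relationship as the relative names of the children. From any DataNode you can obtain the relative name of each child, join it with the parent path to form a full path, and then look that path up in the DataTree's nodes map. All access to a node by path goes through nodes. The main member variables are as follows:
(1) DataTree class: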
/**
 * This hashtable provides a fast lookup to the datanodes. The tree is the
 * source of truth and is where all the locking occurs
 */
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();

private final WatchManager dataWatches = new WatchManager();

private final WatchManager childWatches = new WatchManager();

// the root of the ZooKeeper tree
private static final String rootZookeeper = "/";

// the ZooKeeper node that acts as the management and status node
private static final String procZookeeper = Quotas.procZookeeper;

// the name of the root's child node: "zookeeper"
private static final String procChildZookeeper = procZookeeper.substring(1);

// the ZooKeeper quota node that acts as the quota management node for ZooKeeper
private static final String quotaZookeeper = Quotas.quotaZookeeper;

// the name of the /zookeeper node's child: "quota"
private static final String quotaChildZookeeper = quotaZookeeper.substring(procZookeeper.length() + 1);

// the path trie that keeps track of the quota nodes in this DataTree
private final PathTrie pTrie = new PathTrie();

// this hashtable lists the paths of the ephemeral nodes of a session
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

// this is a map from longs to ACLs; it saves ACLs being stored for each datanode
public final Map<Long, List<ACL>> longKeyMap = new HashMap<Long, List<ACL>>();

// this is a map from ACLs to longs
public final Map<List<ACL>, Long> aclKeyMap = new HashMap<List<ACL>, Long>();

// the number of ACLs in the DataTree
protected long aclIndex = 0;
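The lookup route described at the start of this section (children set, then full path, then the nodes map) can be sketched as follows. PathLookupSketch, its Node class and the childPath helper are hypothetical simplifications; only the idea of joining a parent path with a relative child name comes from the text above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: resolve a node's children through the path-to-node map.
final class PathLookupSketch {
    static final class Node {
        final Set<String> children = new HashSet<>(); // relative names only
    }

    final Map<String, Node> nodes = new ConcurrentHashMap<>();

    // Joins a parent path and a relative child name; the root "/" must not produce "//name".
    static String childPath(String parentPath, String childName) {
        return parentPath.equals("/") ? "/" + childName : parentPath + "/" + childName;
    }

    // Returns the sorted full paths of all direct children that are present in the map.
    List<String> childPaths(String parentPath) {
        Node parent = nodes.get(parentPath);
        if (parent == null) {
            return Collections.emptyList();
        }
        List<String> result = new ArrayList<>();
        synchronized (parent) { // children is guarded by the node's own lock
            for (String name : parent.children) {
                String full = childPath(parentPath, name);
                if (nodes.containsKey(full)) {
                    result.add(full);
                }
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        PathLookupSketch tree = new PathLookupSketch();
        Node biglog = new Node();
        biglog.children.add("test1");
        tree.nodes.put("/biglog", biglog);
        tree.nodes.put("/biglog/test1", new Node());
        System.out.println(tree.childPaths("/biglog"));
    }
}
```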
(2) Quotas class:
public class Quotas {

    // the zookeeper nodes that acts as the management and status node
    public static final String procZookeeper = "/zookeeper";

    // the zookeeper quota node that acts as the quota management node for zookeeper
    public static final String quotaZookeeper = "/zookeeper/quota";

    // the limit node that has the limit of a subtree
    public static final String limitNode = "zookeeper_limits";

    // the stat node that monitors the limit of a subtree.
    public static final String statNode = "zookeeper_stats";

    /**
     * return the quota path associated with this
     * prefix
     * @param path the actual path in zookeeper.
     * @return the limit quota path
     */
    public static String quotaPath(String path) {
        return quotaZookeeper + path + "/" + limitNode;
    }

    /**
     * return the stat quota path associated with this
     * prefix.
     * @param path the actual path in zookeeper
     * @return the stat quota path
     */
    public static String statPath(String path) {
        return quotaZookeeper + path + "/" + statNode;
    }
}
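As a worked example of the two path helpers, the sketch below reproduces their string concatenation in a standalone class. QuotaPathsSketch and the sample path "/app/data" are hypothetical; the constants and the concatenation order are taken from the listing above.

```java
// Standalone copy of the path arithmetic from the Quotas listing, for illustration.
final class QuotaPathsSketch {
    static final String QUOTA_ZOOKEEPER = "/zookeeper/quota";
    static final String LIMIT_NODE = "zookeeper_limits";
    static final String STAT_NODE = "zookeeper_stats";

    // Path of the node that stores the limit configured for the subtree at 'path'.
    static String quotaPath(String path) {
        return QUOTA_ZOOKEEPER + path + "/" + LIMIT_NODE;
    }

    // Path of the node that stores the current usage of the subtree at 'path'.
    static String statPath(String path) {
        return QUOTA_ZOOKEEPER + path + "/" + STAT_NODE;
    }

    public static void main(String[] args) {
        System.out.println(quotaPath("/app/data")); // /zookeeper/quota/app/data/zookeeper_limits
        System.out.println(statPath("/app/data"));  // /zookeeper/quota/app/data/zookeeper_stats
    }
}
```

The quota subtree therefore mirrors the real tree: the quota for /app/data lives under /zookeeper/quota/app/data.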
(3) StatsTrack class:
// a class that represents the stats associated with quotas
public class StatsTrack {
    private int count;
    private long bytes;
    private String countStr = "count";
    private String byteStr = "bytes";

    public StatsTrack() {
        this(null);
    }

    /**
     * the stat string should be of the form count=int,bytes=long
     * if stats is called with null the count and bytes are initialized
     * to -1.
     * @param stats the stat string to be initialized with
     */
    public StatsTrack(String stats) {
        if (stats == null) {
            stats = "count=-1,bytes=-1";
        }
        String[] split = stats.split(",");
        if (split.length != 2) {
            throw new IllegalArgumentException("invalid string " + stats);
        }
        count = Integer.parseInt(split[0].split("=")[1]);
        bytes = Long.parseLong(split[1].split("=")[1]);
    }

    /**
     * get the count of nodes allowed as part of quota
     *
     * @return the count as part of this string
     */
    public int getCount() {
        return this.count;
    }

    /**
     * set the count for this stat tracker.
     *
     * @param count
     *            the count to set with
     */
    public void setCount(int count) {
        this.count = count;
    }

    /**
     * get the count of bytes allowed as part of quota
     *
     * @return the bytes as part of this string
     */
    public long getBytes() {
        return this.bytes;
    }

    /**
     * set the bytes for this stat tracker.
     *
     * @param bytes
     *            the bytes to set with
     */
    public void setBytes(long bytes) {
        this.bytes = bytes;
    }

    /*
     * returns the string that maps to this stat tracking.
     */
    @Override
    public String toString() {
        return countStr + "=" + count + "," + byteStr + "=" + bytes;
    }
}
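The string format handled by StatsTrack is easiest to see in a round trip. The sketch below copies the parsing logic from the listing into a standalone, hypothetical class (QuotaStatsSketch) so it can be run without ZooKeeper on the classpath.

```java
// Standalone copy of the StatsTrack parsing logic, for illustration only.
final class QuotaStatsSketch {
    private final int count;
    private final long bytes;

    // Parses "count=<int>,bytes=<long>"; null means "no limit", encoded as -1 for both values.
    QuotaStatsSketch(String stats) {
        if (stats == null) {
            stats = "count=-1,bytes=-1";
        }
        String[] split = stats.split(",");
        if (split.length != 2) {
            throw new IllegalArgumentException("invalid string " + stats);
        }
        count = Integer.parseInt(split[0].split("=")[1]);
        bytes = Long.parseLong(split[1].split("=")[1]);
    }

    int getCount() { return count; }
    long getBytes() { return bytes; }

    @Override
    public String toString() {
        return "count=" + count + ",bytes=" + bytes;
    }

    public static void main(String[] args) {
        QuotaStatsSketch limit = new QuotaStatsSketch("count=5,bytes=1024");
        System.out.println(limit.getCount() + " " + limit.getBytes()); // 5 1024
        System.out.println(new QuotaStatsSketch(null));                 // count=-1,bytes=-1
    }
}
```

Note that the parser relies on position rather than on the key names: the first field is always read as the count and the second as the bytes.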
4. DataTree Initialization
Initializing the DataTree means creating the system nodes: the three znodes /, /zookeeper and /zookeeper/quota.
First, the DataNode constructor:
public DataNode(DataNode parent, byte data[], Long acl, StatPersisted stat) {
    this.parent = parent;
    this.data = data;
    this.acl = acl;
    this.stat = stat;
}
DataTree initialization:
/**
 * This is a pointer to the root of the DataTree. It is the source of truth,
 * but we usually use the nodes hashmap to find nodes in the tree.
 */
private DataNode root = new DataNode(null, new byte[0], -1L, new StatPersisted());

// create a /zookeeper filesystem that is the proc filesystem of zookeeper
private DataNode procDataNode = new DataNode(root, new byte[0], -1L, new StatPersisted());

// create a /zookeeper/quota node for maintaining quota properties for zookeeper
private DataNode quotaDataNode = new DataNode(procDataNode, new byte[0], -1L, new StatPersisted());

public DataTree() {
    // Rather than fight it, let root have an alias
    nodes.put("", root);
    nodes.put(rootZookeeper, root);

    // add the proc node and quota node
    root.addChild(procChildZookeeper);
    nodes.put(procZookeeper, procDataNode);

    procDataNode.addChild(quotaChildZookeeper);
    nodes.put(quotaZookeeper, quotaDataNode);
}
The resulting structure:
|---rootZookeeper = "/"
|---procZookeeper = "/zookeeper"
|---procChildZookeeper = "zookeeper"
|---quotaZookeeper = "/zookeeper/quota"
|---quotaChildZookeeper = "quota"
|---limitNode = "zookeeper_limits"
|---statNode = "zookeeper_stats"
|---DataNode root ("/")
    |---root.children: Set<String>
        |---"zookeeper"
|---DataNode procDataNode ("/zookeeper")
    |---procDataNode.children: Set<String>
        |---"quota"
|---DataNode quotaDataNode ("/zookeeper/quota")
    |---quotaDataNode.children: Set<String>
        |---null
|---nodes: Map<String, DataNode>
    |---<"", root>
    |---<rootZookeeper, root>
    |---<procZookeeper, procDataNode>
    |---<quotaZookeeper, quotaDataNode>
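The relative names used during initialization come from plain substring arithmetic, which is also why the root is registered under the alias "". The sketch below works through it. SystemPathsSketch and parentOf are hypothetical names; the expressions mirror procChildZookeeper, quotaChildZookeeper and the parent-path split used in createNode.

```java
// Worked example of the substring arithmetic behind the system node names.
final class SystemPathsSketch {
    static final String PROC = "/zookeeper";
    static final String QUOTA = "/zookeeper/quota";

    // Drops the leading slash: the name of the root's child.
    static String procChild() {
        return PROC.substring(1);
    }

    // Skips "/zookeeper" plus the following slash: the name of /zookeeper's child.
    static String quotaChild() {
        return QUOTA.substring(PROC.length() + 1);
    }

    // Parent path computed the same way createNode does it; assumes path contains a '/'.
    static String parentOf(String path) {
        return path.substring(0, path.lastIndexOf('/'));
    }

    public static void main(String[] args) {
        System.out.println(procChild());                    // zookeeper
        System.out.println(quotaChild());                   // quota
        System.out.println(parentOf("/zookeeper/quota"));   // /zookeeper
        System.out.println("[" + parentOf("/zookeeper") + "]"); // []
    }
}
```

Because the parent of any top-level path comes out as the empty string, a lookup for that parent only succeeds if "" is also a key for the root node.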
5. Node Operations
5.1 The createNode process
/**
 * @param path
 * @param data
 * @param acl
 * @param ephemeralOwner
 *            the session id that owns this node. -1 indicates this is not
 *            an ephemeral node.
 * @param zxid
 * @param time
 * @return the path of the created node
 * @throws KeeperException
 */
public String createNode(String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time)
Full code:

public String createNode(String path, byte data[], List<ACL> acl,
        long ephemeralOwner, int parentCVersion, long zxid, long time)
        throws KeeperException.NoNodeException,
        KeeperException.NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = new StatPersisted();
    stat.setCtime(time);
    stat.setMtime(time);
    stat.setCzxid(zxid);
    stat.setMzxid(zxid);
    stat.setPzxid(zxid);
    stat.setVersion(0);
    stat.setAversion(0);
    stat.setEphemeralOwner(ephemeralOwner);
    DataNode parent = nodes.get(parentName);
    if (parent == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (parent) {
        Set<String> children = parent.getChildren();
        if (children != null) {
            if (children.contains(childName)) {
                throw new KeeperException.NodeExistsException();
            }
        }
        if (parentCVersion == -1) {
            parentCVersion = parent.stat.getCversion();
            parentCVersion++;
        }
        parent.stat.setCversion(parentCVersion);
        parent.stat.setPzxid(zxid);
        Long longval = convertAcls(acl);
        DataNode child = new DataNode(parent, data, longval, stat);
        parent.addChild(childName);
        nodes.put(path, child);
        if (ephemeralOwner != 0) {
            HashSet<String> list = ephemerals.get(ephemeralOwner);
            if (list == null) {
                list = new HashSet<String>();
                ephemerals.put(ephemeralOwner, list);
            }
            synchronized (list) {
                list.add(path);
            }
        }
    }
    // now check if its one of the zookeeper node child
    if (parentName.startsWith(quotaZookeeper)) {
        // now check if its the limit node
        if (Quotas.limitNode.equals(childName)) {
            // this is the limit node
            // get the parent and add it to the trie
            pTrie.addPath(parentName.substring(quotaZookeeper.length()));
        }
        if (Quotas.statNode.equals(childName)) {
            updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
        }
    }
    // also check to update the quotas for this node
    String lastPrefix;
    if ((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
        // ok we have some match and need to update
        updateCount(lastPrefix, 1);
        updateBytes(lastPrefix, data == null ? 0 : data.length);
    }
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
            Event.EventType.NodeChildrenChanged);
    return path;
}
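The creation process in detail:
① Split path into parentName and childName, create the StatPersisted metadata object stat, and set its ctime, mtime, czxid, mzxid, pzxid, version, aversion and ephemeralOwner;
② Look up the parent with DataNode parent = nodes.get(parentName); if it does not exist, throw NoNodeException;
③ While holding the lock on parent: throw NodeExistsException if parent already has a child named childName, update the parent's cversion and pzxid, convert the ACL list into a key with convertAcls(acl), and create the DataNode child;
④ Add child to the parent's children set and to the DataTree's nodes map: parent.addChild(childName); nodes.put(path, child);
⑤ If the node is ephemeral (ephemeralOwner != 0), record its path in the DataTree's ephemerals map, keyed by the owner's session id;
⑥ If the parent path lies under /zookeeper/quota: when the new node is named zookeeper_limits, register the corresponding path in pTrie with pTrie.addPath(); when it is named zookeeper_stats, call updateQuotaForPath();
⑦ If the node falls under a path that has a quota (getMaxPrefixWithQuota(path) returns non-null), update that quota's statistics with updateCount(lastPrefix, 1) and updateBytes(lastPrefix, data length), i.e. the content of the corresponding .../zookeeper_stats node;
⑧ Call dataWatches.triggerWatch() to fire the Event.EventType.NodeCreated event for the path;
⑨ Call childWatches.triggerWatch() to fire the Event.EventType.NodeChildrenChanged event for the parent path.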
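Stripped of metadata, ACLs, quotas and watches, the core of the steps above is a path split, a parent lookup and two map updates. The following sketch shows just that skeleton. CreateNodeSketch and its Node class are hypothetical, and standard Java exceptions stand in for NoNodeException and NodeExistsException; the path is assumed to be absolute.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical skeleton of node creation: split the path, find the parent, link the child.
final class CreateNodeSketch {
    static final class Node {
        final Set<String> children = new HashSet<>(); // relative names, guarded by the node lock
        final long ephemeralOwner;                    // 0 means "not ephemeral"
        int cversion;

        Node(long ephemeralOwner) {
            this.ephemeralOwner = ephemeralOwner;
        }
    }

    final Map<String, Node> nodes = new ConcurrentHashMap<>();
    final Map<Long, Set<String>> ephemerals = new ConcurrentHashMap<>();

    CreateNodeSketch() {
        Node root = new Node(0);
        nodes.put("", root);  // alias, so the parent of a top-level path resolves
        nodes.put("/", root);
    }

    // Creates the node at 'path' (must start with '/'); returns the path.
    String create(String path, long ephemeralOwner) {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        Node parent = nodes.get(parentName);
        if (parent == null) {
            throw new NoSuchElementException("no parent: " + parentName);
        }
        synchronized (parent) {
            if (parent.children.contains(childName)) {
                throw new IllegalStateException("node exists: " + path);
            }
            parent.cversion++;
            parent.children.add(childName);
            nodes.put(path, new Node(ephemeralOwner));
            if (ephemeralOwner != 0) {
                // Track the path under its session so it can be cleaned up with the session.
                ephemerals
                        .computeIfAbsent(ephemeralOwner, k -> ConcurrentHashMap.<String>newKeySet())
                        .add(path);
            }
        }
        return path;
    }
}
```

A caller would use it as `tree.create("/a", 0)` followed by `tree.create("/a/b", 7L)`, where 7L is a made-up session id.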
5.2 The deleteNode process
/**
 * remove the path from the datatree
 *
 * @param path
 *            the path to of the node to be deleted
 * @param zxid
 *            the current zxid
 * @throws KeeperException.NoNodeException
 */
public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException {
Full code:

public void deleteNode(String path, long zxid)
        throws KeeperException.NoNodeException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    DataNode node = nodes.get(path);
    if (node == null) {
        throw new KeeperException.NoNodeException();
    }
    nodes.remove(path);
    DataNode parent = nodes.get(parentName);
    if (parent == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (parent) {
        parent.removeChild(childName);
        parent.stat.setPzxid(zxid);
        long eowner = node.stat.getEphemeralOwner();
        if (eowner != 0) {
            HashSet<String> nodes = ephemerals.get(eowner);
            if (nodes != null) {
                synchronized (nodes) {
                    nodes.remove(path);
                }
            }
        }
        node.parent = null;
    }
    if (parentName.startsWith(procZookeeper)) {
        // delete the node in the trie.
        if (Quotas.limitNode.equals(childName)) {
            // we need to update the trie
            // as well
            pTrie.deletePath(parentName.substring(quotaZookeeper.length()));
        }
    }
    // also check to update the quotas for this node
    String lastPrefix;
    if ((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
        // ok we have some match and need to update
        updateCount(lastPrefix, -1);
        int bytes = 0;
        synchronized (node) {
            bytes = (node.data == null ? 0 : -(node.data.length));
        }
        updateBytes(lastPrefix, bytes);
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "dataWatches.triggerWatch " + path);
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "childWatches.triggerWatch " + parentName);
    }
    Set<Watcher> processed = dataWatches.triggerWatch(path,
            EventType.NodeDeleted);
    childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
            EventType.NodeChildrenChanged);
}
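The deleteNode process in detail:
① Fetch the node with DataNode node = nodes.get(path); if it does not exist, throw NoNodeException, otherwise remove path from nodes;
② Fetch the node's parent with DataNode parent = nodes.get(parentName);
③ While holding the lock on parent: remove childName from the parent's children and update the parent's pzxid; if the node is ephemeral, also remove its path from the DataTree's ephemerals; finally detach the node by setting node.parent = null;
④ If the parent path starts with /zookeeper and the deleted node is named zookeeper_limits, remove the corresponding path from pTrie with pTrie.deletePath();
⑤ If the node falls under a path that has a quota, update that quota's statistics with updateCount(lastPrefix, -1) and updateBytes(lastPrefix, minus the data length), i.e. the content of the corresponding .../zookeeper_stats node;
⑥ Call dataWatches.triggerWatch() and childWatches.triggerWatch() to fire the Event.EventType.NodeDeleted event for the path; the watchers already notified by the first call are passed to the second as processed;
⑦ Call childWatches.triggerWatch() to fire the Event.EventType.NodeChildrenChanged event for the parent path.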
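Both createNode and deleteNode decide whether a quota applies by asking for the longest path prefix that has a quota registered. The listing does not show PathTrie itself, so the sketch below uses a plain set and walks up the path one component at a time. QuotaPrefixSketch is a hypothetical stand-in that illustrates the longest-prefix idea only; it does not reproduce PathTrie or getMaxPrefixWithQuota.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a quota path index: longest registered ancestor-or-self wins.
final class QuotaPrefixSketch {
    private final Set<String> quotaPaths = new HashSet<>();

    void addPath(String path) {
        quotaPaths.add(path);
    }

    void deletePath(String path) {
        quotaPaths.remove(path);
    }

    // Returns the longest registered prefix of 'path' on a component boundary, or null.
    String findMaxPrefix(String path) {
        String current = path;
        while (!current.isEmpty()) {
            if (quotaPaths.contains(current)) {
                return current;
            }
            int lastSlash = current.lastIndexOf('/');
            if (lastSlash < 0) {
                break; // not an absolute path; nothing more to strip
            }
            current = current.substring(0, lastSlash);
        }
        return null;
    }

    public static void main(String[] args) {
        QuotaPrefixSketch index = new QuotaPrefixSketch();
        index.addPath("/app");
        System.out.println(index.findMaxPrefix("/app/x/y")); // /app
        System.out.println(index.findMaxPrefix("/other"));   // null
    }
}
```

A trie answers the same question without rebuilding substrings, which matters when every create and delete performs the check.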
5.3 The setData process
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException {
Full code:

public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix;
    if ((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
        this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                - (lastdata == null ? 0 : lastdata.length));
    }
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
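The setData process in detail:
① Fetch the node with DataNode n = nodes.get(path); if it does not exist, throw NoNodeException;
② While holding the lock on n, update its data, mtime, mzxid and version, and copy the resulting metadata into the Stat that is returned;
③ If the path lies in a subtree that has a quota, call the DataTree's updateBytes with the difference between the new and the old data length;
④ Call dataWatches.triggerWatch() to fire the Event.EventType.NodeDataChanged event for the node's path.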
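The byte accounting across the three write operations follows one rule: always apply the signed difference in data length, treating null data as length 0. A minimal sketch of that rule, with a hypothetical QuotaBytesSketch class standing in for the quota statistics:

```java
// Hypothetical running byte counter that mirrors the deltas passed to updateBytes.
final class QuotaBytesSketch {
    private long bytes;

    // Null data counts as zero bytes, as in the listings above.
    private static int len(byte[] data) {
        return (data == null) ? 0 : data.length;
    }

    // createNode: the new data length is added.
    void onCreate(byte[] data) {
        bytes += len(data);
    }

    // setData: only the difference between new and old length is applied.
    void onSetData(byte[] oldData, byte[] newData) {
        bytes += len(newData) - len(oldData);
    }

    // deleteNode: the removed data length is subtracted.
    void onDelete(byte[] data) {
        bytes -= len(data);
    }

    long bytes() {
        return bytes;
    }

    public static void main(String[] args) {
        QuotaBytesSketch usage = new QuotaBytesSketch();
        byte[] first = new byte[10];
        byte[] second = new byte[4];
        usage.onCreate(first);
        usage.onSetData(first, second);
        System.out.println(usage.bytes()); // 4
        usage.onDelete(second);
        System.out.println(usage.bytes()); // 0
    }
}
```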
5.4 The getData process
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException
Full code:

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        return n.data;
    }
}
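The getData process in detail:
① Fetch the node with DataNode n = nodes.get(path); if it does not exist, throw NoNodeException;
② While holding the lock on n, copy the node's metadata into the stat argument and, if the watcher argument is not null, call dataWatches.addWatch() to register the watcher;
③ Return n's data.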
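Reads register watchers and writes trigger them. ZooKeeper watches are one-shot, so a watcher fires at most once per registration. The sketch below illustrates that register-then-trigger cycle. WatchSketch is hypothetical and uses a BiConsumer of (path, event type name) in place of the Watcher interface; the internals of the real WatchManager are not shown in this article.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical one-shot watch table: triggering a path removes its watchers.
final class WatchSketch {
    private final Map<String, Set<BiConsumer<String, String>>> table = new HashMap<>();

    // Registers a watcher for a path; registering the same watcher twice has no extra effect.
    synchronized void addWatch(String path, BiConsumer<String, String> watcher) {
        table.computeIfAbsent(path, k -> new LinkedHashSet<>()).add(watcher);
    }

    // Removes and notifies all watchers of the path; returns the set that was notified.
    Set<BiConsumer<String, String>> triggerWatch(String path, String eventType) {
        Set<BiConsumer<String, String>> watchers;
        synchronized (this) {
            watchers = table.remove(path);
        }
        if (watchers == null) {
            return Collections.emptySet();
        }
        // Callbacks run outside the lock so a watcher may register itself again.
        for (BiConsumer<String, String> w : watchers) {
            w.accept(path, eventType);
        }
        return watchers;
    }
}
```

A client that wants continuous notifications therefore has to register again, typically by passing a watcher on its next read.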
5.5 The statNode process
public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException
5.6 The getChildren process
public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException
5.7 The getCounts process
/**
 * this method gets the count of nodes and the bytes under a subtree
 *
 * @param path
 *            the path to be used
 * @param counts
 *            the int count
 */
private void getCounts(String path, Counts counts)
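Only the signature of getCounts is given here. Going by its Javadoc, a natural way to implement it is a recursive walk that adds one node and its data length per visit. The sketch below is an assumption along those lines, not the actual method body. SubtreeCountSketch, its Node and Counts classes and the put helper are hypothetical, and the walk assumes it is not started at the root "/".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical recursive subtree count: number of nodes and total data bytes.
final class SubtreeCountSketch {
    static final class Node {
        byte[] data;
        final Set<String> children = new HashSet<>(); // relative names
    }

    static final class Counts {
        int count;
        long bytes;
    }

    final Map<String, Node> nodes = new HashMap<>();

    // Test helper: adds a node and links it to its parent if the parent exists.
    void put(String path, byte[] data) {
        Node n = new Node();
        n.data = data;
        nodes.put(path, n);
        int lastSlash = path.lastIndexOf('/');
        Node parent = nodes.get(path.substring(0, lastSlash));
        if (parent != null) {
            parent.children.add(path.substring(lastSlash + 1));
        }
    }

    // Accumulates the node at 'path' and everything below it into 'counts'.
    void getCounts(String path, Counts counts) {
        Node node = nodes.get(path);
        if (node == null) {
            return;
        }
        String[] children;
        int len;
        synchronized (node) { // snapshot under the node lock, recurse outside it
            children = node.children.toArray(new String[0]);
            len = (node.data == null) ? 0 : node.data.length;
        }
        counts.count += 1;
        counts.bytes += len;
        for (String child : children) {
            getCounts(path + "/" + child, counts);
        }
    }
}
```

Taking a snapshot of the children before recursing keeps each lock short-lived, at the cost of a result that may be slightly stale if the tree changes during the walk.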