一、前言
前面已經分析了Watcher機制中的大多數類,本篇對於ZKWatchManager的外部類Zookeeper進行分析。
二、ZooKeeper源碼分析
2.1 類的內部類
ZooKeeper的內部類框架圖如下圖所示
說明:
· ZKWatchManager,Zookeeper的Watcher管理者,其源碼在之前已經分析過,不再累贅。
· WatchRegistration,抽象類,用作watch注冊。
· ExistsWatchRegistration,存在性watch注冊。
· DataWatchRegistration,數據watch注冊。
· ChildWatchRegistration,子節點注冊。
· States,枚舉類型,表示服務器的狀態。
1. WatchRegistration
接口類型,表示對路徑注冊監聽。
abstract class WatchRegistration { // Watcher private Watcher watcher; // 客戶端路徑 private String clientPath; // 構造函數 public WatchRegistration(Watcher watcher, String clientPath) { this.watcher = watcher; this.clientPath = clientPath; } // 獲取路徑到Watchers集合的鍵值對,由子類實現 abstract protected Map<String, Set<Watcher>> getWatches(int rc); /** * Register the watcher with the set of watches on path. * @param rc the result code of the operation that attempted to * add the watch on the path. */ // 注冊 public void register(int rc) { if (shouldAddWatch(rc)) { // 應該添加監聽 // 獲取路徑到Watchers集合的鍵值對,工廠模式 Map<String, Set<Watcher>> watches = getWatches(rc); synchronized(watches) { // 同步塊 // 通過路徑獲取watcher集合 Set<Watcher> watchers = watches.get(clientPath); if (watchers == null) { // watcher集合為空 // 新生成集合 watchers = new HashSet<Watcher>(); // 將路徑和watchers集合存入 watches.put(clientPath, watchers); } // 添加至watchers集合 watchers.add(watcher); } } } /** * Determine whether the watch should be added based on return code. * @param rc the result code of the operation that attempted to add the * watch on the node * @return true if the watch should be added, otw false */ // 判斷是否需要添加,判斷rc是否為0 protected boolean shouldAddWatch(int rc) { return rc == 0; } }
說明:可以看到WatchRegistration包含了Watcher和clientPath字段,表示監聽和對應的路徑,值得注意的是getWatches方式抽象方法,需要子類實現,而在register方法中會調用getWatches方法,實際上調用的是子類的getWatches方法,這是典型的工廠模式。register方法首先會判定是否需要添加監聽,然后再進行相應的操作,在WatchRegistration類的默認實現中shouldAddWatch是判定返回碼是否為0。
2. ExistsWatchRegistration
class ExistsWatchRegistration extends WatchRegistration { // 構造函數 public ExistsWatchRegistration(Watcher watcher, String clientPath) { // 調用父類構造函數 super(watcher, clientPath); } @Override protected Map<String, Set<Watcher>> getWatches(int rc) { // 根據rc是否為0確定返回dataWatches或existsWatches return rc == 0 ? watchManager.dataWatches : watchManager.existWatches; } @Override protected boolean shouldAddWatch(int rc) { // 判斷rc是否為0或者rc是否等於NONODE的值 return rc == 0 || rc == KeeperException.Code.NONODE.intValue(); } }
說明:ExistsWatchRegistration 表示對存在性監聽的注冊,其實現了getWatches方法,並且重寫了shouldAddWatch方法,getWatches方法是根據返回碼的值確定返回dataWatches或者是existWatches。
3. DataWatchRegistration
class DataWatchRegistration extends WatchRegistration { // 構造函數 public DataWatchRegistration(Watcher watcher, String clientPath) { // 調用父類構造函數 super(watcher, clientPath); } @Override protected Map<String, Set<Watcher>> getWatches(int rc) { // 直接返回dataWatches return watchManager.dataWatches; } }
說明:DataWatchRegistration表示對數據監聽的注冊,其實現了getWatches方法,返回dataWatches。
4. ChildWatchRegistration
class ChildWatchRegistration extends WatchRegistration { // 構造函數 public ChildWatchRegistration(Watcher watcher, String clientPath) { // 調用父類構造函數 super(watcher, clientPath); } @Override protected Map<String, Set<Watcher>> getWatches(int rc) { // 直接返回childWatches return watchManager.childWatches; } }
說明:ChildWatchRegistration表示對子節點監聽的注冊,其實現了getWatches方法,返回childWatches。
5. States
public enum States { // 代表服務器的狀態 CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED; // 是否存活 public boolean isAlive() { // 不為關閉狀態並且未認證失敗 return this != CLOSED && this != AUTH_FAILED; } /** * Returns whether we are connected to a server (which * could possibly be read-only, if this client is allowed * to go to read-only mode) * */ // 是否連接 public boolean isConnected() { // 已連接或者只讀連接 return this == CONNECTED || this == CONNECTEDREADONLY; } }
說明:States為枚舉類,表示服務器的狀態,其有兩個方法,判斷服務器是否存活和判斷客戶端是否連接至服務端。
2.2 類的屬性
public class ZooKeeper { // 客戶端Socket public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket"; // 客戶端,用來管理客戶端與服務端的連接 protected final ClientCnxn cnxn; // Logger日志 private static final Logger LOG; static { //Keep these two lines together to keep the initialization order explicit // 初始化 LOG = LoggerFactory.getLogger(ZooKeeper.class); Environment.logEnv("Client environment:", LOG); } private final ZKWatchManager watchManager = new ZKWatchManager(); }
說明:ZooKeeper類存維護一個ClientCnxn類,用來管理客戶端與服務端的連接。
2.3 類的構造函數
1. ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)型構造函數
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); // 初始化默認Watcher watchManager.defaultWatcher = watcher; // 對傳入的connectString進行解析 // connectString 類似於127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002未指定根空間的字符串 // 或者是127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a指定根空間的字符串,根為/app/a ConnectStringParser connectStringParser = new ConnectStringParser( connectString); // 根據服務器地址列表生成HostProvider HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); // 生成客戶端管理 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); // 啟動 cnxn.start(); }
說明:該構造函數會初始化WatchManager的defaultWatcher,同時會解析服務端地址和端口號,之后根據服務端的地址生成HostProvider(其會打亂服務器的地址),之后生成客戶端管理並啟動,注意此時會調用getClientCnxnSocket函數,其源碼如下
private static ClientCnxnSocket getClientCnxnSocket() throws IOException { // 查看是否在系統屬性中進行了設置 String clientCnxnSocketName = System .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); if (clientCnxnSocketName == null) { // 若未進行設置,取得ClientCnxnSocketNIO的類名 clientCnxnSocketName = ClientCnxnSocketNIO.class.getName(); } try { // 使用反射新生成實例然后返回 return (ClientCnxnSocket) Class.forName(clientCnxnSocketName) .newInstance(); } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName); ioe.initCause(e); throw ioe; } }
說明:該函數會利用反射創建ClientCnxnSocketNIO實例
2. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException型構造函數
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher + " sessionId=" + Long.toHexString(sessionId) + " sessionPasswd=" + (sessionPasswd == null ? "<null>" : "<hidden>")); // 初始化默認Watcher watchManager.defaultWatcher = watcher; // 對傳入的connectString進行解析 // connectString 類似於127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002未指定根空間的字符串 // 或者是127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a指定根空間的字符串,根為/app/a ConnectStringParser connectStringParser = new ConnectStringParser( connectString); // 根據服務器地址列表生成HostProvider HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); // 生成客戶端時使用了session密碼 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly); // 設置客戶端的seenRwServerBefore字段為true(因為用戶提供了sessionId,表示肯定已經連接過) cnxn.seenRwServerBefore = true; // since user has provided sessionId // 啟動 cnxn.start(); }
說明:此型構造函數和之前構造函數的區別在於本構造函數提供了sessionId和sessionPwd,這表明用戶已經之前已經連接過服務端,所以能夠獲取到sessionId,其流程與之前的構造函數類似,不再累贅。
2.4 核心函數分析
1. create函數
函數簽名:public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; // 驗證路徑是否合法 PathUtils.validatePath(clientPath, createMode.isSequential()); // 添加根空間 final String serverPath = prependChroot(clientPath); // 新生請求頭 RequestHeader h = new RequestHeader(); // 設置請求頭類型 h.setType(ZooDefs.OpCode.create); // 新生創建節點請求 CreateRequest request = new CreateRequest(); // 新生創建節點響應 CreateResponse response = new CreateResponse(); // 設置請求的數據 request.setData(data); // 設置請求對應的Flag request.setFlags(createMode.toFlag()); // 設置服務器路徑 request.setPath(serverPath); if (acl != null && acl.size() == 0) { // ACL不為空但是大小為0,拋出異常 throw new KeeperException.InvalidACLException(); } // 設置請求的ACL列表 request.setAcl(acl); // 提交請求 ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { // 請求的響應的錯誤碼不為0,則拋出異常 throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { // 根空間為空 // 則返回響應中的路徑 return response.getPath(); } else { // 除去根空間后返回 return response.getPath().substring(cnxn.chrootPath.length()); } }
說明:該create函數是同步的,主要用作創建節點,其大致步驟如下
① 驗證路徑是否合法,若不合法,拋出異常,否則進入②
② 添加根空間,生成請求頭、請求、響應等,並設置相應字段,進入③
③ 通過客戶端提交請求,判斷返回碼是否為0,若不是,則拋出異常,否則,進入④
④ 除去根空間后,返回響應的路徑
其中會調用submitRequest方法,其源碼如下
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { // 新生響應頭 ReplyHeader r = new ReplyHeader(); // 新生Packet包 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); synchronized (packet) { // 同步 while (!packet.finished) { // 如果沒有結束 // 則等待 packet.wait(); } } // 返回響應頭 return r; }
說明:submitRequest會將請求封裝成Packet包,然后一直等待packet包響應結束,然后返回;若沒結束,則等待。可以看到其是一個同步方法。
2. create函數
函數簽名:public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; // 驗證路徑是否合法 PathUtils.validatePath(clientPath, createMode.isSequential()); // 添加根空間 final String serverPath = prependChroot(clientPath); // 新生請求頭 RequestHeader h = new RequestHeader(); // 設置請求頭類型 h.setType(ZooDefs.OpCode.create); // 新生創建節點請求 CreateRequest request = new CreateRequest(); // 新生創建節點響應 CreateResponse response = new CreateResponse(); // 新生響應頭 ReplyHeader r = new ReplyHeader(); // 設置請求的數據 request.setData(data); // 設置請求對應的Flag request.setFlags(createMode.toFlag()); // 設置服務 request.setPath(serverPath); // 設置ACL列表 request.setAcl(acl); // 封裝成packet放入隊列,等待提交 cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
說明:該create函數是異步的,其大致步驟與同步版的create函數相同,只是最后其會將請求打包成packet,然后放入隊列等待提交。
3. delete函數
函數簽名:public void delete(final String path, int version) throws InterruptedException, KeeperException
public void delete(final String path, int version) throws InterruptedException, KeeperException { final String clientPath = path; // 驗證路徑的合法性 PathUtils.validatePath(clientPath); final String serverPath; // maintain semantics even in chroot case // specifically - root cannot be deleted // I think this makes sense even in chroot case. if (clientPath.equals("/")) { // 判斷是否是"/",即zookeeper的根目錄,根目錄無法刪除 // a bit of a hack, but delete(/) will never succeed and ensures // that the same semantics are maintained // serverPath = clientPath; } else { // 添加根空間 serverPath = prependChroot(clientPath); } // 新生請求頭 RequestHeader h = new RequestHeader(); // 設置請求頭類型 h.setType(ZooDefs.OpCode.delete); // 新生刪除請求 DeleteRequest request = new DeleteRequest(); // 設置路徑 request.setPath(serverPath); // 設置版本號 request.setVersion(version); // 新生響應頭 ReplyHeader r = cnxn.submitRequest(h, request, null, null); if (r.getErr() != 0) { // 判斷返回碼 throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } }
說明:該函數是同步的,其流程與create流程相似,不再累贅。
4. delete函數
函數簽名:public void delete(final String path, int version, VoidCallback cb, Object ctx)
public void delete(final String path, int version, VoidCallback cb, Object ctx) { final String clientPath = path; // 驗證路徑是否合法 PathUtils.validatePath(clientPath); final String serverPath; // maintain semantics even in chroot case // specifically - root cannot be deleted // I think this makes sense even in chroot case. if (clientPath.equals("/")) { // 判斷是否是"/",即zookeeper的根目錄,根目錄無法刪除 // a bit of a hack, but delete(/) will never succeed and ensures // that the same semantics are maintained serverPath = clientPath; } else { serverPath = prependChroot(clientPath); } // 新生請求頭 RequestHeader h = new RequestHeader(); // 設置請求頭類型 h.setType(ZooDefs.OpCode.delete); // 新生刪除請求 DeleteRequest request = new DeleteRequest(); // 設置路徑 request.setPath(serverPath); // 設置版本號 request.setVersion(version); // 封裝成packet放入隊列,等待提交 cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, serverPath, ctx, null); }
說明:該函數是異步的,其流程也相對簡單,不再累贅。
5. multi函數
public List<OpResult> multi(Iterable<Op> ops) throws InterruptedException, KeeperException { for (Op op : ops) { // 驗證每個操作是否合法 op.validate(); } // reconstructing transaction with the chroot prefix // 新生事務列表 List<Op> transaction = new ArrayList<Op>(); for (Op op : ops) { // 將每個操作添加根空間后添加到事務列表中 transaction.add(withRootPrefix(op)); } // 調用multiInternal后返回 return multiInternal(new MultiTransactionRecord(transaction)); }
說明:該函數用於執行多個操作或者不執行,其首先會驗證每個操作的合法性,然后將每個操作添加根空間后加入到事務列表中,之后會調用multiInternal函數,其源碼如下
protected List<OpResult> multiInternal(MultiTransactionRecord request) throws InterruptedException, KeeperException { // 新生請求頭 RequestHeader h = new RequestHeader(); // 設置請求頭類型 h.setType(ZooDefs.OpCode.multi); // 新生多重響應 MultiResponse response = new MultiResponse(); // 新生響應頭 ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { // 判斷返回碼是否為0 throw KeeperException.create(KeeperException.Code.get(r.getErr())); } // 獲取響應的結果集 List<OpResult> results = response.getResultList(); ErrorResult fatalError = null; for (OpResult result : results) { // 遍歷結果集 if (result instanceof ErrorResult && ((ErrorResult)result).getErr() != KeeperException.Code.OK.intValue()) { //判斷結果集中是否出現了異常 fatalError = (ErrorResult) result; break; } } if (fatalError != null) { // 出現了異常 // 新生異常后拋出 KeeperException ex = KeeperException.create(KeeperException.Code.get(fatalError.getErr())); ex.setMultiResults(results); throw ex; } // 返回結果集 return results; }
說明:multiInternal函數會提交多個操作並且等待響應結果集,然后判斷結果集中是否有異常,若有異常則拋出異常,否則返回響應結果集。
6. exists函數
函數簽名:public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException { final String clientPath = path; // 驗證路徑是否合法 PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { // 生成存在性注冊 wcb = new ExistsWatchRegistration(watcher, clientPath); } // 添加根空間 final String serverPath = prependChroot(clientPath); // 新生請求頭 RequestHeader h = new RequestHeader(); // 設置請求頭類型 h.setType(ZooDefs.OpCode.exists); // 新生節點存在請求 ExistsRequest request = new ExistsRequest(); // 設置路徑 request.setPath(serverPath); // 設置Watcher request.setWatch(watcher != null); // 新生設置數據響應 SetDataResponse response = new SetDataResponse(); // 提交請求 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { // 判斷返回碼 if (r.getErr() == KeeperException.Code.NONODE.intValue()) { return null; } throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } // 返回結果的狀態 return response.getStat().getCzxid() == -1 ? null : response.getStat(); }
說明:該函數是同步的,用於判斷指定路徑的節點是否存在,值得注意的是,其會對指定路徑的結點進行注冊監聽。
7. exists
函數簽名:public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx) { final String clientPath = path; // 驗證路徑是否合法 PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { // 生成存在性注冊 wcb = new ExistsWatchRegistration(watcher, clientPath); } // 添加根空間 final String serverPath = prependChroot(clientPath); // 新生請求頭 RequestHeader h = new RequestHeader(); // 設置請求頭類型 h.setType(ZooDefs.OpCode.exists); // 新生節點存在請求 ExistsRequest request = new ExistsRequest(); // 設置路徑 request.setPath(serverPath); // 設置Watcher request.setWatch(watcher != null); // 新生設置數據響應 SetDataResponse response = new SetDataResponse(); // 將請求封裝成packet,放入隊列,等待執行 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
說明:該函數是異步的,與同步的流程相似,不再累贅。
之后的getData、setData、getACL、setACL、getChildren函數均類似,只是生成的響應類別和監聽類別不相同,大同小異,不再累贅。
三、總結
本篇博文分析了Watcher機制的ZooKeeper類,該類包括了對服務器的很多事務性操作,並且包含了同步和異步兩個版本,但是相對來說,較為簡單,也謝謝各位園友的觀看~