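A znode can be watched: both changes to the data stored in the node and changes to its children can be monitored, and when a change occurs the client that set the watch is notified. This is the ZooKeeper feature that matters most to applications; it underpins centralized configuration management, cluster management, distributed locks, and more.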
Background:
ZooKeeper defines the following states:
Unknown (-1), Disconnected (0), NoSyncConnected (1), SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated (6), Expired (-112).
The event types are: None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4), DataWatchRemoved (5), ChildWatchRemoved (6).
The watcher types are: Children (1), Data (2), Any (3).
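To make the code-to-name mapping concrete, here is a minimal sketch of an enum that carries the state codes listed above. SketchKeeperState and fromCode are hypothetical names for illustration; this is not the ZooKeeper class itself, only the same numbers arranged the same way.

```java
/** Hypothetical mirror of the state codes listed above; not ZooKeeper's own class. */
enum SketchKeeperState {
    Unknown(-1), Disconnected(0), NoSyncConnected(1), SyncConnected(3),
    AuthFailed(4), ConnectedReadOnly(5), SaslAuthenticated(6), Expired(-112);

    private final int code;

    SketchKeeperState(int code) {
        this.code = code;
    }

    int getCode() {
        return code;
    }

    /** Looks up a state by its integer code; rejects codes that are not listed. */
    static SketchKeeperState fromCode(int code) {
        for (SketchKeeperState s : values()) {
            if (s.code == code) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown state code: " + code);
    }
}
```

Note the gap in the numbering: there is no state with code 2, so fromCode(2) throws in this sketch.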
In the previous article, "ZooKeeper Source Code Analysis, Part 1: The Client", we started a MyWatcher when connecting to ZooKeeper:
    protected void connectToZK(String newHost) throws InterruptedException, IOException {
        if (zk != null && zk.getState().isAlive()) {
            zk.close();
        }
        host = newHost;
        boolean readOnly = cl.getOption("readonly") != null;
        if (cl.getOption("secure") != null) {
            System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
            System.out.println("Secure connection is enabled");
        }
        zk = new ZooKeeper(host,
                Integer.parseInt(cl.getOption("timeout")),
                new MyWatcher(), readOnly);
    }
Creating the ZooKeeper instance makes use of watchManager:
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;

        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }
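The MyWatcher passed in is stored in watchManager as the default watcher. The watchManager is then handed to a ClientCnxn, whose threads are started.
So let us look at ClientCnxn first. ClientCnxn manages the client's socket I/O. It maintains a list of servers it can connect to and transparently switches to another server in that list when needed.
First, how the socket is obtained: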
    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }
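The pattern in getClientCnxnSocket (read a class name from a system property, fall back to a default, instantiate reflectively) can be tried in isolation. The sketch below is an illustration under stated assumptions: SocketImplLoader and the property name sketch.socket.impl are hypothetical, and standard collection classes stand in for the socket implementations. It uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9.

```java
class SocketImplLoader {
    /** Hypothetical property name used only by this sketch. */
    static final String IMPL_PROPERTY = "sketch.socket.impl";

    /** Instantiates the class named by the property, or defaultImpl when the property is unset. */
    static Object load(Class<?> defaultImpl) {
        String name = System.getProperty(IMPL_PROPERTY);
        if (name == null) {
            name = defaultImpl.getName();
        }
        try {
            // Requires an accessible no-argument constructor on the named class.
            return Class.forName(name).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Wrap the cause, in the same spirit as the IOException in the original.
            throw new IllegalStateException("Couldn't instantiate " + name, e);
        }
    }
}
```

With the property unset, SocketImplLoader.load(java.util.ArrayList.class) returns an ArrayList; after System.setProperty("sketch.socket.impl", "java.util.LinkedList") the same call returns a LinkedList. This is how a client can swap the default ClientCnxnSocketNIO for another implementation without code changes.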
Next, ClientCnxn's start() method is called, which starts two threads:
    public void start() {
        sendThread.start();
        eventThread.start();
    }
The SendThread class services the outgoing request queue and generates heartbeats. It also spawns the ReadThread.
Here is the main body of SendThread's run method:
    if (!clientCnxnSocket.isConnected()) {
        // don't re-establish connection if we are closing
        if (closing) {
            break;
        }
        startConnect();
        clientCnxnSocket.updateLastSendAndHeard();
    }

    if (state.isConnected()) {
        // determine whether we need to send an AuthFailed event.
        if (zooKeeperSaslClient != null) {
            boolean sendAuthEvent = false;
            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                try {
                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                } catch (SaslException e) {
                    LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                }
            }
            KeeperState authState = zooKeeperSaslClient.getKeeperState();
            if (authState != null) {
                if (authState == KeeperState.AuthFailed) {
                    // An authentication error occurred during authentication with the Zookeeper Server.
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                } else {
                    if (authState == KeeperState.SaslAuthenticated) {
                        sendAuthEvent = true;
                    }
                }
            }

            if (sendAuthEvent == true) {
                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        authState, null));
            }
        }
        to = readTimeout - clientCnxnSocket.getIdleRecv();
    } else {
        to = connectTimeout - clientCnxnSocket.getIdleRecv();
    }

    if (to <= 0) {
        String warnInfo;
        warnInfo = "Client session timed out, have not heard from server in "
                + clientCnxnSocket.getIdleRecv()
                + "ms"
                + " for sessionid 0x"
                + Long.toHexString(sessionId);
        LOG.warn(warnInfo);
        throw new SessionTimeoutException(warnInfo);
    }
    if (state.isConnected()) {
        //1000(1 second) is to prevent race condition missing to send the second ping
        //also make sure not to send too many pings when readTimeout is small
        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend()
                - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
            sendPing();
            clientCnxnSocket.updateLastSend();
        } else {
            if (timeToNextPing < to) {
                to = timeToNextPing;
            }
        }
    }

    // If we are in read-only mode, seek for read/write server
    if (state == States.CONNECTEDREADONLY) {
        long now = Time.currentElapsedTime();
        int idlePingRwServer = (int) (now - lastPingRwServer);
        if (idlePingRwServer >= pingRwTimeout) {
            lastPingRwServer = now;
            idlePingRwServer = 0;
            pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout);
            pingRwServer();
        }
        to = Math.min(to, pingRwTimeout - idlePingRwServer);
    }

    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
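The two pieces of arithmetic in this loop, the time until the next ping and the doubling of the read/write-server probe interval, are easier to follow with numbers plugged in. The sketch below copies only those two expressions into a standalone class; PingTiming and its method names are hypothetical, and the timeout values used in the note after it are made-up inputs, not ZooKeeper defaults.

```java
class PingTiming {
    /**
     * Same expression as in the run loop: milliseconds left before the next ping is due.
     * A result of zero or less means a ping should be sent now.
     */
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    /** Doubles the probe interval used in read-only mode, capped at maxPingRwTimeout. */
    static int nextPingRwTimeout(int pingRwTimeout, int maxPingRwTimeout) {
        return Math.min(2 * pingRwTimeout, maxPingRwTimeout);
    }
}
```

With a readTimeout of 20000 ms and 500 ms since the last send, timeToNextPing gives 10000 - 500 - 0 = 9500, so no ping is due yet. With 9200 ms since the last send it gives 10000 - 9200 - 1000 = -200, so a ping goes out. For the probe interval, nextPingRwTimeout(100, 60000) is 200, and nextPingRwTimeout(40000, 60000) is capped at 60000.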
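ClientCnxnSocketNetty implements the abstract methods of ClientCnxnSocket. It is responsible for connecting to the server, reading and writing network traffic, and acting as the layer between the raw network data and the higher-level packets. Its lifecycle is: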
    loop:
    - try:
    - - !isConnected()
    - - - connect()
    - - doTransport()
    - catch:
    - - cleanup()
    close()
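From this description we can see how ClientCnxnSocket works. It first checks whether it is connected; if not, it calls connect(), otherwise it uses the existing connection. It then calls doTransport() to communicate. If connect() or doTransport() throws, cleanup() is called. When the loop ends, the connection is closed. The key step is therefore doTransport():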
    @Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }
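Simplifying the code above, there are two paths: the failure path, addBack(head), and the normal path, doWrite(pendingQueue, head, cnxn). Setting the failure path aside, let us follow the normal one.
First, a Packet is obtained: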
    Packet head = null;
    if (needSasl.get()) {
        if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
            return;
        }
    } else {
        if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
            return;
        }
    }
Here, protected LinkedBlockingDeque<Packet> outgoingQueue is a linked-list-based blocking deque that holds the outgoing requests.
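A deque rather than a plain queue matters here because a packet taken from the head may need to be put back at the head on the failure path. The following is a minimal sketch of that behaviour using strings in place of Packet objects. OutgoingQueueSketch is a hypothetical class, and the assumption that addBack re-inserts with addFirst is mine: the body of addBack is not shown in this article.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class OutgoingQueueSketch {
    final LinkedBlockingDeque<String> outgoingQueue = new LinkedBlockingDeque<>();

    /** Waits up to waitMillis for the next item; returns null on timeout or interrupt. */
    String takeHead(long waitMillis) {
        try {
            return outgoingQueue.poll(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Assumed behaviour: put the item back at the front so it is the next one polled. */
    void addBack(String head) {
        if (head != null) {
            outgoingQueue.addFirst(head);
        }
    }
}
```

If "a" and "b" are queued in that order, takeHead returns "a"; after addBack("a"), the next takeHead returns "a" again rather than "b", so request order is preserved across a failed attempt. On an empty queue, takeHead returns null once the wait expires, which corresponds to the early return in doTransport.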
Then the doWrite method is executed:
    /**
     * doWrite handles writing the packets from outgoingQueue via network to server.
     */
    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
                sendPkt(p);
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }
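The doWrite method writes the packets in outgoingQueue to the server over the network. The actual send is done by the sendPkt(p) call in the code above: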
    private void sendPkt(Packet p) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
        p.createBB();
        updateLastSend();
        sentCount++;
        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    }
1. The structure of a Packet is as follows:
    /**
     * This class allows us to pass the headers and the relevant records around.
     */
    static class Packet {
        RequestHeader requestHeader;

        ReplyHeader replyHeader;

        Record request;

        Record response;

        ByteBuffer bb;

        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Server's view of the path (may differ due to chroot) **/
        String serverPath;

        boolean finished;

        AsyncCallback cb;

        Object ctx;

        WatchRegistration watchRegistration;

        public boolean readOnly;

        WatchDeregistration watchDeregistration;

        /** Convenience ctor */
        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration) {
            this(requestHeader, replyHeader, request, response,
                 watchRegistration, false);
        }

        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration, boolean readOnly) {

            this.requestHeader = requestHeader;
            this.replyHeader = replyHeader;
            this.request = request;
            this.response = response;
            this.readOnly = readOnly;
            this.watchRegistration = watchRegistration;
        }

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();

            sb.append("clientPath:" + clientPath);
            sb.append(" serverPath:" + serverPath);
            sb.append(" finished:" + finished);

            sb.append(" header:: " + requestHeader);
            sb.append(" replyHeader:: " + replyHeader);
            sb.append(" request:: " + request);
            sb.append(" response:: " + response);

            // jute toString is horrible, remove unnecessary newlines
            return sb.toString().replaceAll("\r*\n+", " ");
        }
    }
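From createBB we can see that, when serializing for the actual network transfer, ZooKeeper serializes only the requestHeader and request fields (plus the readOnly flag when the request is a ConnectRequest). Only these are written into the underlying byte array and sent; nothing related to watchRegistration goes over the network.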
2. Update the time of the last send, updateLastSend:

    void updateLastSend() {
        this.lastSend = now;
    }

3. Send the byte buffer to the server over the Netty channel:

    channel.write(ChannelBuffers.wrappedBuffer(p.bb));

Here bb is a ByteBuffer, initialized in the Packet's createBB method:

    this.bb = ByteBuffer.wrap(baos.toByteArray());
    this.bb.putInt(this.bb.capacity() - 4);
    this.bb.rewind();
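The three lines that initialize bb implement length-prefixed framing: createBB first writes -1 as a placeholder, serializes the body, then overwrites the placeholder with the body length once it is known. The sketch below reproduces that step with the JDK alone. LengthPrefixSketch is a hypothetical class, a plain byte array stands in for the serialized header and request, and DataOutputStream stands in for BinaryOutputArchive (an assumption about equivalent behaviour for writeInt, not a claim about the archive's internals).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

class LengthPrefixSketch {
    /** Frames payload as [4-byte big-endian length][payload], back-patching the length. */
    static ByteBuffer frame(byte[] payload) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);   // placeholder; the real length is not known yet
            out.write(payload); // stands in for the serialized header and request
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // relative put at position 0 overwrites the placeholder
            bb.rewind();                  // back to position 0 so the whole frame is sent
            return bb;
        } catch (IOException e) {
            // Writing to an in-memory stream is not expected to fail.
            throw new IllegalStateException(e);
        }
    }
}
```

For a 3-byte payload the resulting buffer has capacity 7, its first int reads as 3, and 3 bytes remain after that int. Subtracting 4 excludes the length field itself, so the receiver knows exactly how many bytes follow the prefix.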
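Summary:
The ZooKeeper client's connection to the server is implemented mainly by ClientCnxnSocket, which has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. The workflow is:
First check whether a connection exists; if not, call connect() to establish one, otherwise move on to the next step;
Then call doTransport() to communicate; if an exception occurs along the way, call cleanup();
Finally, close the connection.
These steps can all be seen in SendThread's run method.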
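The workflow above can be sketched as a small loop. This is an illustration only, not ZooKeeper code: LifecycleSketch, the Transport interface, and RecordingTransport are hypothetical, and the loop is bounded by an iteration count so that it terminates, whereas the real loop runs until the client is closed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    /** Hypothetical stand-in for the socket abstraction. */
    interface Transport {
        boolean isConnected();
        void connect() throws Exception;
        void doTransport() throws Exception;
        void cleanup();
        void close();
    }

    /** Runs the connect/transport loop maxIterations times, then closes. */
    static void run(Transport t, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            try {
                if (!t.isConnected()) {
                    t.connect();
                }
                t.doTransport();
            } catch (Exception e) {
                t.cleanup(); // drop the broken connection; the next pass reconnects
            }
        }
        t.close();
    }

    /** Records each call and fails on the second doTransport to exercise cleanup. */
    static class RecordingTransport implements Transport {
        final List<String> log = new ArrayList<>();
        private boolean connected;
        private int transports;

        public boolean isConnected() { return connected; }
        public void connect() { connected = true; log.add("connect"); }
        public void doTransport() throws Exception {
            log.add("doTransport");
            if (++transports == 2) {
                throw new IOException("simulated failure");
            }
        }
        public void cleanup() { connected = false; log.add("cleanup"); }
        public void close() { log.add("close"); }
    }
}
```

Running three iterations against RecordingTransport records connect, doTransport, doTransport, cleanup, connect, doTransport, close: the failure on the second transport triggers cleanup, and the following iteration reconnects before continuing.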
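A further note on one of ZooKeeper's guarantees, sequential consistency: updates are applied in the order in which the client sent its requests. In sendThread we can see the timestamps being refreshed at several points along this path, as follows: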