每一個Watcher具有如下屬性:
1.KeeperState
2.EventType
3.path
4.process(WatchedEvent evnet)回掉方法
Watcher干嘛的?用戶監聽session的狀態,數據節點的狀態等。
watcher種類:defaultWatcher,非defaultWatcher
dafaultWatcher是在創建ZooKeeper對象時傳遞的watcher參數。非defaultWatcher只得是,在調用getData,getChildren,exists函數時傳遞的watcher對象。
二者的相同點在於:都可以監聽數據節點的狀態,比如在getData中使用了defaultWatcher,那么當監聽的節點內容發生改變時,defaultWatcher就會收到通知。如果沒有使用了非defaultWatcher,也是同樣的。
而這的不同點在於:defaultWatcher會監聽session的生命周期,比如session創建成功了,失效了等,而非defaultWatcher不具有這個職責。其次defaultWatcher並不與某一個節點路徑相互關聯。
notification event,非notification event
客戶端需要接受服務器發送過來的消息,第一種消息是類似於Watcher回掉這種的,我們叫做notification,他的特點是服務器主動發送消息給客戶端的,比如客戶端a在數據節點a上設置了getData監聽,當客戶端b修改了節點a后,服務器主動發送NodeDataChanged消息給客戶端a。第二中消息是類似於create,getData這種,他們向服務器發送對應的請求后,然后將請求放進到pendingQueue中,然后等待服務器的響應,當接受到服務器的響應后,再從pendingQueue中取出請求,然后進行回掉。對於第二中,只有兩類請求是不需要服務器響應的,ping,autu。
Watcher和AsyncCallback區別
Watcher用於監聽節點的,比如getData對數據節點a設置了watcher,那么當a的數據內容發生改變時,客戶端會收到NodeDataChanged通知,然后進行watcher的回掉。
AsyncCallback是在以異步方式使用ZooKeeper APi時,用戶獲取api的處理結果的。而這具有本質的不同,不要混淆。
Watcher和AsyncCallback回調思路
為什么將Watcher和AsyncCallback放在一起說,這是因為從客戶端的角度來看,他們都是異步的,客戶端有個后台線程負責處理Watcher和AsyncCallback回調,這里我們好奇的是,客戶端如何處理回調的。
一個最簡單的方法就是生產者-消費者模式,生產者向push一個event到waitingEvents事件隊列中,消費者線程從waitingEvents中取出event執行回調。這里的event可以是Watcher也可以是AsyncCallback。現在有幾個問題需要解決:
1.生產者線程什么時候push event:對於Watcher回調來說,如果客戶端在調用getData API時向節點A注冊了一個Watcher,那么當節點A的數據發生改變時服務器會主動向客戶端發出響應,客戶端收到這個響應后,反序列化出WatchedEvent,然后找到所有在節點A注冊的Watcher,將他們打包成一個event並push到waitingEvents。
對於AsyncCallback來說,如果客戶端在調用getData API來獲取節點A的數據並且傳遞了AsyncCallback,那么會將AsyncCallback保存到客戶端的pendingQueue並向服務器獲取節點A的數據內容,服務器在獲取完節點A的數據內容后會向服務器發送響應,客戶端拿到這個響應后取出pendingQueue中的AsyncCallback,然后打包成event 並push到waitingEvents。
Watcher和AsyncCallback回調實現
1.接受服務器的響應,並反序列化出ReplyHeader: 有一個單獨的線程SendThread,負責接收服務器端的響應。假設他接受到的服務器傳遞過來的字節流失incomingBuffer。那么他做的第一步就應該將這個incomingBuffer反序列化出ReplyHeader。
2.判斷響應類型:判斷ReplyHeader是Wacher響應還是AsyncCallback響應:ReplyHeader.getXid()存儲了響應類型。
2.1 如果是Wacher類型響應:從ReplyHeader中創建WatchedEvent,WatchedEvent里面存儲了節點的路徑,然后去WatcherManager中找到和這個節點相關聯的所有Wacher,將他們寫入到EventWaiting的waitingEvents中。
2.2 如果是AsyncCallback類型響應:從ReplyHeader中讀取response,這個response描述了是Exists,setData,getData,getChildren,create.....中的哪一個異步回調。從pendingQueue中拿到Package,Package中的cb存儲了AsyncCallback,也就是異步API的結果回調。最后將Package寫入到EventThreadwaitingEvents中。
3.回調:EventThread從waitingEvents中取出
同步getData源碼走讀:
/** * Return the data and the stat of the node of the given path. * <p> * If the watch is non-null and the call is successful (no exception is * thrown), a watch will be left on the node with the given path. The watch * will be triggered by a successful operation that sets data on the node, or * deletes the node. * <p> * A KeeperException with error code KeeperException.NoNode will be thrown * if no node with the given path exists. * * @param path the given path * @param watcher explicit watcher * @param stat the stat of the node * @return the data of the node * @throws KeeperException If the server signals an error with a non-zero error code * @throws InterruptedException If the server transaction is interrupted. * @throws IllegalArgumentException if an invalid path is specified */ public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse();
//提交請求,等待請求處理完畢 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader();
//提交請求到outgoingQueue,outgoingQueue里面的請求是將要發送到服務器的請求,ClientCnxnSocket的doIO()會從outgoingQueue中取出隊列發送到服務器 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
//阻塞等待,直到這個請求處理完成 synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
1.將請求頭,請求體,響應等送入隊列,返回一個packet,然后等待packet完成。
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; // Note that we do not generate the Xid for the packet yet. It is // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), // where the packet is actually sent.
//這里不會為這個packet生成xid,當ClientCnxnSocket::doIO從outgoingQueue中取出packet才生成xid synchronized (outgoingQueue) { packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath;
//如果會話已經關閉 if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing
//如果客戶端請求關閉會話
if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }
由於outgoingQueue並不是阻塞隊列並且需要向其添加packet對象,所以需要對其synchronized。最后調用sendThread.getClientCnxnSocket().wakeupCnxn.
/** * @return true if a packet was received * @throws InterruptedException * @throws IOException */ void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); }
//優先處理讀響應 if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else { // Just in case enableWrite(); } } } }
鎖定outgoingQueue,獲取一個可以發送的packet,如果則個packet還沒有被發送,則p.bb!=null,然后生成xid。
xid初始值為1,每次發送一個packet時,都會遞增xid,將其設置到請求頭中。
最后,將packet寫入到pendingQueue中。
void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header");
//雖然客戶端會發送ping或者auth消息給服務器,但是客戶端並不需要等待服務器的響應,也就是說他並沒有將請求寫入到pendingQueue中 if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; } if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; }
//notification類型的通知,watcher回掉相關邏輯。 if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } //根據<clientPath,EventType>從ZKWatcherManager中取出相關的wacher,然后封裝成:
//<EventType(NodeDataChanged),Path(/abc),watche1 for getData,WatchedEvent>
//<EventType(NodeDataChanged),Path(/abc),watche1 for exists,WatchedEvent>
eventThread.queueEvent( we ); return; } // If SASL authentication is currently in progress, construct and // send a response packet immediately, rather than queuing a // response as with other packets. if (clientTunneledAuthenticationInProgress()) { GetSASLRequest request = new GetSASLRequest(); request.deserialize(bbia,"token"); zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this); return; } Packet packet; synchronized (pendingQueue) { if (pendingQueue.size() == 0) { throw new IOException("Nothing in the queue, but got " + replyHdr.getXid()); } packet = pendingQueue.remove(); } /* * Since requests are processed in order, we better get a response * to the first request! */ try { if (packet.requestHeader.getXid() != replyHdr.getXid()) { packet.replyHeader.setErr( KeeperException.Code.CONNECTIONLOSS.intValue()); throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet ); } packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } if (packet.response != null && replyHdr.getErr() == 0) { packet.response.deserialize(bbia, "response"); } if (LOG.isDebugEnabled()) { LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet); } } finally {
//客戶端發送的非ping,auth請求,比如getData,setData等,注意事項,如果客戶端注冊了新的wather,需要將這個watcher保存到ZKWatcherManager中。如果是同步調用,直接通知客戶端完成即可,如果是異步調用,就需要進行AsyncCallback回掉。 finishPacket(packet); } }
private void finishPacket(Packet p) {
//如果用戶顯示的注冊了watcher,比如在getData中注冊了非默認watcher,那么就將watcher添加到ZKWatcherManager中去,很重要的一步哦
//如果沒有這一步,也就沒有后面的watcher回掉了。
if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); }
//cb就是AsnycCallback,如果為null,表明是同步調用的接口,不需要異步回掉,因此,直接notifyAll即可。 if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
如果watchRegistration!=null,
如果p.cb==null,說明這個請求已經處理完畢了,因此通知packet完成即可。
最后,將packet放進waitingEvents中,然后EventThread就可以從waitingEvents中取出packets,然后執行客戶端邏輯。
private void processEvent(Object event) { try {
//watcher回掉邏輯 if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else {
//create,getData等需要異步回掉的 Packet p = (Packet) event; int rc = 0; String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.cb == null) { LOG.warn("Somehow a null cb got to EventThread!"); } else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { StatCallback cb = (StatCallback) p.cb; if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetDataResponse) { DataCallback cb = (DataCallback) p.cb; GetDataResponse rsp = (GetDataResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getData(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetACLResponse) { ACLCallback cb = (ACLCallback) p.cb; GetACLResponse rsp = (GetACLResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getAcl(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetChildrenResponse) { ChildrenCallback cb = (ChildrenCallback) p.cb; GetChildrenResponse rsp = (GetChildrenResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren()); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetChildren2Response) { Children2Callback cb = (Children2Callback) p.cb; GetChildren2Response rsp = (GetChildren2Response) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof CreateResponse) { StringCallback cb = (StringCallback) p.cb; CreateResponse rsp = (CreateResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath() .substring(chrootPath.length()))); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.cb instanceof VoidCallback) { VoidCallback cb = (VoidCallback) p.cb; cb.processResult(rc, clientPath, p.ctx); } } } catch (Throwable t) { LOG.error("Caught unexpected throwable", t); } } }
客戶端在節點a注冊了如下watcher:
getData(a,watcher1)
exists(a,watcher2)
getChildren(a,watcher3)
因此與a關聯的watcher有watcher1和watcher2。如果節點a被修改了,那么客戶端會收到notification 類型的通知,這里應是NodeDataChanged事件類型,此時,客戶端需要回掉watcher1和watcher2.也就是說,他需要根據<EventType,clientPath>來找到與節點a對應的所有watcher。
作者:FrancisWang
郵箱:franciswbs@163.com
出處:http://www.cnblogs.com/francisYoung/
本文地址:http://www.cnblogs.com/francisYoung/p/5225703.html
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。