深入理解 ZooKeeper客戶端與服務端的watcher回調


2020-02-08 補充本篇博文所描述的watcher回調的流程圖
回調的流程圖

watcher存在的必要性

舉個特容易懂的例子: 假如我的項目是基於dubbo+zookeeper搭建的分布式項目, 我有三個功能相同的服務提供者,用zookeeper當成注冊中心,我的三個項目得注冊進zookeeper才能對外暴露服務,但是問題來了,寫的java代碼怎么才能注冊進zookeeper呢?當然加入依賴,寫好配置文件再啟動就成了,這時,這三個服務體提供者就是zookeeper的客戶端了,zookeeper的客戶端不止一個,我選擇了哪個依賴,就是哪個客戶端,光有服務提供者不成啊,對外提供服務,我得需要服務消費者啊,於是用同樣的方式,把消費者也注冊進zookeeper,zookeeper中就存在了4個node,也就是4個客戶端,服務消費者訂閱zookeeper,向它拉取服務提供者的address,然后把地址緩存在本地, 進而可以遠程調用服務消費者,那么問題又來了,萬一哪一台服務提供者掛了,怎么辦呢?zookeeper是不是得通知消費者呢? 萬一哪一天服務提供者的address變了,是不是也得通知消費者? 這就是watcher存在的意義,它解決了這件事

watcher的類型

keeperState EventType 觸發條件 說明
SyncConnected None(-1) 客戶端與服務端建立連接 客戶端與服務端處於連接狀態
SyncConnected NodeCreate(1) watcher監聽的數據節點被創建 客戶端與服務端處於連接狀態
SyncConnected NodeDeleted(2) Watcher監聽的數據節點被刪除 客戶端與服務端處於連接狀態
SyncConnected NodeDataChanged(3) watcher監聽的node數據內容發生改變 客戶端與服務端處於連接狀態
SyncConnected NodeChildrenChange(4) 被監聽的數據節點的節點列表發生變更 客戶端與服務端處於連接狀態
Disconnect None(-1) 客戶端與服務端斷開連接 客戶端與服務端斷開連接
Expired (-112) None(-1) 會話超時 session過期,收到異常SessionExpiredException
AuthFailed None(-1) 1.使用了錯誤的scheme 2,SALS權限驗證失敗了 收到異常AuthFailedException

實驗場景:

假設我們已經成功啟動了zookeeper的服務端和客戶端,並且預先添加了watcher,然后使用控制台動態的修改下node的data,我們會發現watcher回調的現象

添加的鈎子函數代碼如下:

public class ZookepperClientTest {
    public static void main(String[] args) throws Exception {
        ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.err.println("連接,觸發");
            }
        });

    Stat stat = new Stat();
    
     //   todo 下面添加的事件監聽器可是實現事件的消費訂閱
      String content = new String(client.getData("/node1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // todo 任何連接上這個節點的客戶端修改了這個節點的 data數據,都會引起process函數的回調

                // todo 特點1:  watch只能使用1次
                if (event.getType().equals(Event.EventType.NodeDataChanged)){
                    System.err.println("當前節點數據發生了改變");
                }
            }
        }, stat));

看如上的代碼, 添加了一個自己的watcher也就是client.getData("/node1", new Watcher() {} 這是個回調的鈎子函數,執行時不會運行,當滿足的某個條件時才會執行, 比如: node1被刪除了, node1的data被修改了

getData做了哪些事情?

源碼如下: getdata,顧名思義,返回服務端的node的data+stat, 當然是當服務端的node發生了變化后調用的

主要主流如下幾個工作

  • 創建WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
    • 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象
  • 創建一個request,並且初始化這個request.head=getData=4
  • 調用ClientCnxn.submitRequest(...) , 將現存的這些信息進一步封裝
  • request.setWatch(watcher != null);說明他並沒有將watcher封裝進去,而是僅僅做了個有沒有watcher的標記
 public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
         // todo 校驗path
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            // todo DataWatchRegistration 繼承了 WatchRegistration
            // todo DataWatchRegistration 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);
        // todo 創建一個請求頭
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);

        // todo 創建了一個GetDataRequest
        GetDataRequest request = new GetDataRequest();
        // todo 給這個請求初始化,path 是傳遞進來的path,但是 watcher不是!!! 如果我們給定了watcher , 這里面的條件就是  true
        request.setPath(serverPath);
        request.setWatch(watcher != null); // todo 可看看看服務端接收到請求是怎么辦的

        GetDataResponse response = new GetDataResponse();

        // todo 同樣由 clientCnxn 上下文進行提交請求, 這個操作應該同樣是阻塞的
         // todo EventThread 和 SendThread 同時使用一份 clientCnxn的 submitRequest()
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 的源碼我寫在下面, 這里來到這個方法中,一眼能看到,它依然是阻塞的式的,並且requet被進一步封裝進packet

更重要的是 queuePacket()方法的最后一個參數,存在我們剛剛創建的path+watcher的封裝類

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // todo 來到這個 queuePacket() 方法在下面, 這個方法就是將  用戶輸入-> string ->>> request ->>> packet 的過程
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration);


    // todo 使用同步代碼塊,在下面的進行    同步阻塞等待, 直到有了Response響應才會跳出這個循環, 這個finished狀態就是在客戶端接受到服務端的
    // todo 的響應后, 將服務端的響應解析出來,然后放置到 pendingqueue里時,設置上去的
    synchronized (packet) {
        while (!packet.finished) {
            // todo 這個等待是需要喚醒的
            packet.wait();
        }
    }
    // todo 直到上面的代碼塊被喚醒,才會這個方法才會返回
    return r;
}

同樣,在queuePacket()方法中將packet提交到outgoingQueue中,最終被seadThread消費發送到服務端

服務端如何處理watchRegistration不為空的packet

后續我准備用一整篇博客詳解單機模式下服務端處理請求的流程,所以這篇博客只說結論

在服務端,用戶的請求最終會按順序流向三個Processor,分別是

  • PrepRequestProcessor
    • 負責進行一些狀態的修改
  • SyncRequestProcessor
    • 將事務日志同步到磁盤
  • FinalRequestProcessor
    • 相應用戶的請求

我們直接去看FinalRequestProcessor public void processRequest(Request request) {}方法,看他針對getData()方式的請求做出了哪些動作.下面來了個小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);跟進watcher的有無給服務端添加不同的Watcher

真的得划重點了,當我發現這一點時,我的心情是超級激動的,就像發現了新大陸一樣

case OpCode.getData: {
        lastOp = "GETD";
        GetDataRequest getDataRequest = new GetDataRequest();
        ByteBufferInputStream.byteBuffer2Record(request.request,
                getDataRequest);
        DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                ZooDefs.Perms.READ,
                request.authInfo);
        Stat stat = new Stat();
        // todo 這里的操作    getDataRequest.getWatch() ? cnxn : null 對應可客戶端的  跟進watcher有沒有而決定往服務端傳遞 true 還是false 相關
        // todo 跟進去 getData()
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                getDataRequest.getWatch() ? cnxn : null);
        //todo  cnxn的Processor()被回調, 往客戶端發送數據 , 什么時候觸發呢? 就是上面的  處理事務時的回調 第127行

        // todo 構建了一個 rsp ,在本類的最后面將rsp 響應給client
        rsp = new GetDataResponse(b, stat);
        break;
    }

繼續跟進這個getData()在服務端維護了一份path+watcher的map

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            // todo 將path 和 watcher 綁定在一起
            dataWatches.addWatch(path, watcher);
        }
        return n.data;
    }
}

客戶端打開命令行,修改服務端node的狀態

書接上回,當客戶單的代碼去創建ClientCnxn時,有下面的邏輯 , 它開啟了兩條守護線程, sendThread負責向服務端發送心跳,已經和服務端進行用戶相關的IO交流, EventThread就負責和txn事務相關的處理邏輯,級別上升到針對node

    // todo start就是啟動了在構造方法中創建的線程
    public void start() {
        sendThread.start();
        eventThread.start();
    }

到目前為止,客戶端就有如下三條線程了

  • 負責處理用戶在控制台輸入命令的主線程
  • 守護線程1: seadThread
  • 守護線程2: eventThread

跟進主線程的處理用戶輸入部分的邏輯代碼如下:

下面的代碼的主要邏輯就是處理用戶輸入的命令,當通過if-else選擇分支判斷用戶到底輸入的啥命令

按照我們的假定的場景,用戶輸入的命令是這樣的 set /path newValue 所以,毫無疑問,經過解析后代碼會去執行下面的stat = zk.setData(path, args[2].getBytes(),部分

  // todo zookeeper客戶端, 處理用戶輸入命令的具體邏輯
    // todo  用大白話講,下面其實就是把 從控制台獲取的用戶的輸入信息轉換成指定的字符, 然后發送到服務端
    // todo MyCommandOptions 是處理命令行選項和shell腳本的工具類
    protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
        // todo 在這個方法中可以看到很多的命令行所支持的命令
        Stat stat = new Stat();
        // todo 獲取命令行輸入中 0 1 2 3 ... 位置的內容, 比如 0 位置是命令  1 2 3 位置可能就是不同的參數
        String[] args = co.getArgArray();
        String cmd = co.getCommand();
        if (args.length < 1) {
            usage();
            return false;
        }

        if (!commandMap.containsKey(cmd)) {
            usage();
            return false;
        }

        boolean watch = args.length > 2;
        String path = null;
        List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
        LOG.debug("Processing " + cmd);

        if (cmd.equals("quit")) {
            System.out.println("Quitting...");
            zk.close();
            System.exit(0);
        } else if (cmd.equals("set") && args.length >= 3) {
            path = args[1];
            stat = zk.setData(path, args[2].getBytes(),
                    args.length > 3 ? Integer.parseInt(args[3]) : -1);
            printStat(stat);

繼續跟進stat = zk.setData(path, args[2].getBytes(), 下面的邏輯也很簡單,就是將用戶的輸入封裝進來request中,通過ClientCnxn類的submit方法提交到一個隊列中,等待着sendThread去消費

這次有目的的看一下submitRequest的最后一個參數為null, 這個參數是WatchRegistration的位置,一開始置為null

 public Stat setData(final String path, byte data[], int version)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.setData);
        SetDataRequest request = new SetDataRequest();
        request.setPath(serverPath);
        request.setData(data);
        request.setVersion(version);
        
        SetDataResponse response = new SetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        return response.getStat();
    }

跟進這個submitRequest()方法, 源碼如下,不處所料的是,它同樣被阻塞住了,直到服務端給了它響應

當前代碼的主要邏輯就是將request封裝進packet,然后將packet添加到ClintCnxn維護的outgoingQueue隊列中等待sendThread的消費

這次來到這個方法是因為我們在控制台輸入的set 命令而觸發的,比較重要的是本次packet攜帶的WatchRegistration==null, 毫無疑問,這次服務端在FinalRequestProcessor中再處理時取出的watcher==null, 也就不會將path+watcher保存進maptable中

重要:發送事務消息

FinalRequestProcessorpublic void processRequest(Request request) {}方法中,有如下代碼

//todo 請求頭不為空
    if (request.hdr != null) {
        // 獲取請求頭
       TxnHeader hdr = request.hdr;
       // 獲取事務
       Record txn = request.txn;
        // todo 跟進這個方法-----<--!!!!!!-----處理事務的邏輯,在這里面有向客戶端發送事件的邏輯, 回調客戶端的watcher----!!!!!!-->
       rc = zks.processTxn(hdr, txn);
    }

繼續跟進去

// todo 處理事物日志
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // todo 繼續跟進去!!!!!!!!!
    // todo 跟進 processTxn(hdr, txn)
    rc = getZKDatabase().processTxn(hdr, txn);

跟進ZkDatabase.java中的processTxn(hdr, txn)方法

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    // todo 跟進 processTxn
    return dataTree.processTxn(hdr, txn);
}

跟進到DataTree.java

  public ProcessTxnResult processTxn(TxnHeader header, Record txn)
    {
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) { // todo 根據客客戶端發送過來的type進行switch,
                case OpCode.create:
                    CreateTxn createTxn = (CreateTxn) txn;
                    rc.path = createTxn.getPath();
                    // todo  跟進這個創建節點的方法
                    createNode(
                            createTxn.getPath(),

根據請求頭的值,進而判斷出走到那個switch的分支,當前我們在控制台觸發,進入到setData分支如下:跟進這個方法中可以看到它主要做了如下幾件事

  • 使用傳遞進來的新值替代舊data
  • dataWatches.triggerWatch(path, EventType.NodeDataChanged);**觸發指定的事件watch,什么事件呢? NodeDataChange, 觸發了哪個watcher呢? 跟進去查看 **


    //todo  setData
    public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            // todo 修改內存的數據
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
        // todo 終於 看到了   服務端 關於觸發NodeDataChanged的事件
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

補充Watch & EventType 類圖

watcherAndEventType

跟進去 dataWatches.triggerWatch(path, EventType.NodeDataChanged);,源碼如下, 主要的邏輯就是取出存放在服務端的watch,然后逐個回調他們的processor函數,問題來了,到底是哪些watcher呢? 其實就是我們在客戶端啟動時添加getData()時存進去的wather,也就是ServerCnxn


 // todo 跟進去服務端的 觸發事件,  但是吧, 很納悶. 就是沒有往客戶端發送數據的邏輯
    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // todo 繼續跟進去, 看它如何回調的
            w.process(e);
        }
        return watchers;
    }

懷着激動的心情去看看ServerCnxn的process()方法做了什么事?

來到ServerCnxn的實現類NIOServerCnxn, 確實很激動,看到了服務端在往客戶端發送事務型消息, 並且new ReplyHeader(-1, -1L, 0)第一個位置上的參數是-1, 這一點很重要,因為客戶端在接受到這個xid=-1的標記后,就會將這條響應交給EventThread處理

    @Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        // todo  往服務端發送了 e event類型消息
        sendResponse(h, e, "notification");
    }

處理回調回調watch使用的響應

進入到SendThread的讀就緒源碼部分,如下: 它根據header.xid=-1就知道了這是事務類型的響應

// todo 服務端拋出來的事件, 客戶端將把他存在EventThread的 watingEvents 隊列中
// todo 它的實現邏輯也是這樣, 會有另外一個線程不斷的消費這個隊列
if (replyHdr.getXid() == -1) {
    // -1 means notification
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId));
    }
    // todo 創建watcherEvent 並將服務端發送回來的數據,反序列化進這個對象中
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");

    // convert from a server path to a client path
    // todo 將server path 反轉成 client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/");
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length()));
        else {
            LOG.warn("Got server path " + event.getPath()
                    + " which is too short for chroot path "
                    + chrootPath);
        }
              WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                //todo 跟進去
                eventThread.queueEvent(we);
                return;
            }
    }

在這個方法的最后,將這個相應添加進EventThread消費的隊列中,跟進 eventThread.queueEvent(we);

// todo
public void queueEvent(WatchedEvent event) {
    // todo 如果事件的類型是 none, 或者sessionState =  直接返回
    /**
     *   todo 事件的類型被設計成 watcher 接口的枚舉
     *   None (-1),
     *   NodeCreated (1),
     *   NodeDeleted (2),
     *   NodeDataChanged (3),
     *   NodeChildrenChanged (4);
     */
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();

    // materialize the watchers based on the event
    // todo 根據事件的具體類型,將觀察者具體化, 跟進去
    // todo 這個類是ClientCnxn的輔助類,作用就是將watcher 和它觀察的事件封裝在一起
    WatcherSetEventPair pair = new WatcherSetEventPair(
            //todo 跟進這個 materialize方法. 其實就是從map中取出了和當前client關聯的全部 watcher set

            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event);
    // queue the pair (watch set & event) for later processing
    // todo 將watch集合 和 event 進行排隊(按順序添加到隊列里了), 以便后續處理 , 怎么處理呢?  就在EventThread的run循環中消費
    // todo watingEvent ==>  LinkedBlockingQueue<Object>
    waitingEvents.add(pair);
}

上面的代碼主要做了如下幾件事:

  • 從map中取出和當前事件相關的全部watcher
  • 將watcher set 添加進 waitingEvents隊列中,等待EventThead的消費

跟進 watcher.materialize(event.getState(), event.getType(), 會追到下面的代碼

case NodeDataChanged: // todo node中的data改變和 nodeCreate 都會來到下面的分支
        case NodeCreated:
            synchronized (dataWatches) {
                // todo dataWatches 就是剛才存放  path : watcher 的map
                // todo dataWatches.remove(clientPath) 移除並返回clientPath對應的watcher , 放入 result 中
                addTo(dataWatches.remove(clientPath), result);
            }

上面的dataWatches 就是保存path+watcher set的map, 上面的操作是移除並返回指定的watcher,這也說明了,為什么zk原生客戶端添加的watcher僅僅會回調一次

EventThread是如何消費waitingEvents的

EventThread是一條守護線程, 因此它擁有自己的不斷在運行的run方法,它就是在這個run方法中對這個隊列進行消費的


        @Override
        public void run() {
            try {
                isRunning = true;
                // todo 同樣是無限的循環
                while (true) {
                    // todo 從watingEvnets 中取出一個 WatcherSetEventPair
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {
                        // todo 本類方法,處理這個事件,繼續進入,方法就在下面
                        processEvent(event);
                    }
                    if (wasKilled)
                        synchronized (waitingEvents) {
                            if (waitingEvents.isEmpty()) {
                                isRunning = false;
                                break;
                            }
                        }
                }
            } catc
        

繼續跟進它的processEvent(event),最終會在這個方法中調用下面的代碼,這里的watcher就是我在本篇博客的開始位置添加進去的watcher,至此打完收工

 watcher.process(pair.event);

總結:

當客戶端啟動時添加watcher對某一個特定path上的node進行監聽時 , 客戶端的watcher被封裝進WatcherRegistion中再進一步發送的服務端

watcher不為空的packet達到服務端后會被巧妙的處理,將ServerCnxn當成watcher注冊添加到服務端維護的那份watcher map table中

當watcher關聯的node發生了NodeCreate,NodeDeleted ,NodeDataChannged,NodeChildrenChannged時,在最后一個處理器就會觸發發送事務類型事件的動作,其實就是回調ServerCnxn的process()方法

事務類型的響應返回到客戶端,跟進xid區分出到底是哪種響應,如-1是NodeDataChanged,最終會把這個事務事件提交到EventThread消費的waitingEvents等待EventThread消費它,回調客戶端的watcher的process()方法

如果覺得對您有幫助,歡迎點個推薦, 如果有錯誤,歡迎指出


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM