深入理解 ZooKeeper單機客戶端的啟動流程


2020-0208 補充整個過程的流程圖
流程圖

客戶端的啟動流程

https://img2018.cnblogs.com/blog/1496926/201909/1496926-20190925213211418-1969928795.png
看上面的客戶端啟動的腳本圖,可以看到,zookeeper客戶端腳本運行的入口ZookeeperMain.java的main()方法, 關於這個類可以理解成它是程序啟動的輔助類,由它提供開始的位置,進而加載出zk client的上下文

創建ZooKeeperMain對象

https://img2018.cnblogs.com/blog/1496926/201909/1496926-20190925213210345-735408900.png

// todo zookeeper的入口方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
    // todo new ZK客戶端
    ZooKeeperMain main = new ZooKeeperMain(args);

    // todo run方法的實現在下面
    main.run();
}

跟蹤ZooKeeperMain main = new ZooKeeperMain(args); 能往下追很長的代碼,提前說main.run()的作用,就是對用戶輸入的命令進行下一步處理

如上是入口函數的位置,跟進這兩個函數,可以找到我們在client端的命令行中可以輸入命令和zookeeper服務端進行通信的原因(開起了新的線程),以及zookeeper的客戶端所依賴的其他類

跟進ZooKeeperMain main = new ZooKeeperMain(args);

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    // todo 連接到客戶端
    connectToZK(cl.getOption("server"));
    }

我們在命令行啟動客戶端時,輸入命令zkCli.sh -server localhost:2181,其中的args數組, 就是我們在啟動就是我們輸入的參數,

構建zookeeperMain對象時,上面主要做了兩件事

  • 解析args參數數組
  • 連接客戶端

解析參數數組的邏輯就在下面, 很熟悉,就是我們在命令行啟動zookeeper時輸入的命令可選項

  public boolean parseOptions(String[] args) {
    List<String> argList = Arrays.asList(args);
    Iterator<String> it = argList.iterator();

    while (it.hasNext()) {
        String opt = it.next();
        try {
            if (opt.equals("-server")) {
                options.put("server", it.next());
            } else if (opt.equals("-timeout")) {
                options.put("timeout", it.next());
            } else if (opt.equals("-r")) {
                options.put("readonly", "true");
            }
        } catch (NoSuchElementException e) {
            System.err.println("Error: no argument found for option "
                    + opt);
            return false;
        }

        if (!opt.startsWith("-")) {
            command = opt;
            cmdArgs = new ArrayList<String>();
            cmdArgs.add(command);
            while (it.hasNext()) {
                cmdArgs.add(it.next());
            }
            return true;
        }
    }
    return true;
}

創建ZooKeeper客戶端的對象

https://img2018.cnblogs.com/blog/1496926/201909/1496926-20190925213209884-1525817308.png

接着看如果連接客戶端, connectToZK(String newHost) 同樣是本類方法,源碼如下:

// todo 來到這里
protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }
    //todo  命令行中的server 后面跟着 host主機地址
    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    // todo 創建zookeeper的實例
    zk = new ZooKeeper(host,
                        Integer.parseInt(cl.getOption("timeout")),
                        new MyWatcher(), readOnly);
}

到這里算是個小高潮吧,畢竟看到了zookeeper client的封裝類ZooKeeper, 這個類上的注解大概是這么介紹這個類的

  • 它是個Zookeeper 客戶端的封裝類, 它的第一個參數是 host:port,host:port,host:port這種格式的字符串,逗號左右是不同的服務端的地址
  • 會異步的創建session,通常這個session在構造函數執行完之間就已經創建完成了
  • watcher 是監聽者,它被通知的時刻不確定,可能是構造方法執行完成前,也可能在這之后
  • 只要沒有連接成功, zookeeper客戶端,會一直嘗試從提供的服務地址串中選擇出一個嘗試鏈接

跟進ZooKeeper的構造方法

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    // todo 包裝服務端的地址
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    //todo 將服務端的地址封裝進 StaticHostProvider -> HostProvider中
    HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());

    // todo 創建客戶端的上下文, 這個上下文對象的亮點就是它維護了一個客戶端的socket
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            // todo 跟進這個方法,getClientCnxnSocket, 獲取出客戶端上下文中的socket
            getClientCnxnSocket(), canBeReadOnly);
    // todo 啟動客戶端
    cnxn.start();
}

主要做了這么幾件事

  • 將服務端的地址解析封裝進了StaticHostProvider類中, 可以把這個類理解成專門存放服務端地址的set 集合
  • 創建出了客戶端的上下文對象: ClientCnxn, 當然在這之前,入參位置還有一個getClientCnxnSocket()這個函數可以創建出客戶端的NIO Socket
  • 然后調用cnxn.start() 其實就是啟動了客戶端的另外兩條線程sendThreadeventThread 下面會詳細說

創建客戶端的 NioSocket

https://img2018.cnblogs.com/blog/1496926/201909/1496926-20190925213209392-569368054.png

繼續跟進源碼 getClientCnxnSocket()通過反射,zk客戶端使用的socket對象是ClientCnxnSocketNIO

 //todo 通過反射創建出客戶端上下文中的 socket , 實際的ClientCnxnSocketNIO 是 ClientCnxnSocket的子類
    // todo --->  zookeeper 封裝的 NIO的邏輯都在   實際的ClientCnxnSocketNIO
    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        // todo zookeeper.clientCnxnSocket
        String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);

        if (clientCnxnSocketName == null) {
            // todo 上面String其實就是這個類的name, 根進去看一下它的屬性
            // todo 這個類維護了NioSocket使用到的 selector 選擇器 , 已經發生的感興趣的事件SelectionKey
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }

        try {
            // todo 可以看到客戶端使用的 NioSocket
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }

創建 ClientCnxn客戶端的上下文

https://img2018.cnblogs.com/blog/1496926/201909/1496926-20190925213208967-626750030.png

創建上下文,構造函數中的諸多屬性都是在前面讀取配置文件或是新添加進來的,重點是最后兩行,它創建了兩條線程類,和zk客戶端的IO息息相關

 public   ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId; // todo 剛才傳遞過來的值為0
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        // todo 添加read的超時時間
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        
        // todo  創建了一個seadThread 線程
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
    }

創建SendThread

https://img2018.cnblogs.com/blog/1496926/201909/1496926-20190925213208290-622984818.png

sendThred是一個客戶端的線程類,什么時候開啟? 其實就在上面,當創建了ClientCnxn后,調用的cnxn.start()就是在開啟它的run() , 它有什么作用? 它的run()是一個無限循環,除非運到了close的條件,否則他就會一直循環下去, 比如向服務端發送心跳,或者向服務端發送我們在控制台輸入的數據以及接受服務端發送過來的響應

這是他的構造方法,可以看到它還是一個守護線程,並擁有客戶端socket的引用,有了NIO Socket相關技能

//todo
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(makeThreadName("-SendThread()"));
    // todo 設置狀態 Connecting
    state = States.CONNECTING;
    // todo 就是在 Zookeeper new ClientCnxn 時, 在倒數第二個位置使傳遞進去一個函數實際的
    this.clientCnxnSocket = clientCnxnSocket;
    // todo 設置成守護線程
    setDaemon(true);
}

它的Run方法, 真的是好長啊, 比我上面寫的部分內容還長(大概兩百行了), 大概它的流程 ,每次循環:

  • 檢查一下客戶端的socket有沒有和服務端的socket建立連接
    • 沒有建立連接
      • 嘗試選出其他的server地址進行連接
      • 如果滿足close的條件,直接break 跳出整個while循環
    • 如果已經建立了連接
      • 計算 to = 讀取的超時時間 - 服務端的響應時間
    • 未連接的狀態
      • 計算 to = 連接超時時間 - 服務端的響應時間
    • 上面的兩個to, 如果小於0, 說明客戶端和服務端通信出現了異常, 很可能是server的session time out,於是拋出異常
    • 如果連接狀態是健康的,向服務端發送心跳
    • clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);向服務端發送數據

在這個負責和服務端進行IO操作的線程中,只要不是close或其他重大錯誤,一般可以預知的異常都有try起來,然后記錄日志,並沒有其他操作,循環還是會進行

// todo introduce 介紹
    clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // todo 這個while循環中存在建立連接的過程, 已經連接建立失敗后不斷重試的過程
    //todo  state.isAlive() 默認是 NOT_CONNECTED
    while (state.isAlive()) {
        try {


//todo 1111  如果socket還沒有連接 /////////////////////////////////////////////////////////////////////////////////////////////////////////

            //todo  如果socket還沒有連接
            if (!clientCnxnSocket.isConnected()) {
                // todo 判斷是不是第一次連接, 如果不是第一次進入下面try代碼塊, 隨機產生一個小於一秒的時間
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                // todo 如果是closing 或者 已經關閉了, 直接退出這個循環
                if (closing || !state.isAlive()) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    // todo 連接失敗時,來這里重試連接
                    // todo 從我們傳遞進來的host地址中選擇一個地址
                    serverAddress = hostProvider.next(1000);
                }

                // todo client和server進行socket連接
                // todo  跟進去 ,實現邏輯在上面
                // todo  這個方法開始建立連接,並將 isFasterConnect改成了 false
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }

 //todo  2222 如果socket處於連接狀態 /////////////////////////////////////////////////////////////////////////////////////////////////////////

            // todo 下面的連接狀態
            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                           LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                              Watcher.Event.EventType.None,
                              authState,null));
                    }
                }
                // todo  連接成功的話執行to 為下面值
                // todo  to = 讀取的超時時間 -  上一次的讀取時間
                // todo 如果預訂的超時時間 - 上次讀的時間 <= 0 說明超時了
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                // todo 如果沒有連接成功, 就會來到這里, 給 to 賦值
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

 //todo  3333 異常處理 /////////////////////////////////////////////////////////////////////////////////////////////////////////


            // todo 下面拋出來了異常
            if (to <= 0) {
                String warnInfo;
                warnInfo = "Client session timed out, have not heard from server in "
                    + clientCnxnSocket.getIdleRecv()
                    + "ms"
                    + " for sessionid 0x"
                    + Long.toHexString(sessionId);
                LOG.warn(warnInfo);
                // todo 這里拋出來了異常, 下面的try 就會把它抓住
                throw new SessionTimeoutException(warnInfo);
            }

//todo  44444 連接成功執行的邏輯 /////////////////////////////////////////////////////////////////////////////////////////////////////////


            // todo 下面的是連接成功執行的邏輯
            if (state.isConnected()) {
                // todo  為了防止競爭狀態丟失發送第二個ping, 同時也避免出現很多的ping
            	//1000(1 second) is to prevent(阻止) race condition missing to send the second ping
            	//also make sure not to send too many pings when readTimeout is small 
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                		((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    // todo 客戶端一直在這里循環, 如果連接成功的話, 每次循環都來到這個邏輯這里發送 ping
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

//todo 55555 /////////////////////////////////////////////////////////////////////////////////////////////////////////

            // If we are in read-only mode, seek for read/write server
            // todo 只讀狀態 相關邏輯
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }

//todo  66666 /////////////////////////////////////////////////////////////////////////////////////////////////////////


            // todo 消費outgoingqueue, 完成向服務端的發送發送
            // todo doTransport 是 ClientCnxnSocket 的抽象方法, 實現類clientCnxnSocketNio
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            // todo 在這個try中處理里面的拋出來的異常
            if (closing) {
                // todo 如果是請求關閉, 直接退出 break 出while循環
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x"
                            + Long.toHexString(getSessionId())
                            + " : " + e.getMessage());
                }
                break;
            } else {
                // todo 只要不是退出異常, 下面的異常都是僅僅打印了一下出現了什么異常
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else if (e instanceof SocketException) {
                    LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
                } else {
                    LOG.warn("Session 0x{} for server {}, unexpected error{}",
                                    Long.toHexString(getSessionId()),
                                    serverAddress,
                                    RETRY_CONN_MSG,
                                    e);
                }
                // todo 這個方法中, isFirstConnect = true
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    } // todo while循環的結束符號 , 這是個while循環, 除了上面的close其他異常都會繼續循環, 接着上去再看一遍

    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "SendThread exited loop for session: 0x"
                   + Long.toHexString(getSessionId()));
}

在上面這個200行的Run方法中比較值得注意的幾個方法如下

  • 如果做到下次選出一個非當前server的地址

針對下標運行,對數組的size取模, 再賦值給自己,所以就實現了從0 - array.size()的循環

  public InetSocketAddress next(long spinDelay) {
        currentIndex = ++currentIndex % serverAddresses.size();
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }
  • 如果檢查到了沒有連接的話,就是用clientCnxnSocket進行連接

這個函數中,將標記是否是第一次連接的標記置為了flase, 並且拿到了sessionid

   // todo 保證連接的邏輯
        void primeConnection() throws IOException {
            LOG.info("Socket connection established to "
                     + clientCnxnSocket.getRemoteSocketAddress()
                     + ", initiating session");
            isFirstConnect = false;
            //todo  創建了一個建立連接的request, 並且在下面將它添加進來 outgoingqueue
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            synchronized (outgoingQueue) {
            ... 如watcher 相關的邏輯

SendThread 和 服務端的IO溝通

跟進上面Run方法的如下方法,doTranprot

  clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);

他是本類的抽象方法,具體的實現類是clientCnxnSocketNIO

跟進這個方法,其中有一步跟重要doIO(pendingQueue, outgoingQueue, cnxn);

   for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // todo 建立連接的邏輯
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // todo 往服務端發送數據的邏輯 , 方法在上面的64行
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
  • DoIo的源碼如下

它分成了兩大模塊

  • 讀就緒, 讀取服務端發送過來的數據
  • 寫就緒, 往客戶端發送用戶在控制台輸入的命令
   void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        // todo 通過key獲取服務端的channel
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // TODO 讀就緒
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                // todo 返回buffer
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) { //todo 連接有沒有初始化, 來這之前被改成了 flase ,現在
                    // todo 讀取服務端發給我的連接請求的結果
                    readConnectResult(); // primeConnection()
                    enableRead();
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    //todo 如果已經初始化了, 就來這里讀取響應, 跟進去
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        //todo 寫就緒
        if (sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                // todo 查詢出可發送的packet
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();
                    }
                    // todo 往服務端發送數據 packet.ByteBuf
                    sock.write(p.bb); // 發送服務端
                    if (!p.bb.hasRemaining()) { //todo !hasRemaining  沒有剩余的數據
                        sentCount++;
                        // todo 將發送過的packet從outgoingqueue移除
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                // todo 如果剛才的請求頭的類型不是null , 不是ping ,不是權限驗證 就把packet添加到 pendingQueue
                                /**
                                 * These are the packets that have been sent and are waiting for a response.
                                 * todo 這個penddingQueue 存放的是已經發送的 和 等待服務器響應的packet
                                 */
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                if (outgoingQueue.isEmpty()) {
                    disableWrite();
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                  e.html
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
    }

思考:

雖然找到了客戶端往服務端發送數據的代碼, 但是問題來了, 它發送的什么數據啊? 在上面可以看到,它每次發送的數據都被包裝車成了packet類型,並且,繼續往下跟進可以看到這個packet來自於一個叫outgoingqueue的隊列中

client想往服務端發送什么?其實發送就是我們手動輸入的命令,只不過他把我們的命令解析出來並且進行了封裝,進行了哪些封裝? String-> request -> packet -> socket ,這個packet就在上面的部分被消費

到目前為止,算上一開始的主線程,其實已經有3條線程了, 分別是主線程,SendThread和eventThread

代碼讀到這里,sendThread部分其實已經結束了,我們直到了它正在消費outgoingqueue中的內容,接下來的任務返回回去,從新回到 ZooKeeperMain中,看一開始主線程時如何處理用戶在命令行的輸入的

    // todo zookeeper的入口方法
    public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
        // todo new ZK客戶端
        ZooKeeperMain main = new ZooKeeperMain(args);

        // todo run方法的實現在下面
        main.run();
    }

跟進 main.run(), 主要做了如下幾件事

  • 通過反射創建出可以獲取控制台輸入的對象jline.ConsoleReader
  • 通過反射創建出可以解析鍵盤錄入的對象
  • 開啟while循環,等待用戶的輸入,處理用戶的輸入executeLine(line);
 @SuppressWarnings("unchecked")
    void run() throws KeeperException, IOException, InterruptedException {
        if (cl.getCommand() == null) {
            System.out.println("Welcome to ZooKeeper!");

            boolean jlinemissing = false;
            // only use jline if it's in the classpath
            try {
                // todo jline.ConsoleReader是java命令行的實現類, 獲取可從控制台接受輸入的對象
                Class<?> consoleC = Class.forName("jline.ConsoleReader");
                Class<?> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompletor");

                System.out.println("JLine support is enabled");

                // todo 使用反射獲取實例
                Object console = consoleC.getConstructor().newInstance();
                Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk);

                // todo 通過反射獲取某指定類的指定方法  Completor
                Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor"));
                addCompletor.invoke(console, completor);

                String line;
                Method readLine = consoleC.getMethod("readLine", String.class);

                // todo 我們在命令行中輸入的那些命令最終都會來到這里執行
                // todo getPrompt() 方法 就是在控制台上打印出了命令行的前綴---  [zk: " + host + "("+zk.getState()+")" + " " + commandCount + "] "
                while ((line = (String) readLine.invoke(console, getPrompt())) != null) {
                    // todo 執行命令行的輸入
                    executeLine(line);
                }
            } catch (ClassNotFoundException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (NoSuchMethodException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (InvocationTargetException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (IllegalAccessException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            } catch (InstantiationException e) {
                LOG.debug("Unable to start jline", e);
                jlinemissing = true;
            }

            if (jlinemissing) {
                System.out.println("JLine support is disabled");
                BufferedReader br =
                        new BufferedReader(new InputStreamReader(System.in));

                String line;
                while ((line = br.readLine()) != null) {
                    executeLine(line);
                }
            }
        } else {
            // Command line args non-null.  Run what was passed.
            processCmd(cl);
        }
    }

繼續跟進 executeLine(line);,做了如下幾件事

  • 處理用戶輸入
  • 將命令添加到歷史命令
  • 處理命令
  • 命令數+1
     public void executeLine(String line)
                throws InterruptedException, IOException, KeeperException {
            if (!line.equals("")) {
                cl.parseCommand(line);
                // todo 添加到歷史命令
                addToHistory(commandCount, line);
                // todo 具體處理命令
                processCmd(cl);
                // todo 命令次數+1
                commandCount++;
        }
    }

處理命令的邏輯如下:

將命令解析出來,通過if分支語句,判斷用戶輸入的什么命令, 然后再進一步處理

  // todo zookeeper客戶端, 處理用戶輸入命令的具體邏輯
    // todo  用大白話講,下面其實就是把 從控制台獲取的用戶的輸入信息轉換成指定的字符, 然后發送到服務端
    // todo MyCommandOptions 是處理命令行選項和shell腳本的工具類
    protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
        // todo 在這個方法中可以看到很多的命令行所支持的命令
    Stat stat = new Stat();
    // todo 獲取命令行輸入中 0 1 2 3 ... 位置的內容, 比如 0 位置是命令  1 2 3 位置可能就是不同的參數
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        return false;
    }

    if (!commandMap.containsKey(cmd)) {
        usage();
        return false;
    }

    boolean watch = args.length > 2;
    String path = null;
    List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
    LOG.debug("Processing " + cmd);
    ...
   // todo 看看這個create命令的實現, 如果是-e 就是很 CreateMode= ephemeral sequential 時序的
    if (cmd.equals("create") && args.length >= 3) {
        int first = 0;
        CreateMode flags = CreateMode.PERSISTENT;
        if ((args[1].equals("-e") && args[2].equals("-s"))
                || (args[1]).equals("-s") && (args[2].equals("-e"))) {
            first += 2;
            flags = CreateMode.EPHEMERAL_SEQUENTIAL;
        } else if (args[1].equals("-e")) {
            first++;
            flags = CreateMode.EPHEMERAL;
        } else if (args[1].equals("-s")) {
            first++;
            flags = CreateMode.PERSISTENT_SEQUENTIAL;
        }
    ...

比如,用戶輸入的是創建新節點的命令create /path, 就會有下面的函數處理

// todo  調用Zookeeper的 create方法,
    String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags);

跟進這個方法 , 主要做了下面幾件事

  • 校驗合法性
  • 封裝進 request
  • 添加acl
  • 提交submitRequest(),他是個重要的阻塞方法,每次執行都會阻塞等待服務端的響應
  • 等待響應結果
 public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        // todo 驗證,path string 的合法性, 根據去查看
        PathUtils.validatePath(clientPath, createMode.isSequential());

        final String serverPath = prependChroot(clientPath);

        // todo 創建請求頭, 不同的操作有不同的頭標記
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();

        // todo 將命令行里面的內容嵌入到request
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);

        if (acl != null && acl.size() == 0) {
            throw new KeeperException.InvalidACLException();
        }
        // todo 添加權限
        request.setAcl(acl);

        // todo 通過上下文, 將包裝后的用戶的request 提交到socket 傳遞到server , 跟進去看看
        ReplyHeader r =submitRequest

        // todo 判斷是否出錯了
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }

客戶端的阻塞式等待 -- 自旋鎖

跟進submitRequest()

  // todo 這是ClientCnxn的類, 提交請求, 最終將我們的請求傳遞到socket
    // todo 返回一個header, 因為根據它判斷是否是否出錯了
    public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        // todo 來到這個 queuePacket() 方法在下面, 這個方法就是將  用戶輸入-> string ->>> request ->>> packet 的過程
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);

        // todo 使用同步代碼塊,在下面的進行    同步阻塞等待, 直到有了Response響應才會跳出這個循環, 這個finished狀態就是在客戶端接受到服務端的
        // todo 的響應后, 將服務端的響應解析出來,然后放置到 pendingqueue里時,設置上去的
        synchronized (packet) {
            while (!packet.finished) {
                // todo 這個等待是需要喚醒的
                packet.wait();
            }
        }
        // todo 直到上面的代碼塊被喚醒,才會這個方法才會返回
        return r;
    }

在上面的代碼中,可以看到可以他是使用一個while(!packet,finishes){} 來阻塞程序的, 剛看看到用戶的命令被封裝進了request, 接下來, 在queuePacket(h, r, request, response, null, null, null, null, watchRegistration);中,可以看到他被封裝進packet,然后添加到outgoingqueue隊列中,源碼如下


 // todo
    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet.
        // todo 它會為我們的沒有 Xid  的packet生成 Xid
        // It is generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // todo 她會在ClientCnxnSocket::doIO()之后生成
        // where the packet is actually sent.
        // todo packet實際生成的位置
        synchronized (outgoingQueue) {
            // todo 將用戶傳遞過來的信息包裝成 Packet
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                // todo 如果客戶端正在發送關閉session的請求, 就標記成 closing = true
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                // todo 將packet 添加到隊列里面
                // todo 這個什么時候會被消費呢? 是在sendthread的無限循環中被消費的, 因為那是第二條線程
                outgoingQueue.add(packet);
            }
        }
        // todo getClientCnxnSocket() 獲取ClientCnxnSocket對象
        // todo wakeupCnxn() 是 ClientCnxnSocket對象 中的抽象方法, 實現類是 ClientCnxnSocket的實現類ClientCnxnSocketNio
        
        // 喚醒阻塞在selector.select上的線程,讓該線程及時去處理其他事情,比如這里的讓sendThread 干凈去消費packet 
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }

在這個方法的最后一行,點睛,selector.wakeup(); 就是通知選擇器,別再阻塞select了,趕緊去做其他工作

因為選擇器在sendThread的doTransport()方法中,有阻塞的操作,我重新把代碼貼出來如下

服務端的NIOSocket -> ClientCnxnSocket 都是ClientCnxn上下文的封裝類的,SendThread同樣也是,它可以使用

現在再看,喚醒selector 讓他去做其他事 ,其實即使doIO(),這個方法代碼其實我在上面貼出來過,就是分成兩大部分,讀就緒與寫就緒

 // todo 往服務端發送 packet
    //todo 下面就是NIO 網絡編程的邏輯了
    @Override
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        // todo 選擇器在waitTimeOut時間內阻塞輪詢== 上一次計算的 to時間
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        // todo 獲取channel注冊進selector時返回的key
        synchronized (this) {
            selected = selector.selectedKeys();
        }
            // Everything below and until we get back to the select is non blocking,
            //  so time is effectively a constant. That is  Why we just have to do this once, here
        // todo 直到我們重新回到select之前, 下面的全部操作都是非阻塞的
        // todo 因此時間只是一個常數, 那就是為什么我們在這里用下面的函數
        updateNow();
        //
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // todo 建立連接的邏輯
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // todo 往服務端發送數據的邏輯 , 方法在上面的64行
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }

寫到這里其實已經把整個過程順下來了,下面再重新看看,sendThread是如果消費packet並且修改然后得到服務端的響應,修改pakcet.finished屬性的, 因為現在主線的submitRequest還在阻塞呢

往服務端寫

客戶端的socket的實現類是ClientCnxnSocketNio, 它往服務端寫的邏輯如下, 不難看出使用的java原生的 sock.write(p.bb); // 發送服務端 , 亮點是后面的操作pendingQueue.add(p);被寫過的packet被添加到了pengingqueue中

if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
    // todo 查詢出可發送的packet
    Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());

    if (p != null) {
        updateLastSend();
        // If we already started writing p, p.bb will already exist
        if (p.bb == null) {
            if ((p.requestHeader != null) &&
                    (p.requestHeader.getType() != OpCode.ping) &&
                    (p.requestHeader.getType() != OpCode.auth)) {
                p.requestHeader.setXid(cnxn.getXid());
            }
            p.createBB();
        }
        // todo 往服務端發送數據 packet.ByteBuf
        sock.write(p.bb); // 發送服務端
        if (!p.bb.hasRemaining()) { //todo !hasRemaining  沒有剩余的數據
            sentCount++;
            // todo 將發送過的packet從outgoingqueue移除
            outgoingQueue.removeFirstOccurrence(p);
            if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                synchronized (pendingQueue) {
                    // todo 如果剛才的請求頭的類型不是null , 不是ping ,不是權限驗證 就把packet添加到 pendingQueue
                    /**
                     * These are the packets that have been sent and are waiting for a response.
                     * todo 這個penddingQueue 存放的是已經發送的 和 等待服務器響應的packet
                     */
                    pendingQueue.add(p);
                }
            }
        }

上面說了, 為啥被使用過的pakcet還要保留一份呢? 還是那個原因,主線程還因為pakcet的finish狀態未被該變而阻塞呢, 那什么時候改變呢? 答案是受到服務端的響應之后改變,在哪里收到呢? 就是DoIo()的讀就緒模塊,下面附上源碼,它的解析我寫在這段代碼下面

從服務端讀


if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
    throw new EndOfStreamException(
            "Unable to read additional data from server sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
    // todo 返回buffer
    incomingBuffer.flip();
    if (incomingBuffer == lenBuffer) {
        recvCount++;
        readLength();
    } else if (!initialized) { //todo 連接有沒有初始化, 來這之前被改成了 flase ,現在
        // todo 讀取服務端發給我的連接請求的結果
        readConnectResult(); // primeConnection()
        enableRead();
        if (findSendablePacket(outgoingQueue,
                cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
            // Since SASL authentication has completed (if client is configured to do so),
            // outgoing packets waiting in the outgoingQueue can now be sent.
            enableWrite();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
        updateLastHeard();
        initialized = true;
    } else {
        //todo 如果已經初始化了, 就來這里讀取響應, 跟進去
        sendThread.readResponse(incomingBuffer);
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
        updateLastHeard();
    }

如上代碼的最后部分, sendThread.readResponse(incomingBuffer); 下面是它的源碼,它首先是從buffer中讀取出服務端的發送的數據,然后一通解析,封裝進pendingqueue的packet中,並且在方法的最后部分終於完成了狀態的修改

 // todo 同樣是 sendThread的方法, 讀取響應
        // todo 是經過flip 反轉后的  可讀的buffer
        void readResponse(ByteBuffer incomingBuffer) throws IOException {

            // todo ---------------------   從服務端寫回來的buffer中解析封裝成  ReplyHeader   ----------------------------
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            // todo ---------------------------------------------------------------------

            // todo 下面根據 ReplyHeader 的 xid 判斷響應的結果類型
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket               
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;                    
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) );            		            		
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                    	LOG.warn("Got server path " + event.getPath()
                    			+ " which is too short for chroot path "
                    			+ chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }

                eventThread.queueEvent( we );
                return;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (clientTunneledAuthenticationInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia,"token");
                zooKeeperSaslClient.respondToServer(request.getToken(),
                  ClientCnxn.this);
                return;
            }

            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                // todo 從pendingQueue 中取出第一個packet
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response to the first request!
             * // todo 因為請求存在隊列中,是有順序的, 因此我們最好對第一個做出相應
             */
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            + replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet );
                }
                // todo 把todo 從服務端解析出來的結果賦值給 pendingQueue 中的packet
                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                // todo 跟進這個方法
                finishPacket(packet);
            }
        }

解開客戶端的阻塞狀態

進入finishPacket(packet)

    // todo ClientCnxn 也就是本類中, 在根據用戶的輸入向服務端提交命令后的那個 wait喚醒了, finished=true,使得原來的while循環退出了
            private void finishPacket(Packet p) {
                if (p.watchRegistration != null) {
                    p.watchRegistration.register(p.replyHeader.getErr());
                }

                // todo 喚醒 Zookeeper中 submitRequest() 提交請求后的阻塞操作, 現在拿到請求后進行喚醒
                if (p.cb == null) {
                    synchronized (p) {
                        //todo   將這個finish 改成true, 在
                        p.finished = true;
                        p.notifyAll();
                    }
                } else {
                    p.finished = true;
            eventThread.queuePacket(p);
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM