【Zookeeper】源碼分析之網絡通信(二)之NIOServerCnxn


一、前言

  前面介紹了ServerCnxn,下面開始學習NIOServerCnxn。

二、NIOServerCnxn源碼分析

  2.1 類的繼承關系

public class NIOServerCnxn extends ServerCnxn {}

  說明:NIOServerCnxn繼承了ServerCnxn抽象類,使用NIO來處理與客戶端之間的通信,使用單線程處理。

  2.2 類的內部類

  1. SendBufferWriter類 

    private class SendBufferWriter extends Writer {
        private StringBuffer sb = new StringBuffer();
        
        /**
         * Check if we are ready to send another chunk.
         * @param force force sending, even if not a full chunk
         */
        // 是否准備好發送另一塊
        private void checkFlush(boolean force) {
            if ((force && sb.length() > 0) || sb.length() > 2048) { // 當強制發送並且sb大小大於0,或者sb大小大於2048即發送緩存
                sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes()));
                // clear our internal buffer
                sb.setLength(0);
            }
        }

        @Override
        public void close() throws IOException {
            if (sb == null) return;
            // 關閉之前需要強制性發送緩存
            checkFlush(true);
            sb = null; // clear out the ref to ensure no reuse
        }

        @Override
        public void flush() throws IOException {
            checkFlush(true);
        }

        @Override
        public void write(char[] cbuf, int off, int len) throws IOException {
            sb.append(cbuf, off, len);
            checkFlush(false);
        }
    }
SendBufferWriter

  說明:該類用來將給客戶端的響應進行分塊,其核心方法是checkFlush方法,其源碼如下

        private void checkFlush(boolean force) {
            if ((force && sb.length() > 0) || sb.length() > 2048) { // 當強制發送並且sb大小大於0,或者sb大小大於2048即發送緩存
                sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes()));
                // clear our internal buffer
                sb.setLength(0);
            }
        }

  說明:當需要強制發送時,sb緩沖中只要有內容就會同步發送,或者是當sb的大小超過2048(塊)時就需要發送,其會調用NIOServerCnxn的sendBufferSync方法,該之后會進行分析,然后再清空sb緩沖。

  2. CommandThread類  

    private abstract class CommandThread extends Thread {
        PrintWriter pw;
        
        CommandThread(PrintWriter pw) {
            this.pw = pw;
        }
        
        public void run() {
            try {
                commandRun();
            } catch (IOException ie) {
                LOG.error("Error in running command ", ie);
            } finally {
                cleanupWriterSocket(pw);
            }
        }
        
        public abstract void commandRun() throws IOException;
    }
CommandThread

  說明:該類用於處理ServerCnxn中的定義的命令,其主要邏輯定義在commandRun方法中,在子類中各自實現,這是一種典型的工廠方法,每個子類對應着一個命令,每個命令使用單獨的線程進行處理。

  2.3 類的屬性  

public class NIOServerCnxn extends ServerCnxn {
    // 日志
    static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxn.class);

    // ServerCnxn工廠
    NIOServerCnxnFactory factory;

    // 針對面向流的連接套接字的可選擇通道
    final SocketChannel sock;

    // 表示 SelectableChannel 在 Selector 中注冊的標記
    private final SelectionKey sk;

    // 初始化標志
    boolean initialized;

    // 分配四個字節緩沖區
    ByteBuffer lenBuffer = ByteBuffer.allocate(4);

    // 賦值incomingBuffer
    ByteBuffer incomingBuffer = lenBuffer;

    // 緩沖隊列
    LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();

    // 會話超時時間
    int sessionTimeout;

    // ZooKeeper服務器
    private final ZooKeeperServer zkServer;

    /**
     * The number of requests that have been submitted but not yet responded to.
     */
    // 已經被提交但還未響應的請求數量
    int outstandingRequests;

    /**
     * This is the id that uniquely identifies the session of a client. Once
     * this session is no longer active, the ephemeral nodes will go away.
     */
    // 會話ID
    long sessionId;

    // 下個會話ID
    static long nextSessionId = 1;
    int outstandingLimit = 1;
    
    private static final String ZK_NOT_SERVING =
        "This ZooKeeper instance is not currently serving requests";
    
    private final static byte fourBytes[] = new byte[4];
}    
類的屬性

  說明:NIOServerCnxn維護了服務器與客戶端之間的Socket通道、用於存儲傳輸內容的緩沖區、會話ID、ZooKeeper服務器等。

  2.4 類的構造函數  

    public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
            SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
        this.zkServer = zk;
        this.sock = sock;
        this.sk = sk;
        this.factory = factory;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
        }
        if (zk != null) { 
            outstandingLimit = zk.getGlobalOutstandingLimit();
        }
        sock.socket().setTcpNoDelay(true);
        /* set socket linger to false, so that socket close does not
         * block */
        // 設置linger為false,以便在socket關閉時不會阻塞
        sock.socket().setSoLinger(false, -1);
        // 獲取IP地址
        InetAddress addr = ((InetSocketAddress) sock.socket()
                .getRemoteSocketAddress()).getAddress();
        // 認證信息中添加IP地址
        authInfo.add(new Id("ip", addr.getHostAddress()));
        // 設置感興趣的操作類型
        sk.interestOps(SelectionKey.OP_READ);
    }
構造函數

  說明:在構造函數中會對Socket通道進行相應設置,如設置TCP連接無延遲、獲取客戶端的IP地址並將此信息進行記錄,方便后續認證,最后設置SelectionKey感興趣的操作類型為READ。

  2.5 核心函數分析

  1. sendBuffer函數  

    public void sendBuffer(ByteBuffer bb) {
        try {
            if (bb != ServerCnxnFactory.closeConn) { // 不關閉連接
                // We check if write interest here because if it is NOT set,
                // nothing is queued, so we can try to send the buffer right
                // away without waking up the selector
                // 首先檢查interestOps中是否存在WRITE操作,如果沒有
                // 則表示直接發送緩沖而不必先喚醒selector
                if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { // 不為write操作
                    try {
                        // 將緩沖寫入socket
                        sock.write(bb);
                    } catch (IOException e) {
                        // we are just doing best effort right now
                    }
                }
                // if there is nothing left to send, we are done
                if (bb.remaining() == 0) { // bb中的內容已經被全部讀取
                    // 統計發送包信息(調用ServerCnxn方法)
                    packetSent();
                    return;
                }
            }

            synchronized(this.factory){ // 同步塊
                // Causes the first selection operation that has not yet returned to return immediately
                // 讓第一個還沒返回(阻塞)的selection操作馬上返回結果
                sk.selector().wakeup();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                            + " is valid: " + sk.isValid());
                }
                // 將緩存添加至隊列
                outgoingBuffers.add(bb);
                if (sk.isValid()) { // key是否合法
                    // 將寫操作添加至感興趣的集合
                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
                }
            }
            
        } catch(Exception e) {
            LOG.error("Unexpected Exception: ", e);
        }
    }
sendBuffer

  說明:該函數將緩沖寫入socket中,其大致處理可以分為兩部分,首先會判斷ByteBuffer是否為關閉連接的信號,並且當感興趣的集合中沒有寫操作時,其會立刻將緩存寫入socket,步驟如下

            if (bb != ServerCnxnFactory.closeConn) { // 不關閉連接
                // We check if write interest here because if it is NOT set,
                // nothing is queued, so we can try to send the buffer right
                // away without waking up the selector
                // 首先檢查interestOps中是否存在WRITE操作,如果沒有
                // 則表示直接發送緩沖而不必先喚醒selector
                if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { // 不為write操作
                    try {
                        // 將緩沖寫入socket
                        sock.write(bb);
                    } catch (IOException e) {
                        // we are just doing best effort right now
                    }
                }
                // if there is nothing left to send, we are done
                if (bb.remaining() == 0) { // bb中的內容已經被全部讀取
                    // 統計發送包信息(調用ServerCnxn方法)
                    packetSent();
                    return;
                }
            }

  當緩沖區被正常的寫入到socket后,會直接返回,然而,當原本就對寫操作感興趣時,其會走如下流程

            synchronized(this.factory){ // 同步塊
                // Causes the first selection operation that has not yet returned to return immediately
                // 讓第一個還沒返回(阻塞)的selection操作馬上返回結果
                sk.selector().wakeup();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                            + " is valid: " + sk.isValid());
                }
                // 將緩存添加至隊列
                outgoingBuffers.add(bb);
                if (sk.isValid()) { // key是否合法
                    // 將寫操作添加至感興趣的集合
                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
                }
            }

  首先會喚醒上個被阻塞的selection操作,然后將緩沖添加至outgoingBuffers隊列中,后續再進行發送。

  2. doIO函數 

    void doIO(SelectionKey k) throws InterruptedException {
        try {
            if (isSocketOpen() == false) { // socket未開啟
                LOG.warn("trying to do i/o on a null socket for session:0x"
                         + Long.toHexString(sessionId));

                return;
            }
            if (k.isReadable()) { // key可讀
                // 將內容從socket寫入incoming緩沖
                int rc = sock.read(incomingBuffer);
                if (rc < 0) { // 流結束異常,無法從客戶端讀取數據
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
                if (incomingBuffer.remaining() == 0) { // 緩沖區已經寫滿
                    boolean isPayload;
                    // 讀取下個請求
                    if (incomingBuffer == lenBuffer) { // start of next request
                        // 翻轉緩沖區,可讀
                        incomingBuffer.flip();
                        // 讀取lenBuffer的前四個字節,當讀取的是內容長度時則為true,否則為false
                        isPayload = readLength(k);
                        // 清除緩沖
                        incomingBuffer.clear();
                    } else { // 不等,因為在readLength中根據Len已經重新分配了incomingBuffer
                        // continuation
                        isPayload = true;
                    }
                    if (isPayload) { // 不為四個字母,為實際內容    // not the case for 4letterword
                        // 讀取內容
                        readPayload();
                    }
                    else { // 四個字母,為四字母的命令
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }
            if (k.isWritable()) { // key可寫
                // ZooLog.logTraceMessage(LOG,
                // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
                // "outgoingBuffers.size() = " +
                // outgoingBuffers.size());
                if (outgoingBuffers.size() > 0) {
                    // ZooLog.logTraceMessage(LOG,
                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
                    // "sk " + k + " is valid: " +
                    // k.isValid());

                    /*
                     * This is going to reset the buffer position to 0 and the
                     * limit to the size of the buffer, so that we can fill it
                     * with data from the non-direct buffers that we need to
                     * send.
                     */
                    // 分配的直接緩沖
                    ByteBuffer directBuffer = factory.directBuffer;
                    // 清除緩沖
                    directBuffer.clear();

                    for (ByteBuffer b : outgoingBuffers) { // 遍歷
                        if (directBuffer.remaining() < b.remaining()) { // directBuffer的剩余空閑長度小於b的剩余空閑長度
                            /*
                             * When we call put later, if the directBuffer is to
                             * small to hold everything, nothing will be copied,
                             * so we've got to slice the buffer if it's too big.
                             */
                            // 縮小緩沖至directBuffer的大小
                            b = (ByteBuffer) b.slice().limit(
                                    directBuffer.remaining());
                        }
                        /*
                         * put() is going to modify the positions of both
                         * buffers, put we don't want to change the position of
                         * the source buffers (we'll do that after the send, if
                         * needed), so we save and reset the position after the
                         * copy
                         */
                        // 記錄b的當前position
                        int p = b.position();
                        // 將b寫入directBuffer
                        directBuffer.put(b);
                        // 設置回b的原來的position
                        b.position(p);
                        if (directBuffer.remaining() == 0) { // 已經寫滿
                            break;
                        }
                    }
                    /*
                     * Do the flip: limit becomes position, position gets set to
                     * 0. This sets us up for the write.
                     */
                    // 翻轉緩沖區,可讀
                    directBuffer.flip();

                    // 將directBuffer的內容寫入socket
                    int sent = sock.write(directBuffer);
                    ByteBuffer bb;

                    // Remove the buffers that we have sent
                    while (outgoingBuffers.size() > 0) { // outgoingBuffers中還存在Buffer
                        // 取隊首元素,但並不移出
                        bb = outgoingBuffers.peek();
                        if (bb == ServerCnxnFactory.closeConn) { // 關閉連接,拋出異常
                            throw new CloseRequestException("close requested");
                        }
                        
                        // bb還剩余多少元素沒有被發送
                        int left = bb.remaining() - sent;
                        if (left > 0) { // 存在元素未被發送
                            /*
                             * We only partially sent this buffer, so we update
                             * the position and exit the loop.
                             */
                            // 更新bb的position
                            bb.position(bb.position() + sent);
                            break;
                        }
                        // 發送包,調用ServerCnxn方法
                        packetSent();
                        /* We've sent the whole buffer, so drop the buffer */
                        // 已經發送完buffer的所有內容,移除buffer
                        sent -= bb.remaining();
                        outgoingBuffers.remove();
                    }
                    // ZooLog.logTraceMessage(LOG,
                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
                    // outgoingBuffers.size() = " + outgoingBuffers.size());
                }

                synchronized(this.factory){ // 同步塊
                    if (outgoingBuffers.size() == 0) { // outgoingBuffers不存在buffer
                        if (!initialized
                                && (sk.interestOps() & SelectionKey.OP_READ) == 0) { // 未初始化並且無讀請求
                            throw new CloseRequestException("responded to info probe");
                        }
                        // 重置感興趣的集合
                        sk.interestOps(sk.interestOps()
                                & (~SelectionKey.OP_WRITE));
                    } else { // 重置感興趣的集合
                        sk.interestOps(sk.interestOps()
                                | SelectionKey.OP_WRITE);
                    }
                }
            }
        } catch (CancelledKeyException e) {
            LOG.warn("Exception causing close of session 0x"
                    + Long.toHexString(sessionId)
                    + " due to " + e);
            if (LOG.isDebugEnabled()) {
                LOG.debug("CancelledKeyException stack trace", e);
            }
            close();
        } catch (CloseRequestException e) {
            // expecting close to log session closure
            close();
        } catch (EndOfStreamException e) {
            LOG.warn("caught end of stream exception",e); // tell user why

            // expecting close to log session closure
            close();
        } catch (IOException e) {
            LOG.warn("Exception causing close of session 0x"
                    + Long.toHexString(sessionId)
                    + " due to " + e);
            if (LOG.isDebugEnabled()) {
                LOG.debug("IOException stack trace", e);
            }
            close();
        }
    }
doIO

  說明:該函數主要是進行IO處理,當傳入的SelectionKey是可讀時,其處理流程如下

            if (k.isReadable()) { // key可讀
                // 將內容從socket寫入incoming緩沖
                int rc = sock.read(incomingBuffer);
                if (rc < 0) { // 流結束異常,無法從客戶端讀取數據
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
                if (incomingBuffer.remaining() == 0) { // 緩沖區已經寫滿
                    boolean isPayload;
                    // 讀取下個請求
                    if (incomingBuffer == lenBuffer) { // start of next request
                        // 翻轉緩沖區,可讀
                        incomingBuffer.flip();
                        // 讀取lenBuffer的前四個字節,當讀取的是內容長度時則為true,否則為false
                        isPayload = readLength(k);
                        // 清除緩沖
                        incomingBuffer.clear();
                    } else { // 不等,因為在readLength中根據Len已經重新分配了incomingBuffer
                        // continuation
                        isPayload = true;
                    }
                    if (isPayload) { // 不為四個字母,為實際內容    // not the case for 4letterword
                        // 讀取內容
                        readPayload();
                    }
                    else { // 四個字母,為四字母的命令
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }

  說明:首先從socket中將數據讀入incomingBuffer中,再判斷incomingBuffer是否與lenBuffer相等,若相等,則表示讀取的是一個四個字母的命令,否則表示讀取的是具體內容的長度,因為在readLength函數會根據socket中內容的長度重新分配incomingBuffer。其中,readLength函數的源碼如下 

    private boolean readLength(SelectionKey k) throws IOException {
        // Read the length, now get the buffer
        // 讀取position之后的四個字節
        int len = lenBuffer.getInt();
        if (!initialized && checkFourLetterWord(sk, len)) { // 未初始化並且是四個字母組成的命令
            return false;
        }
        if (len < 0 || len > BinaryInputArchive.maxBuffer) {
            throw new IOException("Len error " + len);
        }
        if (zkServer == null) {
            throw new IOException("ZooKeeperServer not running");
        }
        // 重新分配len長度的緩沖
        incomingBuffer = ByteBuffer.allocate(len);
        return true;
    }

  說明:首先會讀取lenBuffer緩沖的position之后的四個字節,然后判斷其是否是四字母的命令或者是長整形(具體內容的長度),之后再根據長度重新分配incomingBuffer大小。

  同時,在調用完readLength后,會知道是否為內容,若為內容,則會調用readPayload函數來讀取內容,其源碼如下  

    private void readPayload() throws IOException, InterruptedException {
        // 表示還未讀取完socket中內容
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            // 將socket的內容讀入緩沖
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) { // 流結束異常,無法從客戶端讀取數據
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
        }
        
        // 表示已經讀取完了Socket中內容
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            // 接收到packet
            packetReceived();
            // 翻轉緩沖區
            incomingBuffer.flip();
            if (!initialized) { // 未初始化
                // 讀取連接請求
                readConnectRequest();
            } else {
                // 讀取請求
                readRequest();
            }
            // 清除緩沖
            lenBuffer.clear();
            // 賦值incomingBuffer,即清除incoming緩沖
            incomingBuffer = lenBuffer;
        }
    }

  說明:首先會將socket中的實際內容寫入incomingBuffer中(已經重新分配大小),當讀取完成后,則更新接收的包統計信息,之后再根據是否初始化了還確定讀取連接請求還是直接請求,最后會清除緩存,並重新讓incomingBuffer與lenBuffer相等,表示該讀取過程結束。

  而當doIO中的key為可寫時,其處理流程如下 

            if (k.isWritable()) { // key可寫
                // ZooLog.logTraceMessage(LOG,
                // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
                // "outgoingBuffers.size() = " +
                // outgoingBuffers.size());
                if (outgoingBuffers.size() > 0) {
                    // ZooLog.logTraceMessage(LOG,
                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
                    // "sk " + k + " is valid: " +
                    // k.isValid());

                    /*
                     * This is going to reset the buffer position to 0 and the
                     * limit to the size of the buffer, so that we can fill it
                     * with data from the non-direct buffers that we need to
                     * send.
                     */
                    // 分配的直接緩沖
                    ByteBuffer directBuffer = factory.directBuffer;
                    // 清除緩沖
                    directBuffer.clear();

                    for (ByteBuffer b : outgoingBuffers) { // 遍歷
                        if (directBuffer.remaining() < b.remaining()) { // directBuffer的剩余空閑長度小於b的剩余空閑長度
                            /*
                             * When we call put later, if the directBuffer is to
                             * small to hold everything, nothing will be copied,
                             * so we've got to slice the buffer if it's too big.
                             */
                            // 縮小緩沖至directBuffer的大小
                            b = (ByteBuffer) b.slice().limit(
                                    directBuffer.remaining());
                        }
                        /*
                         * put() is going to modify the positions of both
                         * buffers, put we don't want to change the position of
                         * the source buffers (we'll do that after the send, if
                         * needed), so we save and reset the position after the
                         * copy
                         */
                        // 記錄b的當前position
                        int p = b.position();
                        // 將b寫入directBuffer
                        directBuffer.put(b);
                        // 設置回b的原來的position
                        b.position(p);
                        if (directBuffer.remaining() == 0) { // 已經寫滿
                            break;
                        }
                    }
                    /*
                     * Do the flip: limit becomes position, position gets set to
                     * 0. This sets us up for the write.
                     */
                    // 翻轉緩沖區,可讀
                    directBuffer.flip();

                    // 將directBuffer的內容寫入socket
                    int sent = sock.write(directBuffer);
                    ByteBuffer bb;

                    // Remove the buffers that we have sent
                    while (outgoingBuffers.size() > 0) { // outgoingBuffers中還存在Buffer
                        // 取隊首元素,但並不移出
                        bb = outgoingBuffers.peek();
                        if (bb == ServerCnxnFactory.closeConn) { // 關閉連接,拋出異常
                            throw new CloseRequestException("close requested");
                        }
                        
                        // bb還剩余多少元素沒有被發送
                        int left = bb.remaining() - sent;
                        if (left > 0) { // 存在元素未被發送
                            /*
                             * We only partially sent this buffer, so we update
                             * the position and exit the loop.
                             */
                            // 更新bb的position
                            bb.position(bb.position() + sent);
                            break;
                        }
                        // 發送包,調用ServerCnxn方法
                        packetSent();
                        /* We've sent the whole buffer, so drop the buffer */
                        // 已經發送完buffer的所有內容,移除buffer
                        sent -= bb.remaining();
                        outgoingBuffers.remove();
                    }
                    // ZooLog.logTraceMessage(LOG,
                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
                    // outgoingBuffers.size() = " + outgoingBuffers.size());
                }

  說明:其首先會判斷outgoingBuffers中是否還有Buffer未發送,然后遍歷Buffer,為提供IO效率,借助了directBuffer(64K大小),之后每次以directBuffer的大小(64K)來將緩沖的內容寫入socket中發送,直至全部發送完成。

三、總結

  本篇講解了NIOServerCnxn的處理細節,其主要依托於Java的NIO相關接口來完成IO操作,也謝謝各位園友的觀看~ 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM