【Zookeeper】Source Code Analysis of the Server (Part 2): ZooKeeperServer


I. Introduction

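  The previous article described the overall framework of the server. This article analyzes ZooKeeperServer, the parent class of all ZooKeeper servers.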

II. ZooKeeperServer Source Code Analysis

  2.1 Class Hierarchy

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}

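  Note: ZooKeeperServer is the parent class of every server in ZooKeeper. It implements the SessionExpirer and ServerStats.Provider interfaces. SessionExpirer defines the expire method (invoked when a session expires) and the getServerId method (returns the server ID), while Provider mainly defines methods for retrieving server statistics.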

  2.2 Inner Classes

  1. The DataTreeBuilder interface

    public interface DataTreeBuilder {
        // Build a DataTree
        public DataTree build();
    }

  Note: It defines the interface for building a DataTree.

  2. The BasicDataTreeBuilder class

    static public class BasicDataTreeBuilder implements DataTreeBuilder {
        public DataTree build() {
            return new DataTree();
        }
    }

  Note: It implements DataTreeBuilder and returns a newly created DataTree on every call.

  3. The MissingSessionException class

    public static class MissingSessionException extends IOException {
        private static final long serialVersionUID = 7467414635467261007L;

        public MissingSessionException(String msg) {
            super(msg);
        }
    }

  Note: The exception thrown when a session is missing.

  4. The ChangeRecord class

    static class ChangeRecord {
        ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
                List<ACL> acl) {
            // Assign the fields
            this.zxid = zxid;
            this.path = path;
            this.stat = stat;
            this.childCount = childCount;
            this.acl = acl;
        }
        
        // zxid
        long zxid;

        // Path
        String path;

        // Persisted stat data
        StatPersisted stat; /* Make sure to create a new object when changing */

        // Number of children
        int childCount;

        // ACL list
        List<ACL> acl; /* Make sure to create a new object when changing */

        @SuppressWarnings("unchecked")
        // Returns a copy carrying the given zxid, with fresh stat and ACL objects
        ChangeRecord duplicate(long zxid) {
            StatPersisted stat = new StatPersisted();
            if (this.stat != null) {
                DataTree.copyStatPersisted(this.stat, stat);
            }
            return new ChangeRecord(zxid, path, stat, childCount,
                    acl == null ? new ArrayList<ACL>() : new ArrayList(acl));
        }
    }

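  Note: The ChangeRecord data structure lets PrepRequestProcessor and FinalRequestProcessor share information conveniently. Its duplicate method returns a new ChangeRecord with the same path and child count but the zxid passed in, holding fresh copies of the stat and the ACL list.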
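  The defensive copy performed by duplicate can be illustrated with a minimal, self-contained sketch. SimpleStat and SimpleChangeRecord are hypothetical stand-ins for StatPersisted and ChangeRecord (the ACL list is reduced to strings), so this only demonstrates the copying idea, not ZooKeeper's actual types:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StatPersisted, reduced to two fields.
class SimpleStat {
    long version;
    long mzxid;

    // Copies every field into an existing target (the role DataTree.copyStatPersisted plays).
    static void copy(SimpleStat from, SimpleStat to) {
        to.version = from.version;
        to.mzxid = from.mzxid;
    }
}

// Hypothetical, simplified ChangeRecord: duplicate() never shares mutable state.
class SimpleChangeRecord {
    final long zxid;
    final String path;
    final SimpleStat stat;
    final int childCount;
    final List<String> acl;

    SimpleChangeRecord(long zxid, String path, SimpleStat stat, int childCount, List<String> acl) {
        this.zxid = zxid;
        this.path = path;
        this.stat = stat;
        this.childCount = childCount;
        this.acl = acl;
    }

    SimpleChangeRecord duplicate(long newZxid) {
        SimpleStat statCopy = new SimpleStat();
        if (stat != null) {
            SimpleStat.copy(stat, statCopy);
        }
        // A null ACL list becomes an empty list; otherwise the elements are copied.
        List<String> aclCopy = (acl == null) ? new ArrayList<String>() : new ArrayList<String>(acl);
        return new SimpleChangeRecord(newZxid, path, statCopy, childCount, aclCopy);
    }
}

class ChangeRecordDemo {
    public static void main(String[] args) {
        SimpleStat stat = new SimpleStat();
        stat.version = 3;
        List<String> acl = new ArrayList<>();
        acl.add("world:anyone");

        SimpleChangeRecord original = new SimpleChangeRecord(10L, "/app", stat, 2, acl);
        SimpleChangeRecord copy = original.duplicate(11L);
        copy.stat.version++;          // mutate only the copy
        copy.acl.add("digest:bob");

        System.out.println(original.stat.version + " " + copy.stat.version); // prints "3 4"
        System.out.println(original.acl.size() + " " + copy.acl.size());     // prints "1 2"
    }
}
```

  Mutating the copy leaves the original untouched, which is what the "create a new object when changing" field comments rely on. Using a generic new ArrayList<String>(acl) also avoids the raw-type call that made @SuppressWarnings("unchecked") necessary in the original.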

  2.3 Fields

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    // Logger
    protected static final Logger LOG;
    
    static {
        // Initialize the logger
        LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
        
        Environment.logEnv("Server environment:", LOG);
    }
    // JMX beans
    protected ZooKeeperServerBean jmxServerBean;
    protected DataTreeBean jmxDataTreeBean;

    
    // Default tick time, in milliseconds
    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    /** value of -1 indicates unset, use default */
    // Minimum session timeout
    protected int minSessionTimeout = -1;
    /** value of -1 indicates unset, use default */
    // Maximum session timeout
    protected int maxSessionTimeout = -1;
    // Session tracker
    protected SessionTracker sessionTracker;
    // Transaction log and snapshot storage
    private FileTxnSnapLog txnLogFactory = null;
    // ZooKeeper in-memory database
    private ZKDatabase zkDb;
    // Highest zxid handed out so far
    protected long hzxid = 0;
    // Sentinel exception meaning "no problem"
    public final static Exception ok = new Exception("No prob");
    // First processor of the request processor chain
    protected RequestProcessor firstProcessor;
    // Running flag
    protected volatile boolean running;
    protected volatile boolean running;

    /**
     * This is the secret that we use to generate passwords, for the moment it
     * is more of a sanity check.
     */
    // Secret key used to generate session passwords
    static final private long superSecret = 0XB3415C00L;
    
    // Number of requests currently being processed
    int requestsInProcess;
    
    // Outstanding (not yet applied) ChangeRecords
    final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
    
    // this data structure must be accessed under the outstandingChanges lock
    // Latest outstanding ChangeRecord for each path
    final HashMap<String, ChangeRecord> outstandingChangesForPath =
        new HashMap<String, ChangeRecord>();
        
    // Connection factory
    private ServerCnxnFactory serverCnxnFactory;
    
    // Server statistics
    private final ServerStats serverStats;
}

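  Note: The class holds the tick time, the session tracker (which manages sessions), the transaction log and snapshot storage, the in-memory database, the request processor chain, the outstanding ChangeRecords, the server statistics, and so on.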
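  The comment on outstandingChangesForPath says it must be accessed under the outstandingChanges lock. A hypothetical sketch of such a pair, a list in arrival order plus a per-path index guarded by one monitor, might look like the following. PendingChanges and its add/latest/applied methods are illustrative assumptions, not ZooKeeperServer's API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pending-change table: both structures are guarded by the list's monitor.
class PendingChanges {
    static final class Change {
        final long zxid;
        final String path;

        Change(long zxid, String path) {
            this.zxid = zxid;
            this.path = path;
        }
    }

    private final List<Change> outstanding = new ArrayList<>();
    private final Map<String, Change> byPath = new HashMap<>();

    // Record a change that has been prepared but not yet applied.
    void add(Change c) {
        synchronized (outstanding) {
            outstanding.add(c);
            byPath.put(c.path, c); // the newest pending change for a path wins
        }
    }

    // Latest pending change for a path, or null if nothing is pending.
    Change latest(String path) {
        synchronized (outstanding) {
            return byPath.get(path);
        }
    }

    // Drop every pending change whose zxid is <= the zxid just applied.
    void applied(long zxid) {
        synchronized (outstanding) {
            while (!outstanding.isEmpty() && outstanding.get(0).zxid <= zxid) {
                Change c = outstanding.remove(0);
                // Only unmap the path if no newer change for it is still pending.
                if (byPath.get(c.path) == c) {
                    byPath.remove(c.path);
                }
            }
        }
    }
}
```

  The identity check before removing from byPath keeps the index pointing at a newer pending change for the same path when an older one is applied first.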

  2.4 Constructors

  1. The ZooKeeperServer() constructor

    public ZooKeeperServer() {
        serverStats = new ServerStats(this);
    }

  Note: It only initializes the server statistics.

  2. The ZooKeeperServer(FileTxnSnapLog, int, int, int, DataTreeBuilder, ZKDatabase) constructor

    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            int minSessionTimeout, int maxSessionTimeout,
            DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
        // Assign the fields
        serverStats = new ServerStats(this);
        this.txnLogFactory = txnLogFactory;
        this.zkDb = zkDb;
        this.tickTime = tickTime;
        this.minSessionTimeout = minSessionTimeout;
        this.maxSessionTimeout = maxSessionTimeout;
        
        LOG.info("Created server with tickTime " + tickTime
                + " minSessionTimeout " + getMinSessionTimeout()
                + " maxSessionTimeout " + getMaxSessionTimeout()
                + " datadir " + txnLogFactory.getDataDir()
                + " snapdir " + txnLogFactory.getSnapDir());
    }

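  Note: This constructor initializes the server statistics, the transaction log factory, the in-memory database, the tick time, and the session timeout bounds (minimum and maximum). The treeBuilder argument is not used.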

  3. The ZooKeeperServer(FileTxnSnapLog, int, DataTreeBuilder) constructor

    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            DataTreeBuilder treeBuilder) throws IOException {
        this(txnLogFactory, tickTime, -1, -1, treeBuilder,
                new ZKDatabase(txnLogFactory));
    }

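  Note: It first creates the ZooKeeper in-memory database (ZKDatabase) and then delegates to the second constructor, passing -1 (unset) for both session timeout bounds.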

  4. The ZooKeeperServer(File, File, int) constructor

    public ZooKeeperServer(File snapDir, File logDir, int tickTime)
            throws IOException {
        this( new FileTxnSnapLog(snapDir, logDir),
                tickTime, new BasicDataTreeBuilder());
    }

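  Note: It wraps the two directories in a FileTxnSnapLog and delegates to the third constructor, supplying a BasicDataTreeBuilder.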

  5. The ZooKeeperServer(FileTxnSnapLog, DataTreeBuilder) constructor

    public ZooKeeperServer(FileTxnSnapLog txnLogFactory,
            DataTreeBuilder treeBuilder)
        throws IOException
    {
        this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, treeBuilder,
                new ZKDatabase(txnLogFactory));
    }

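  Note: It creates the in-memory database and then delegates to the second constructor with the default tick time and unset (-1) session timeout bounds.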
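  All overloads funnel into the second constructor, and -1 marks an unset timeout bound; the constructor's log line reports getMinSessionTimeout() and getMaxSessionTimeout() rather than the raw fields. The sketch below keeps only the timeout-related fields. TimeoutConfig is a hypothetical name, and the derived defaults (2 * tickTime and 20 * tickTime) reflect the getters as found in ZooKeeper 3.4, so they should be checked against the version in use:

```java
// Hypothetical reduction of the constructor chain, keeping only timeout fields.
class TimeoutConfig {
    static final int DEFAULT_TICK_TIME = 3000;

    final int tickTime;
    final int minSessionTimeout; // -1 means "unset, use default"
    final int maxSessionTimeout; // -1 means "unset, use default"

    // Most general constructor: every other constructor delegates here.
    TimeoutConfig(int tickTime, int minSessionTimeout, int maxSessionTimeout) {
        this.tickTime = tickTime;
        this.minSessionTimeout = minSessionTimeout;
        this.maxSessionTimeout = maxSessionTimeout;
    }

    // Like ZooKeeperServer(FileTxnSnapLog, int, DataTreeBuilder): bounds left unset.
    TimeoutConfig(int tickTime) {
        this(tickTime, -1, -1);
    }

    // Like ZooKeeperServer(FileTxnSnapLog, DataTreeBuilder): default tick time.
    TimeoutConfig() {
        this(DEFAULT_TICK_TIME);
    }

    // An unset minimum falls back to two ticks.
    int getMinSessionTimeout() {
        return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
    }

    // An unset maximum falls back to twenty ticks.
    int getMaxSessionTimeout() {
        return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
    }
}
```

  Under these assumptions, the default tick time of 3000 ms yields effective bounds of 6000 ms and 60000 ms.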

  2.5 Core Methods

  1. loadData

    public void loadData() throws IOException, InterruptedException {
        /*
         * When a new leader starts executing Leader#lead, it 
         * invokes this method. The database, however, has been
         * initialized before running leader election so that
         * the server could pick its zxid for its initial vote.
         * It does it by invoking QuorumPeer#getLastLoggedZxid.
         * Consequently, we don't need to initialize it once more
         * and avoid the penalty of loading it a second time. Not 
         * reloading it is particularly important for applications
         * that host a large database.
         * 
         * The following if block checks whether the database has
         * been initialized or not. Note that this method is
         * invoked by at least one other method: 
         * ZooKeeperServer#startdata.
         *  
         * See ZOOKEEPER-1642 for more detail.
         */
        if(zkDb.isInitialized()){ // The in-memory database is already initialized
            // Use the last zxid processed by the DataTree
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else { // Not initialized yet, so load the database
            setZxid(zkDb.loadDataBase());
        }
        
        // Clean up dead sessions
        LinkedList<Long> deadSessions = new LinkedList<Long>();
        for (Long session : zkDb.getSessions()) { // Iterate over all sessions
            if (zkDb.getSessionWithTimeOuts().get(session) == null) { // No timeout entry: the session is dead
                deadSessions.add(session);
            }
        }
        // Mark the DataTree as initialized
        zkDb.setDataTreeInit(true);
        for (long session : deadSessions) { // Iterate over the dead sessions
            // XXX: Is lastProcessedZxid really the best thing to use?
            // Kill the session
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }
    }

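  Note: This method loads the data. It first checks whether the in-memory database has already been initialized: if so, it sets the zxid to the last zxid processed by the DataTree; otherwise it loads the database and uses the zxid that returns. It then collects the dead sessions (those with no entry in the session timeout table) and calls killSession on each of them. killSession removes the session from sessionTracker and ultimately calls DataTree's killSession method, whose source is as follows: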

    void killSession(long session, long zxid) {
        // the list is already removed from the ephemerals
        // so we do not have to worry about synchronizing on
        // the list. This is only called from FinalRequestProcessor
        // so there is no need for synchronization. The list is not
        // changed here. Only create and delete change the list which
        // are again called from FinalRequestProcessor in sequence.
        // Remove the session and get the paths of all its ephemeral nodes
        HashSet<String> list = ephemerals.remove(session);
        if (list != null) {
            for (String path : list) { // Iterate over all ephemeral nodes
                try {
                    // Delete the node at this path
                    deleteNode(path, zxid);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Deleting ephemeral node " + path
                                + " for session 0x"
                                + Long.toHexString(session));
                    }
                } catch (NoNodeException e) {
                    LOG.warn("Ignoring NoNodeException for path " + path
                            + " while removing ephemeral for dead session 0x"
                            + Long.toHexString(session));
                }
            }
        }
    }

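  Note: DataTree's killSession first removes the session's entry from ephemerals, obtaining the set of ephemeral node paths it owned, and then deletes those nodes one by one. A node that no longer exists (NoNodeException) is logged and skipped.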
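  The two-phase cleanup (collect dead sessions first, then kill each one and delete its ephemeral nodes) can be sketched with plain collections. SessionCleanup and its fields are hypothetical; in particular, treating the keys of the ephemerals map as the session list is an assumption made for this illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory reduction of loadData's cleanup and DataTree.killSession.
class SessionCleanup {
    // session id -> paths of the ephemeral nodes it owns (role of DataTree.ephemerals)
    final Map<Long, Set<String>> ephemerals = new HashMap<>();
    // session id -> timeout (role of zkDb.getSessionWithTimeOuts())
    final Map<Long, Integer> sessionsWithTimeouts = new HashMap<>();
    // every node path currently in the tree
    final Set<String> nodes = new HashSet<>();
    // ephemeral paths that were already gone when their session was killed
    int ignoredMissing;

    // Phase 1: collect sessions that have no timeout entry.
    List<Long> findDeadSessions() {
        List<Long> dead = new ArrayList<>();
        for (Long session : ephemerals.keySet()) {
            if (sessionsWithTimeouts.get(session) == null) {
                dead.add(session);
            }
        }
        return dead;
    }

    // Remove the session's ephemeral set first, then delete each node;
    // a node that is already gone is skipped, like the NoNodeException handler.
    void killSession(long session) {
        Set<String> paths = ephemerals.remove(session);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            if (!nodes.remove(path)) {
                ignoredMissing++;
            }
        }
    }

    // Phase 2: kill the collected sessions only after iteration has finished.
    void cleanup() {
        for (long session : findDeadSessions()) {
            killSession(session);
        }
    }
}
```

  Collecting the dead sessions into a separate list before killing them avoids modifying the map while iterating over its key set, which would otherwise throw ConcurrentModificationException; the deadSessions list in loadData plays the same role.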

  2. submitRequest

    public void submitRequest(Request si) {
        if (firstProcessor == null) { // The processor chain is not set up yet
            synchronized (this) {
                try {
                    while (!running) { // Wait until running becomes true
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            // Check whether the request type is valid
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) { 
                // Pass the request to the first processor
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

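  Note: If firstProcessor is null, the method waits, re-checking at least once per second, until the running flag becomes true; if firstProcessor is still null afterwards, it throws RuntimeException("Not started"). It then calls touch, which refreshes the session and throws MissingSessionException if the session does not exist or has expired (the request is then dropped). Finally it checks whether the request type is valid: valid requests are passed to the first request processor, while requests of unknown type are handled by UnimplementedRequestProcessor.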
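  The start-up gate at the top of submitRequest is a guarded wait on the server's monitor. The sketch below is a hypothetical reduction: requests are plain strings, the processor chain is a single Consumer, and startup stands in for the server's start-up path. One deliberate deviation: the original logs and swallows the InterruptedException, whereas this sketch restores the thread's interrupt flag, a common Java idiom:

```java
import java.util.function.Consumer;

// Hypothetical reduction of submitRequest's start-up gate.
class RequestGate {
    private boolean running;                          // guarded by this
    private volatile Consumer<String> firstProcessor; // set once at start-up

    // Install the processor, then flip the flag and wake any waiting submitters.
    synchronized void startup(Consumer<String> processor) {
        firstProcessor = processor;
        running = true;
        notifyAll();
    }

    void submit(String request) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // The timeout re-checks the flag even without a notification.
                    while (!running) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                }
                if (firstProcessor == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        firstProcessor.accept(request);
    }
}
```

  A submitter that arrives before startup blocks inside submit; an interrupted submitter gives up with "Not started" instead of waiting forever.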

  3. processConnectRequest

    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        // Deserialize the connect request
        connReq.deserialize(bia, "connect");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Session establishment request from client "
                    + cnxn.getRemoteSocketAddress()
                    + " client's lastZxid is 0x"
                    + Long.toHexString(connReq.getLastZxidSeen()));
        }
        boolean readOnly = false;
        try {
            // Read the optional read-only flag
            readOnly = bia.readBool("readOnly");
            cnxn.isOldClient = false;
        } catch (IOException e) {
            // this is ok -- just a packet from an old client which
            // doesn't contain readOnly field
            LOG.warn("Connection request from old client "
                    + cnxn.getRemoteSocketAddress()
                    + "; will be dropped if server is in r-o mode");
        }
        if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) { // The client is not read-only but this is a read-only server: refuse
            String msg = "Refusing session request for not-read-only client "
                + cnxn.getRemoteSocketAddress();
            LOG.info(msg);
            throw new CloseRequestException(msg);
        }
        if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { // The client has seen a newer zxid than this server's DataTree: refuse
            String msg = "Refusing session request for client "
                + cnxn.getRemoteSocketAddress()
                + " as it has seen zxid 0x"
                + Long.toHexString(connReq.getLastZxidSeen())
                + " our last zxid is 0x"
                + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                + " client must try another server";

            LOG.info(msg);
            throw new CloseRequestException(msg);
        }
        // Requested session timeout
        int sessionTimeout = connReq.getTimeOut();
        // Session password
        byte passwd[] = connReq.getPasswd();
        // Minimum session timeout
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) { 
            sessionTimeout = minSessionTimeout;
        }
        // Maximum session timeout
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        // Set the negotiated timeout
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
        cnxn.disableRecv();
        // Session id
        long sessionId = connReq.getSessionId();
        if (sessionId != 0) { // Non-zero: the client is reconnecting to an existing session
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            // Close any existing connection for this session
            serverCnxnFactory.closeSession(sessionId);
            // Set the session id
            cnxn.setSessionId(sessionId);
            // Reopen the session
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            // Create a new session
            createSession(cnxn, passwd, sessionTimeout);
        }
    }

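  Note: It first deserializes the incoming ByteBuffer into a ConnectRequest and then runs two checks, either of which throws CloseRequestException: a client that is not read-only is refused by a read-only server, and a client that has seen a newer zxid than this server is told to try another server. It then clamps the requested session timeout into [minSessionTimeout, maxSessionTimeout] and stops receiving packets until the session is set up. Finally, it checks the session id in the ConnectRequest: if it is 0, a new session is created; otherwise the existing session is reopened.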
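  The decision logic of processConnectRequest, separated from the I/O, fits in two small functions. ConnectDecision and its Outcome enum are hypothetical names; the checks are applied in the same order as in the code above:

```java
// Hypothetical reduction of the checks in processConnectRequest.
class ConnectDecision {
    enum Outcome { REFUSE_READ_ONLY_SERVER, REFUSE_CLIENT_AHEAD, REOPEN_SESSION, CREATE_SESSION }

    // Clamp the requested timeout into [min, max], lower bound first.
    static int negotiateTimeout(int requested, int min, int max) {
        int timeout = requested;
        if (timeout < min) {
            timeout = min;
        }
        if (timeout > max) {
            timeout = max;
        }
        return timeout;
    }

    static Outcome decide(boolean clientReadOnly, boolean serverReadOnly,
                          long clientLastZxid, long serverLastZxid, long sessionId) {
        if (!clientReadOnly && serverReadOnly) {
            return Outcome.REFUSE_READ_ONLY_SERVER; // CloseRequestException in the original
        }
        if (clientLastZxid > serverLastZxid) {
            return Outcome.REFUSE_CLIENT_AHEAD;     // the client must try another server
        }
        return sessionId != 0 ? Outcome.REOPEN_SESSION : Outcome.CREATE_SESSION;
    }

    public static void main(String[] args) {
        System.out.println(negotiateTimeout(1000, 4000, 40000)); // prints 4000
        System.out.println(decide(false, false, 0x10, 0x20, 0)); // prints CREATE_SESSION
    }
}
```

  Refusing a client whose last seen zxid is ahead of the server prevents it from observing state older than what it has already seen.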

  4. processPacket

    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        // Create the request header
        RequestHeader h = new RequestHeader();
        // Deserialize the header into the RequestHeader
        h.deserialize(bia, "header");
        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) { // Authentication packet
            LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            // Convert the ByteBuffer into an AuthPacket
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            // Get the authentication scheme
            String scheme = authPacket.getScheme();
            AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if(ap != null) {
                try {
                    // Authenticate
                    authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
                } catch(RuntimeException e) {
                    LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
                    authReturn = KeeperException.Code.AUTHFAILED;                   
                }
            }
            if (authReturn!= KeeperException.Code.OK) { // Authentication failed
                if (ap == null) {
                    LOG.warn("No authentication provider for scheme: "
                            + scheme + " has "
                            + ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: " + scheme);
                }
                // send a response...
                // Build the reply header
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.AUTHFAILED.intValue());
                // Send the response
                cnxn.sendResponse(rh, null, null);
                // ... and close connection
                // Send the close-connection marker
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                // Stop receiving packets
                cnxn.disableRecv();
            } else { // Authentication succeeded
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Authentication succeeded for scheme: "
                              + scheme);
                }
                LOG.info("auth success " + cnxn.getRemoteSocketAddress());
                // Build the reply header
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.OK.intValue());
                // Send the response
                cnxn.sendResponse(rh, null, null);
            }
            return;
        } else {
            if (h.getType() == OpCode.sasl) { // SASL packet
                // Process the SASL exchange
                Record rsp = processSasl(incomingBuffer,cnxn);
                // Build the reply header
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                // Send the response
                cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
            }
            else { // Neither auth nor SASL
                // Build the request
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
                // Set the request owner
                si.setOwner(ServerCnxn.me);
                // Submit the request
                submitRequest(si);
            }
        }
        // Count the request as outstanding (this may throttle the connection)
        cnxn.incrOutstandingRequests(h);
    }

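  Note: This method first deserializes the incoming ByteBuffer into a RequestHeader and uses the header's type to decide how to proceed. For an auth packet, it looks up the AuthenticationProvider for the packet's scheme and authenticates: if authentication fails, it sends an AUTHFAILED response, sends the close-connection marker, and stops receiving packets; if it succeeds, it sends an OK response. In both cases it returns immediately. For a SASL packet, it processes the SASL exchange and sends the response to the client. Any other packet is wrapped in a Request and submitted via submitRequest. For SASL packets and ordinary requests, the connection's outstanding request count is then incremented.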
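  The auth branch defaults to failure: only an existing provider that returns OK lets the client through, and a RuntimeException thrown by the provider also counts as failure. The sketch below reproduces that rule, with ProviderRegistry and AuthenticationProvider replaced by a hypothetical map and interface; AuthDispatch, Provider, and the reduced Code enum are illustrative names only:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of the OpCode.auth branch in processPacket.
class AuthDispatch {
    enum Code { OK, AUTHFAILED }

    // Stand-in for AuthenticationProvider: judges the raw credentials.
    interface Provider {
        Code handle(byte[] auth);
    }

    private final Map<String, Provider> registry = new HashMap<>();

    void register(String scheme, Provider provider) {
        registry.put(scheme, provider);
    }

    // Start from AUTHFAILED; an unknown scheme or a throwing provider keeps it.
    Code authenticate(String scheme, byte[] auth) {
        Provider ap = registry.get(scheme);
        Code result = Code.AUTHFAILED;
        if (ap != null) {
            try {
                result = ap.handle(auth);
            } catch (RuntimeException e) {
                result = Code.AUTHFAILED;
            }
        }
        return result;
    }
}
```

  In processPacket, an AUTHFAILED result is followed by the failure response, the close-connection marker, and disableRecv(), so the client cannot keep sending requests on that connection.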

III. Summary

  This article analyzed the source code of ZooKeeperServer and how it handles requests and sessions. Thanks for reading!

