Zookeeper學習之ZooKeeper源碼分析


一、宏觀分析ZooKeeper源碼結構

  ZooKeeper宏觀分析源碼,如下圖所示:

        

  要想分析源碼,首先需要宏觀分析整個ZooKeeper結構,要知道ZooKeeper分為兩部分:服務端集群、客戶端。

  其中服務端:

  • 每台ZooKeeper服務器都有三個狀態:初始化、運行中、結束關機。因此當服務器都處於運行時,構成一個zookeeper集群,那么就能夠對外提供服務(單機也可以運行);
  • 服務端啟動服務后,進行初始化構成可用集群;

  對於客戶端:

  • 客戶端封裝出API操作層,這樣任何訪問都基於同一API;
  • 客戶端的API要遵循一定的協議,進行消息協議封裝;
  • 網絡通訊要實現序列化、反序列化以及連接建立;

  當然,客戶端提供的這部分協議封裝、序列化/反序列化、建立連接能力,服務端也同時需要具備。我們可以通過寫偽服務端攔截請求進行查看,代碼如下:

public class SoecktLister {
    
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(2181);
        Socket accept = serverSocket.accept();
        byte[] result = new byte[2048];
        accept.getInputStream().read(result);

        ByteBuffer bb = ByteBuffer.wrap(result);
        ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis);
        RequestHeader header2 = new RequestHeader();
        header2.deserialize(bia, "header");
        System.out.println(header2);
        bbis.close();
    }
}

  然后通過客戶端進行訪問:

public class ZooKeeperTest {

   private ZooKeeper zooKeeper;

   public ZooKeeperTest() {
      try {
         zooKeeper= new ZooKeeper("localhost:2181",
                 5000,
                null, false);
      } catch (IOException e) {
         e.printStackTrace();
      }
   }

   public void add(String path,String data){
      try {
         String newPath = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      } catch (KeeperException e) {
         e.printStackTrace();
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   public static void main(String[] args) {
      ZooKeeperTest zooKeeperTest=new ZooKeeperTest();
      zooKeeperTest.add("/monkey2","2019");
   }

}

  於是服務端可以收到請求:

RequestHeader{protocolVersion=45, lastZxidSeen=0, timeOut=0, sessionId=21474836480000, passwd=[]}

  其實這些內容就是一次簡單請求消息的協議包裝。

二、服務端源碼分析

  1、服務端初始化

  根據ZooKeeper啟動腳本./zkServer.sh start -server ip:port,打開腳本可以看到服務端啟動入口:org.apache.zookeeper.server.quorum.QuorumPeerMain。

  注意:服務端的數據存放結構是:org.apache.zookeeper.server.DataTree,dataTree是放在ZKDataBasse中的。

  服務端啟動后,會依次進行配置文件zoo.cfg加載、數據加載、通訊建立、leader選舉,代碼如下:

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();     //加載數據 znode數據加載:讀取硬盤快照文件(data目錄下)
    startServerCnxnFactory();   //網絡通訊建立
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();      //選舉
    startJvmPauseMonitor();         super.start();    //此刻調用線程run方法
}

  注:在調用此之前已經進行了配置加載,如代碼:

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }

        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }

        quorumPeer.start();    //此刻是調用quorumPeer的start方法,並不是啟動quorumPeer線程,真正線程的啟動在start方法中的super.start()
        quorumPeer.join();      //等待服務端初始化完成
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}

  服務端啟動詳細流程如下圖所示:

  

  2、服務端請求響應

  然后就是服務端對外提供服務響應請求,如下圖所示(響應寫操作):

    

   以上流程就遵從了ZooKeeper的Zab一致性協議,Zab協議 的全稱是 Zookeeper Atomic Broadcast (Zookeeper原子廣播),Zookeeper 是通過 Zab 協議來保證分布式事務的最終一致性。

  Zab協議詳情以及選舉規則請參考:Zookeeper學習之Zab一致性協議

三、客戶端源碼分析

  1、客戶端初始化

  客戶端啟動流程如下:

        

  一開始客戶端會進行集群的解析和網絡初始化(ClientCncx對象),同時ClientCncx對象會創建SendThread和EventThread兩個線程,用於對request/response以及watcher event進行管理。其代碼如下:

public ClientCnxn(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZooKeeper zooKeeper,
    ClientWatchManager watcher,
    ClientCnxnSocket clientCnxnSocket,
    long sessionId,
    byte[] sessionPasswd,
    boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

  sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); this.clientConfig = zooKeeper.getClientConfig();
    initRequestTimeout();
}

public void start() {
  sendThread.start(); eventThread.start();
}

  2、客戶端請求管理

  客戶端訪問服務端流程如下:

           

   由上圖可知ClientCncx啟動了兩個線程:SendThread、EventThread,這兩個線程一個處理對服務端的請求響應,一個處理監聽事件。

  這兩個線程都是基於隊列進行請求管理,outGoingQueue用於處理發送request請求的隊列,PendingQueue用於存儲已經發送等待服務響應的請求,這樣當收到請求后就可以進行response處理,waitingEventsQueue用處臨時存放需要被觸發的對象,因此通過隊列的應用就實現了ZooKeeper的高性能。

  因此客戶端主要使用了這些技術:底層請求管理就是隊列>線程進行隊列處理>NIO默認通訊方式>synchronized鎖(用在隊列上)。

  客戶端和服務端同時使用到了Jute序列化組件以及自有的通訊協議,詳情請查看:Zookeeper學習之Jute序列化以及通信協議詳解

四、ZooKeeper運維

  日常在Linux下使用:echo zk命令|nc ip port 命令進行日常ZooKeeper運維,如:echo mntr |nc 192.168.0.31 2181。

  Linux中nc命令是一個功能強大的網絡工具,全稱是netcat,在線安裝:yum install -y nc。常見zk命令如下:

         

  當然也可以自己代碼實現,進行界面運維。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM