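I. A High-Level View of the ZooKeeper Source Structure
The overall structure of the ZooKeeper source code is shown in the figure below:
Before reading the source, it helps to look at ZooKeeper's structure as a whole. ZooKeeper consists of two parts: the server ensemble and the client.
On the server side:
- Every ZooKeeper server has three states: initializing, running, and shut down. Once the servers are running they form a ZooKeeper ensemble and can serve requests (a single standalone server works too);
- After a server starts, it goes through initialization to form a usable ensemble;
On the client side:
- The client wraps everything in an API layer, so all access goes through the same API;
- The client API follows a defined protocol and wraps each message accordingly;
- Network communication requires serialization, deserialization, and connection setup;
Of course, the protocol wrapping, serialization/deserialization, and connection handling that the client provides must also exist on the server. We can see this by writing a fake server that intercepts a request, as in the following code: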
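import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;

import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.server.ByteBufferInputStream;

// RequestHeader is not imported here: the original post does not show where it comes from.
// Judging by the printed fields (protocolVersion, lastZxidSeen, timeOut, sessionId, passwd),
// it mirrors the fields of a connect request.
public class SoecktLister {
    public static void main(String[] args) throws Exception {
        // Listen on ZooKeeper's default client port and wait for a single client.
        ServerSocket serverSocket = new ServerSocket(2181);
        Socket accept = serverSocket.accept();

        // Read the first bytes the client sends. A single read() may return fewer
        // bytes than were sent; that is good enough for this experiment.
        byte[] result = new byte[2048];
        accept.getInputStream().read(result);

        // Wrap the raw bytes in a Jute binary archive and deserialize from byte 0.
        ByteBuffer bb = ByteBuffer.wrap(result);
        ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis);
        RequestHeader header2 = new RequestHeader();
        header2.deserialize(bia, "header");
        System.out.println(header2);

        bbis.close();
        accept.close();
        serverSocket.close();
    }
}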
Then access it from a client:
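import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperTest {
    private ZooKeeper zooKeeper;

    public ZooKeeperTest() {
        try {
            // Connect to the fake server: 5000 ms session timeout, no watcher, not read-only.
            zooKeeper = new ZooKeeper("localhost:2181", 5000, null, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Create a persistent znode at the given path with the given data.
    public void add(String path, String data) {
        try {
            String newPath = zooKeeper.create(path, data.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ZooKeeperTest zooKeeperTest = new ZooKeeperTest();
        zooKeeperTest.add("/monkey2", "2019");
    }
}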
The fake server then receives the request:
RequestHeader{protocolVersion=45, lastZxidSeen=0, timeOut=0, sessionId=21474836480000, passwd=[]}
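This content is simply the protocol wrapping of one request message, here the connect request that the client sends first. Because the fields are decoded starting from the very first byte of the stream, the 4-byte length prefix that precedes the record is decoded as well: 45 is that length, and sessionId=21474836480000 equals 5000 × 2^32, that is, the 5000 ms session timeout shifted into the next field.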
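To make the field layout of that first message concrete, here is a minimal sketch that builds a frame by hand and decodes it twice, once from byte 0 and once after skipping the length prefix. The layout (a 4-byte length, then protocolVersion, lastZxidSeen, timeOut, sessionId, a 16-byte zero password, and a read-only flag) is an assumption inferred from the printed output above, and `ConnectFrameSketch` is a hypothetical helper, not a ZooKeeper class.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of ZooKeeper.
class ConnectFrameSketch {

    // Builds a frame with the assumed layout of the client's first message.
    // ByteBuffer is big-endian by default, matching Java's DataOutput encoding.
    static ByteBuffer buildFrame(int timeOutMs, boolean readOnly) {
        byte[] passwd = new byte[16];                       // assumed default: 16 zero bytes
        int bodyLen = 4 + 8 + 4 + 8 + 4 + passwd.length + 1;
        ByteBuffer bb = ByteBuffer.allocate(4 + bodyLen);
        bb.putInt(bodyLen);                                 // length prefix
        bb.putInt(0);                                       // protocolVersion
        bb.putLong(0L);                                     // lastZxidSeen
        bb.putInt(timeOutMs);                               // timeOut
        bb.putLong(0L);                                     // sessionId
        bb.putInt(passwd.length);                           // passwd length
        bb.put(passwd);                                     // passwd bytes
        bb.put((byte) (readOnly ? 1 : 0));                  // readOnly flag
        bb.flip();
        return bb;
    }

    // Decodes {protocolVersion, lastZxidSeen, timeOut, sessionId} in that order,
    // optionally skipping the 4-byte length prefix first.
    static long[] readFields(ByteBuffer frame, boolean skipLengthPrefix) {
        ByteBuffer bb = frame.duplicate();                  // leave the caller's position untouched
        if (skipLengthPrefix) {
            bb.getInt();
        }
        long protocolVersion = bb.getInt();
        long lastZxidSeen = bb.getLong();
        long timeOut = bb.getInt();
        long sessionId = bb.getLong();
        return new long[] {protocolVersion, lastZxidSeen, timeOut, sessionId};
    }

    public static void main(String[] args) {
        ByteBuffer frame = buildFrame(5000, false);
        long[] shifted = readFields(frame, false);
        long[] aligned = readFields(frame, true);
        // Decoding from byte 0: the length prefix is taken for protocolVersion.
        System.out.println("from byte 0:   protocolVersion=" + shifted[0] + ", sessionId=" + shifted[3]);
        // Decoding after the prefix: the timeout lands in its own field.
        System.out.println("after prefix:  protocolVersion=" + aligned[0] + ", timeOut=" + aligned[2]);
    }
}
```

Under these assumptions, decoding from byte 0 yields protocolVersion=45 and sessionId=21474836480000, the same values the fake server printed, while skipping the prefix yields protocolVersion=0 and timeOut=5000.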
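II. Server-Side Source Analysis
1. Server initialization
ZooKeeper servers are started with the script ./zkServer.sh start. Opening that script shows the server's entry point: org.apache.zookeeper.server.quorum.QuorumPeerMain.
Note: the server stores its data in org.apache.zookeeper.server.DataTree, and the DataTree is held inside ZKDatabase.
After the server starts, it loads the configuration file zoo.cfg, loads its data, sets up network communication, and runs leader election, in that order. The code is as follows: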
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();            // load data: restore znodes from the snapshot files on disk (under the data directory)
    startServerCnxnFactory();  // set up network communication
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();     // leader election
    startJvmPauseMonitor();
    super.start();             // starts the thread, which then invokes run()
}
Note: the configuration has already been loaded before this method is called, as the following code shows:
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
                config.getMetricsProviderClassName(),
                config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(),
                    config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(),
                    config.getClientPortListenBacklog(), true);
        }

        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }

        // This calls quorumPeer's overridden start() method; it does not start the thread directly.
        // The thread is actually started by super.start() inside that method.
        quorumPeer.start();
        // Block the calling thread until the quorumPeer thread exits.
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
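The distinction between calling start() and actually starting the thread can be shown with a minimal sketch of the same pattern: a Thread subclass that overrides start() to do its setup on the caller's thread and only then calls super.start(). The class `PeerStartSketch` and the recorded step names are hypothetical stand-ins; none of the real QuorumPeer work is performed here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a Thread subclass that prepares itself inside start().
class PeerStartSketch extends Thread {
    // Records the order in which the steps execute.
    final List<String> steps = Collections.synchronizedList(new ArrayList<>());

    @Override
    public synchronized void start() {
        // These steps run on the caller's thread, before any new thread exists.
        steps.add("loadDataBase");
        steps.add("startServerCnxnFactory");
        steps.add("startLeaderElection");
        // Only this call creates the new thread, which then invokes run().
        super.start();
    }

    @Override
    public void run() {
        // Runs on the newly started thread.
        steps.add("run");
    }

    public static void main(String[] args) throws InterruptedException {
        PeerStartSketch peer = new PeerStartSketch();
        peer.start();  // setup on the main thread, then thread start
        peer.join();   // blocks until run() has returned
        System.out.println(peer.steps);
        // prints [loadDataBase, startServerCnxnFactory, startLeaderElection, run]
    }
}
```

In this sketch join() returns almost immediately because run() is trivial; for a long-lived server thread the same call keeps the caller blocked for as long as the thread runs.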
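The detailed server startup flow is shown in the figure below:
2. Server request handling
Next, the server begins serving and responding to requests, as shown in the figure below (handling a write operation):
The flow above follows ZooKeeper's Zab consistency protocol. Zab stands for ZooKeeper Atomic Broadcast; ZooKeeper relies on the Zab protocol to guarantee the eventual consistency of distributed transactions.
For details of the Zab protocol and the election rules, see: Learning ZooKeeper: The Zab Consistency Protocol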
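One rule at the heart of a Zab-style write is that a proposal is committed only after a majority of the ensemble has acknowledged it. The following is a minimal sketch of that majority check alone, under the assumption of a plain (unweighted) majority quorum; `QuorumAckSketch` is a hypothetical class, and proposals, zxids, and networking are all left out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker for the acknowledgements of a single proposal.
class QuorumAckSketch {
    private final int ensembleSize;
    private final Set<Long> acks = new HashSet<>();

    QuorumAckSketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Records an ACK from the server with id `sid`.
    // Returns true once strictly more than half of the ensemble has acknowledged.
    boolean ack(long sid) {
        acks.add(sid);  // a repeated ACK from the same server is counted once
        return acks.size() > ensembleSize / 2;
    }

    public static void main(String[] args) {
        QuorumAckSketch proposal = new QuorumAckSketch(5);
        System.out.println(proposal.ack(1L)); // false: 1 of 5
        System.out.println(proposal.ack(2L)); // false: 2 of 5
        System.out.println(proposal.ack(2L)); // false: duplicate, still 2 of 5
        System.out.println(proposal.ack(3L)); // true: 3 of 5 is a majority
    }
}
```

With five servers, three acknowledgements are enough, which is why an ensemble of that size can keep accepting writes with two servers down.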
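III. Client-Side Source Analysis
1. Client initialization
The client startup flow is as follows:
The client first parses the server list and initializes networking (the ClientCnxn object). The ClientCnxn object in turn creates two threads, SendThread and EventThread, which manage requests/responses and watcher events respectively. The code is as follows: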
public ClientCnxn(
        String chrootPath,
        HostProvider hostProvider,
        int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher,
        ClientCnxnSocket clientCnxnSocket,
        long sessionId,
        byte[] sessionPasswd,
        boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
    initRequestTimeout();
}

public void start() {
    sendThread.start();
    eventThread.start();
}
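The two timeout formulas in the constructor use integer arithmetic, so it is worth working one case through. The sketch below simply repeats those two expressions in a hypothetical helper class, `ClientTimeoutSketch`; the 5000 ms session timeout matches the earlier client example, and the three-host ensemble is an assumed value.

```java
// Hypothetical helper that repeats the two timeout formulas from the constructor.
class ClientTimeoutSketch {

    // Time budget for connecting to a single host: the session timeout split across all hosts.
    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;
    }

    // Maximum time to wait for data from the server: two thirds of the session timeout.
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeout = 5000; // ms, as in the client example
        int hostCount = 3;         // assumed ensemble size
        System.out.println("connectTimeout = " + connectTimeout(sessionTimeout, hostCount)); // 1666
        System.out.println("readTimeout = " + readTimeout(sessionTimeout));                  // 3333
    }
}
```

Both results are truncated by integer division, and the more hosts are listed in the connect string, the less time the client spends on each one before moving on to the next.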
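2. Client request management
The flow of a client request to the server is as follows:
As the figure above shows, ClientCnxn starts two threads, SendThread and EventThread: one handles requests to the server and their responses, the other handles watch events.
Both threads manage their work through queues. outgoingQueue holds requests waiting to be sent; pendingQueue holds requests that have already been sent and are waiting for the server's response, so that when a response arrives it can be matched to its request and processed; waitingEvents temporarily holds the objects that need to be triggered. This queue-based design lets callers hand off requests without blocking on the network, which is how the client achieves high throughput.
In summary, the client relies mainly on the following techniques, from the bottom up: queues for request management, threads that process those queues, NIO as the default transport, and synchronized locks guarding the queues.
Both the client and the server use the Jute serialization component and ZooKeeper's own communication protocol. For details, see: Learning ZooKeeper: Jute Serialization and the Communication Protocol Explained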
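The interplay of the two request queues can be sketched in a few lines. This is a simplified, single-process illustration, not the real ClientCnxn code: `RequestPipelineSketch` is a hypothetical class, requests are plain strings, and responses are assumed to arrive in the same order the requests were sent.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the outgoing/pending queue hand-off.
class RequestPipelineSketch {
    private final Queue<String> outgoingQueue = new ArrayDeque<>(); // waiting to be sent
    private final Queue<String> pendingQueue = new ArrayDeque<>();  // sent, awaiting a response

    // Called by application threads: enqueue a request and return immediately.
    void submit(String request) {
        synchronized (outgoingQueue) {
            outgoingQueue.add(request);
        }
    }

    // Called by the sender thread: take the next request, "send" it, and park it as pending.
    String sendNext() {
        String request;
        synchronized (outgoingQueue) {
            request = outgoingQueue.poll();
        }
        if (request == null) {
            return null; // nothing to send
        }
        synchronized (pendingQueue) {
            pendingQueue.add(request);
        }
        return request;
    }

    // Called when a response arrives: under in-order delivery, it belongs to the oldest pending request.
    String onResponse() {
        synchronized (pendingQueue) {
            return pendingQueue.poll();
        }
    }

    int pendingCount() {
        synchronized (pendingQueue) {
            return pendingQueue.size();
        }
    }

    public static void main(String[] args) {
        RequestPipelineSketch pipeline = new RequestPipelineSketch();
        pipeline.submit("create /monkey2");
        pipeline.submit("getData /monkey2");
        pipeline.sendNext();
        pipeline.sendNext();
        System.out.println("pending: " + pipeline.pendingCount());       // 2
        System.out.println("response for: " + pipeline.onResponse());    // create /monkey2
        System.out.println("response for: " + pipeline.onResponse());    // getData /monkey2
    }
}
```

The sketch locks each queue separately with synchronized, mirroring the "synchronized locks on the queues" point above; the caller of submit() never touches the network.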
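IV. ZooKeeper Operations
Day-to-day ZooKeeper operations on Linux are done with commands of the form echo <zk command> | nc ip port, for example: echo mntr | nc 192.168.0.31 2181.
nc, short for netcat, is a powerful networking tool on Linux; it can be installed online with: yum install -y nc. Common zk commands are as follows:
Of course, you can also implement this in your own code and build a UI for operations.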
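As a starting point for such a tool, here is a minimal sketch that does in Java what echo mntr | nc ip port does: open a TCP connection, write the command, and read the reply until the server closes the connection. `FourLetterSketch` and its method names are hypothetical, and the parser assumes the reply consists of tab-separated key/value lines, as mntr prints; other commands use different formats.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mimics `echo <cmd> | nc host port`.
class FourLetterSketch {

    // Sends the command and returns everything the server writes back.
    static String send(String host, int port, String cmd, int timeoutMs) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);
            socket.getOutputStream().write(cmd.getBytes(StandardCharsets.US_ASCII));
            socket.getOutputStream().flush();
            socket.shutdownOutput(); // like nc: signal end of input before reading
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // Parses "key<TAB>value" lines into an ordered map; lines without a tab are skipped.
    static Map<String, String> parseKeyValues(String reply) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : reply.split("\\r?\\n")) {
            int tab = line.indexOf('\t');
            if (tab > 0) {
                result.put(line.substring(0, tab), line.substring(tab + 1).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: FourLetterSketch <host> <port> [command]");
            return;
        }
        String cmd = args.length > 2 ? args[2] : "mntr";
        String reply = send(args[0], Integer.parseInt(args[1]), cmd, 3000);
        parseKeyValues(reply).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Running it with the arguments 192.168.0.31 2181 mntr corresponds to the nc example above; the parsed map is the kind of structure a monitoring page could render directly.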