Apache ZooKeeper 服務啟動源碼解釋


 

轉載:https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper-code/

本文首先講解了 Apache ZooKeeper 服務啟動的三種方式,即 Standalone、偽分布式、分布式。然后針對分布式模式啟動步驟進行逐步介紹,從 Shell 腳本開始,一直介紹到服務完全啟動過程中所有的執行過程。通過本文讀者基本可以掌握 ZooKeeper 如何啟動、啟動過程中做了哪些工作。

分布式系統從根本上來說就是不同節點上的進程並發執行,並且相互之間對進程的行為進行協調處理的過程。不同節點上的進程互相協調行為的過程叫做分布式同步。許多分布式系統需要一個進程作為任務的協調者,執行一些其他進程並不執行的特殊的操作,一般情況下哪個進程擔當任務的協調者都無所謂,但是必須有一個進程作為協調者,自動選舉出一個協調者的過程就是分布式選舉。ZooKeeper 正是為了解決這一系列問題而生的,今天講的是 ZooKeeper 服務啟動流程深入分析。

ZooKeeper 服務啟動

ZooKeeper 服務的啟動方式分為三種,即單機模式、偽分布式模式、分布式模式。

單機模式

采用單機模式,意味着只有一台機器或者一個節點,因此流程較為簡單。首先,在 conf 目錄下面可以通過自己創建 zoo.cfg 文件的方式完成 ZooKeeper 的配置,如清單 1 所示,ZooKeeper 服務會讀取該配置文件。

清單 1.ZooKeeper 配置文件
[root@localhost zookeeper-3.4.7]# cd conf
[root@localhost conf]# ls -rlt
total 12
-rw-rw-r--. 1 1000 1000 922 Nov 10 22:32 zoo_sample.cfg
-rw-rw-r--. 1 1000 1000 2161 Nov 10 22:32 log4j.properties
-rw-rw-r--. 1 1000 1000 535 Nov 10 22:32 configuration.xsl
[root@localhost conf]# cat zoo_sample.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

上面是自帶的示例配置,與我們相關的三個配置項是 tickTime、dataDir 和 clientPort。

tickTime:這個參數主要是用來針對 ZooKeeper 服務端和客戶端的會話控制,包括心跳控制,一般來說,會話超時時間是該值的兩倍,它的單位是毫秒,我們設置為 2000 毫秒。

dataDir:這個目錄用來存放數據庫的鏡像和操作數據庫的日志。注意,如果這個文件夾不存在,需要手動創建一個並賦予讀寫權限,我們設置為/tmp/zookeeper,不用手動創建這個文件夾,系統運行后會自動創建或覆蓋。

clientPort:ZooKeeper 服務端監聽客戶端的端口,默認是 2181,這里沿用默認設置。

接下來通過 bin 目錄下面的 zkServer.sh 腳本啟動 ZooKeeper 服務,如果不清楚具體參數,可以直接調用腳本查看輸出,ZooKeeper 采用的是 Bourne Shell,如清單 2 所示。

清單 2. 調用 zkServer.sh 腳本
[root@localhost bin]# ./zkServer.sh
ZooKeeper JMX enabled by default
Using config: /home/zhoumingyao/zookeeper/zookeeper-3.4.7/bin/../conf/zoo.cfg
Usage: ./zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}

輸出中可以看到有 start 等選項,其他選項還有 start-foreground、print-cmd、stop、upgrade、restart、status 等,從字面上已經可以基本看出代表意義,這里我們使用 start 選項啟動 ZooKeeper 服務,如清單 3 所示。

清單 3. 啟動 ZooKeeper
[root@localhost bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/zhoumingyao/zookeeper/zookeeper-3.4.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

ZooKeeper 服務是否啟動成功,可以通過 ps 或者 jps 命令查看,如清單 4 所示。

清單 4. 查看 ZooKeeper 服務
[root@localhost bin]# jps
2737 QuorumPeerMain
2751 Jps
[root@localhost bin]# ps -ef | grep zookeeper | grep -v grep | awk '{print $2}'
2608

這里我們看到的進程號為 2737 的進程 QuorumPeerMain 代表了 ZooKeeper 服務。我們也可以通過 ZooKeeper 啟動腳本自帶的參數“Status”來查看 ZooKeeper 進程狀態,如清單 5 所示。

清單 5. 查看 ZooKeeper 進程狀態
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/zhoumingyao/zookeeper/zookeeper-3.4.7/bin/../conf/zoo.cfg
Mode: standalone

ZooKeeper 服務運行以后我們可以通過命令行工具去訪問它,默認是 Java 命令行腳本。我們可以通過以下命令方式啟動 ZooKeeper 命令行 Shell,運行輸出如清單 6 所示。

清單 6.ZKCli 運行輸出

點擊查看代碼清單

光標停留在 [zk: localhost:2181(CONNECTED) 0] 這一行,我們可以通過 help 請求來查看所有的支持命令。

偽分布式模式

我們可以在一台機器上創建模擬的 ZooKeeper 集群服務,假如我們需要 3 個節點,需要創建 3 個 cfg 文件,分別命名為 zoo1.cfg,zoo2.cfg,zoo3.cfg,此外我們還需要創建 3 個不同的數據文件夾,分別是 zoo1,zoo2 和 zoo3,目錄位於/var/lib/zookeeper,清單 7 是其中一個配置清單,其他的兩個類似。

清單 7. 配置文件 zoo1 內容
[root@localhost conf]# cat zoo1.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/var/lib/zookeeper/zoo1
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668

注意,每一個虛擬機器都對應一個自己的 zoo{$}.cfg,其中的 {$} 需要通過清單 8 所示命令來進行設置。

清單 8. 設置 myid
[root@localhost conf]# echo 1 > /var/lib/zookeeper/zoo1/myid
[root@localhost conf]# echo 2 > /var/lib/zookeeper/zoo2/myid
[root@localhost conf]# echo 3 > /var/lib/zookeeper/zoo3/myid

接下來我們開始啟動 ZooKeeper 的 3 個實例(虛擬的 3 台機器),需要調用三次 zkServer.sh 的 Start 命令,采用不同的配置文件,如清單 9 所示命令及輸出。

清單 9. 啟動偽分布式集群服務
[root@localhost bin]# ./zkServer.sh start /home/zhoumingyao/zookeeper/zookeeper-3.4.7/conf/zoo1.cfg
ZooKeeper JMX enabled by default
Using config: /home/zhoumingyao/zookeeper/zookeeper-3.4.7/conf/zoo1.cfg
Starting zookeeper ... STARTED
[root@localhost bin]# ./zkServer.sh start /home/zhoumingyao/zookeeper/zookeeper-3.4.7/conf/zoo2.cfg
ZooKeeper JMX enabled by default
Using config: /home/zhoumingyao/zookeeper/zookeeper-3.4.7/conf/zoo2.cfg
Starting zookeeper ... STARTED
[root@localhost bin]# ./zkServer.sh start /home/zhoumingyao/zookeeper/zookeeper-3.4.7/conf/zoo3.cfg
ZooKeeper JMX enabled by default
Using config: /home/zhoumingyao/zookeeper/zookeeper-3.4.7/conf/zoo3.cfg
Starting zookeeper ... STARTED
清單 10. 查看服務
[root@localhost bin]# jps
5537 QuorumPeerMain
5617 Jps
5585 QuorumPeerMain

確認服務都正常啟動,我們就可以通過 zkCli.sh 腳本方式連接到 ZooKeeper 集群,命令為./zkCli.sh -server localhost:2181,localhost:2182,localhost:2183,效果和單機模式一樣。

分布式模式

由於 ZooKeeper 單機模式不支持單點失敗保護,所以不推薦在生產環境下使用。

ZooKeeper 有另外一種支持多台機器的模式,即真正的分布式模式,這多台被包含在一個集群內的所有機器被稱為 quorum。機器數量而言,集群內部最小配置為 3 台,最佳配置為 5 台,其中包含 1 台 Leader(領導者)機器,由 5 台機器內部選舉產生,另外 4 台機器就立即成為 Follower(跟隨者)機器,一旦 Leader 宕機,剩余的 Follower 就會重新選舉出 Leader。

從配置文件內部的字段定義上來說,分布式模式的 ZooKeeper 與單機模式的 ZooKeeper 有一些差距,例如下面三個字段:

  • initLimit:follower 對於 Leader 的初始化連接 timeout 時間;

  • syncLimit:follower 對於 Leader 的同步 timeout 時間;

  • timeout 的計算公式是 initLimit*tickTime,syncLimit*tickTime。

此外,我們需要把組成 quorum 的所有機器也都列在這個配置文件里面。假設我們有兩個端口,第一個端口 2889 用於 Follower 和 Leader 之間的通信,通信方式是采用 TCP 方式,第二個端口 3889 是為選舉 Leader 用的,用於 quorum 內部的 Leader 選舉響應。那么我們配置文件如清單 11 所示。

清單 11. 分布式模式配置文件
server.1=node1:2889:3889
server.2=node2:2889:3889
server.3=node3:2889:3889

注意,分布式模式也需要設置 myid,這個和偽分布式模式基本一樣,只需要在每一台機器上實現一個 myid,例如第一台機器是 1,第二台機器上設置為 2,第三台機器上設置為 3,以此類推。

分布式模式的啟動方式和單機唯一的差距是每一台機器上都需要啟動 ZooKeeper 服務,即運行命令./zkServer.sh start。

ZooKeeper 服務端運行后,我們可以通過在每台機器上運行./zkServer.sh status 來查看選舉結果,其中 Follower 節點的運行結果如清單 12 所示,Leader 節點的運行結果如清單 13 所示。

清單 12.Follower 節點的運行結果
[root@node3 bin]# ./zkServer.sh status
JMX enabled by default
Using config: /usr/lib/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
清單 13.Leader 節點的運行結果
[root@node2 bin]# ./zkServer.sh status
JMX enabled by default
Using config: /usr/lib/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader

差距就在於 Mode 這一欄。接下來可以通過 zkCli 命令行訪問 ZooKeeper 服務,假如我們訪問 node2 節點,如清單 14 所示。

清單 14. 訪問 ZooKeeper 服務及輸出

點擊查看代碼清單

以上就證明分布式模式啟動成功,這里不多加描述,和偽分布式方式基本一樣。

注意,調試過程建議盡量使用分布式模式,單機模式不推薦在生產環境下使用,偽分布式模式實質上是在一個進程內派生多個線程模擬分布式形態,由於操作系統的內部結構設計,容易造成一些問題,建議與其解決問題不如切換到分布式模式。生產環境下建議一定采用分布式模式,如果機器不夠,推薦采用虛擬機方式。

 

流程及源代碼解釋

ZooKeeper 的啟動由 zkServer.sh 發起,真正的起源是 Java 類 QuorumPeerMain,然后進行了一系列配置后啟動負責 ZooKeeper 服務的線程,具體調用過程如圖 1 所示,我會在后續的操作過程中逐一解釋。

圖 1. 啟動 ZooKeeper 服務時序圖

zkServer.sh

這個腳本用於啟動 ZooKeeper 服務,第一個參數有幾種選擇,包括 start、start-foreground、print-cmd、stop、upgrade、restart、status 等 7 個。

我們主要講解 start 方法,腳本使用 nohup 命令提交作業,代碼如清單 15 所示。

清單 15.nohup 命令
nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

變量 ZOOMAIN 為類 QuorumPeerMain.java 進行如下設置,這是啟動服務的第一個入口類,如清單 16 所示。

清單 16.ZOOMAIN 變量設置

點擊查看代碼清單

QuorumPeerMain

QuorumPeerMain 類的 Main 函數較為簡單,直接調用了 initializeAndRun 方法,參數就是 zkServer.sh 轉入的參數,這里是“start”。在 initializeAndRun 方法內部,首先啟動的是定時清除鏡像任務,默認設置為保留 3 份。由於 purgeInterval 這個參數默認設置為 0,所以不會啟動鏡像定時清除機制,如清單 17 所示。

清單 17.ZOOMAIN 變量設置
if (purgeInterval <= 0) {
 LOG.info("Purge task is not scheduled.");
 return;
 }
//如果間隔大於 0,啟動定時任務機制
timer = new Timer("PurgeTask", true);
 TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
 timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

接下來,如果配置的 ZooKeeper 服務器大於 1 台,調用 runFromConfig 方法進行集群信息配置,並啟動 QuorumPeer 線程。

每個 QuorumPeer 線程啟動之前都會先啟動一個 cnxnFactory 線程,首先初始化 ServerCnxnFactory,這個是用來接收來自客戶端的連接的,也就是這里啟動的是一個 TCP 服務器。在 ZooKeeper 里提供兩種 TCP 服務器的實現,一個是使用 Java 原生 NIO 的方式,另外一個是使用 NETTY。默認是 NIO 的方式,一個典型的 Reactor 模型,如清單 18 所示。

清單 18. 創建 ServerCnxnFactory 對象
//首先根據配置創建對應 factory 的實例:NIOServerCnxnFactory 或者 NettyServerCnxnFactory
static public ServerCnxnFactory createFactory() throws IOException {
 String serverCnxnFactoryName =
 System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
 if (serverCnxnFactoryName == null) {
 serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
 }
 try {
 return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
 .newInstance();
 } catch (Exception e) {
 IOException ioe = new IOException("Couldn't instantiate "
 + serverCnxnFactoryName);
 ioe.initCause(e);
 throw ioe;
 }
}

接下來會開始針對 QuorumPeer 實例進行參數配置,QuorumPeer 類代表了 ZooKeeper 集群內的一個節點,所以每個節點打印的日志會有所區別,這在后面會介紹。QuorumPeer 的參數較多,比較關鍵的是 setQuorumPeers(設置集群節點信息,如 {1=org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer@19856a0a, 2=org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer@5f4c39d, 3=org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer@8567b79},這里顯示有三個個節點)、setMyid(每一個 ZooKeeper 節點對應有一個 MyId,我們在前面示例中已經手動創建)、setCnxnFactory(TCP 服務)、setZKDatabase(ZooKeeper 自帶的內存數據庫)、setTickTime(ZooKeeper 服務端和客戶端的會話控制)等等,如清單 19 所示。

清單 19.QuorumPeer 類參數配置
quorumPeer = new QuorumPeer();
 quorumPeer.setClientPortAddress(config.getClientPortAddress());
 quorumPeer.setTxnFactory(new FileTxnSnapLog(
 new File(config.getDataLogDir()),
 new File(config.getDataDir())));
 quorumPeer.setQuorumPeers(config.getServers());

QuorumPeer

接下來調用同步方法 start,正式進入 QuorumPeer 類。start 方法主要包括四個方法,即讀取內存數據庫、啟動 TCP 服務、選舉 ZooKeeper 的 Leader 角色、啟動自己線程,如清單 20 所示。

清單 20.start 方法代碼
@Override
 public synchronized void start() {
 loadDataBase();
 cnxnFactory.start(); 
 startLeaderElection();
 super.start();
 }

loadDataBase 方法用於恢復數據,即從磁盤讀取數據到內存,調用了 addCommittedProposal 方法,該方法維護了一個提交日志的隊列,用於快速同步 follower 角色的節點信息,日志信息默認保存 500 條,所以選用了 LinkedList 隊列用於快速刪除數據溢出時的第一條信息。

清單 21.addCommittedProposal 方法代碼
public void addCommittedProposal(Request request) {
 WriteLock wl = logLock.writeLock();
 try {
 wl.lock();
//采用 LinkedList 作為提交日志的存儲單元
 if (committedLog.size() > commitLogCount) {//數量限制在 500,如果超過 500,移除第一條
 committedLog.removeFirst();
 minCommittedLog = committedLog.getFirst().packet.getZxid();
 }
 if (committedLog.size() == 0) {
 minCommittedLog = request.zxid;
 maxCommittedLog = request.zxid;
 }

 ByteArrayOutputStream baos = new ByteArrayOutputStream();
 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
 try {
 request.hdr.serialize(boa, "hdr");
 if (request.txn != null) {
 request.txn.serialize(boa, "txn");
 }
 baos.close();
 } catch (IOException e) {
 LOG.error("This really should be impossible", e);
 }
 QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
 baos.toByteArray(), null);
 Proposal p = new Proposal();
 p.packet = pp;
 p.request = request;
 committedLog.add(p);
 maxCommittedLog = p.packet.getZxid();
 } finally {
 wl.unlock();
 }
 }

為了保證事務的順序一致性,ZooKeeper 采用了遞增的事務 id 號(ZXID)來標識事務。所有的提議(Proposal)都在被提出的時候加上了 ZXID。實現中 ZXID 是一個 64 位的數字,高 32 位是 EPOCH 用來標識 Leader 節點是否改變,每次一個 Leader 被選出來以后它都會有一個新的 EPOCH 值,標識當前屬於哪個 Leader 的統治,低 32 位用於遞增計數。

清單 22. 讀取 ZXID 值和 EPOCH 值
//讀取 EPOCH
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
//從最新的 zxid 恢復 epoch 變量,zxid64 位,前 32 位是 epoch 值,后 32 位是 zxid 
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);

如果當前保存的 EPOCH 和最新獲取的不一樣,那就說明 Leader 重新選舉過了,用最新的值替換,如清單 23 所示。

清單 23.EPOCH 值比較邏輯
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
 LOG.info("currentEpoch="+currentEpoch);
 if (epochOfZxid > currentEpoch && updating.exists()) {
 LOG.info("{} found. The server was terminated after " +
 "taking a snapshot but before updating current " +
 "epoch. Setting current epoch to {}.",
 UPDATING_EPOCH_FILENAME, epochOfZxid);
 setCurrentEpoch(epochOfZxid);
 if (!updating.delete()) {
 throw new IOException("Failed to delete " +
 updating.toString());
 }
 }

接下來進入選舉過程,初始所有節點都一樣的狀態,通過提交自己的申請,然后開始進入投票流程,這個過程中和完成后部分節點(選為 Leader 或 Observer)會發生狀態切換,如清單 24 所示。

清單 24. 角色切換過程日志輸出

點擊查看代碼清單

從日志中可以看出,myid 為 3 的這台機器一開始是 LOOKING 狀態,6 秒以后被選舉為 LEADER,狀態改為 LEADING。對應的,myid 為 2 的機器從 LOOKING 狀態轉變為 FOLLOWING 狀態,即作為 Follower 機器。

選舉過程較為復雜,我們來看一下調用過程。startLeaderElection 方法調用了 createElectionAlgorithm 方法進行選舉,由於參數 electionType 值為 3,所以進入到了清單 25 所示代碼。

清單 25.createElectionAlgorithm 方法代碼
case 3:
 qcm = new QuorumCnxManager(this);
 QuorumCnxManager.Listener listener = qcm.listener;
 if(listener != null){
 listener.start();
 le = new FastLeaderElection(this, qcm);
 } else {
 LOG.error("Null listener when initializing cnx manager");
 }
 break;

清單 25 里面,首先啟動已綁定 3888 端口的選舉線程,等待集群其他機器連接,然后調用基於 TCP 的選舉算法 FastLeaderElection,這里已經通過 FastLeaderElection 的構造函數啟動了 WorkerReceiver 線程,等待后續所有節點上報申請完成。在等待其他節點提交自己申請的過程中,進入了 QuorumPeer 的線程,由於當前獲取 getPeerState 返回的狀態為“LOOKING”,所以進入 LOOKING 代碼塊,如清單 26 所示。

清單 26.LOOKING 方法代碼
while (running) {
 LOG.info("getPeerState()="+getPeerState());
 switch (getPeerState()) {
 case LOOKING:
 LOG.info("LOOKING");

 if (Boolean.getBoolean("readonlymode.enabled")) {
 LOG.info("Attempting to start ReadOnlyZooKeeperServer");

 // Create read-only server but don't start it immediately
 final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
 logFactory, this,
 new ZooKeeperServer.BasicDataTreeBuilder(),
 this.zkDb);

清單 25 里面啟動的線程會一直監聽其他兩台節點發出的申請,如果接收到則開始投票過程,監聽代碼如清單 27 所示。

清單 27. 監聽后處理代碼
while (!shutdown) {
 Socket client = ss.accept();
 setSockOpts(client);
 LOG.info("Received connection request "
 + client.getRemoteSocketAddress());
 receiveConnection(client);
 numRetries = 0;
 }

在選舉過程中,每 2 台選舉機器之間只會建立一個選舉連接,發送線程 SendWorker 啟動,開始發送選舉消息,其他機器通過 IO 線程 RecvWorker 收到消息,添加到接收隊列,后續業務層的接收線程 WorkerReceiver 負責取消息,如清單 28 所示。

清單 28.RecvWorker 代碼
while (running && !shutdown && sock != null) {
 /**
 * Reads the first int to determine the length of the
 * message
 */
 int length = din.readInt();
 if (length <= 0 || length > PACKETMAXSIZE) {
 throw new IOException(
 "Received packet with invalid packet: "
 + length);
 }
 /**
 * Allocates a new ByteBuffer to receive the message
 */
 byte[] msgArray = new byte[length];
 din.readFully(msgArray, 0, length);
 ByteBuffer message = ByteBuffer.wrap(msgArray);
 addToRecvQueue(new Message(message.duplicate(), sid));
 }

WorkServer 的選舉過程如果節點是 Observer 節點,則返回當前選舉結果,如果自己也在 LOOKING,則放入業務接收隊列,選舉主線程會消費該消息,如果自己不在選舉中,而對方服務器在 LOOKING 中,則向其發送當前的選舉結果,當有服務器加入一個集群時需要發送給它,告訴選舉結果。代碼如清單 29 所示。

清單 29.WorkServer 通信代碼
Vote current = self.getCurrentVote();
 LOG.info("Vote current="+current);
 if(ackstate == QuorumPeer.ServerState.LOOKING){
 if(LOG.isDebugEnabled()){
 LOG.debug("Sending new notification. My id = " +
 self.getId() + " recipient=" +
 response.sid + " zxid=0x" +
 Long.toHexString(current.getZxid()) +
 " leader=" + current.getId());
 }
 
 ToSend notmsg;
 if(n.version > 0x0) {
 notmsg = new ToSend(
 ToSend.mType.notification,
 current.getId(),
 current.getZxid(),
 current.getElectionEpoch(),
 self.getPeerState(),
 response.sid,
 current.getPeerEpoch());
 
 } else {
 Vote bcVote = self.getBCVote();
 notmsg = new ToSend(
 ToSend.mType.notification,
 bcVote.getId(),
 bcVote.getZxid(),
 bcVote.getElectionEpoch(),
 self.getPeerState(),
 response.sid,
 bcVote.getPeerEpoch());
 }
 sendqueue.offer(notmsg);
 }

采用 Fast Paxos 的選舉算法,在選舉過程中,某服務器首先向所有 Server 提議自己要成為 Leader,當其它服務器收到提議以后,解決 EPOCH 和 ZXID 的沖突,並接受對方的提議,然后向對方發送接受提議完成的消息,重復這個流程,最后一定能選舉出 Leader,這個過程的流程圖如圖 2 所示。

圖 2. Fast Paxos 算法流程圖

 

結束語

本文首先介紹了如何采用 Standalone、偽分布式、分布式三種方式分別啟動 ZooKeeper 服務,然后從啟動腳本開始解釋啟動過程,從腳本深入到 QuorumPeerMain 主類,再到 QuorumPeer 線程,接着介紹了啟動 SendWorker、RecvWorker、WorkServer 等多個子線程,最后是對 Fast Paxos 算法的介紹。通過全文介紹基本可以了解 ZooKeeper 服務的啟動流程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM