ZooKeeper是用Java開發的,3.4.6版本的Java API文檔可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上找到。
Tips
本章的代碼在Linux操作系統下進行測試,運行ZooKeeper服務器實例的版本為3.4.6。
開發應用程序的ZooKeeper Java綁定主要由兩個Java包組成:
- org.apache.zookeeper
- org.apache.zookeeper.data
org.apache.zookeeper包由ZooKeeper監視的接口定義和ZooKeeper的各種回調處理程序組成。 它定義了ZooKeeper客戶端類庫的主要類以及許多ZooKeeper事件類型和狀態的靜態定義。 org.apache.zookeeper.data包定義了與數據寄存器(也稱為znode)相關的特性,例如訪問控制列表(ACL),IDs,stats等。
ZooKeeper Java API中的org.apache.zookeeper.server,org.apache.zookeeper.server.quorum和org.apache.zookeeper.server.upgrade包是服務器實現的一部分。 org.apache.zookeeper.client包用於查詢ZooKeeper服務器的狀態。
一 准備開發環境
Apache ZooKeeper是一個復雜的軟件,因此它需要運行許多其他類庫。 依賴庫作為jar文件在ZooKeeper發行版中附帶在lib目錄中。 核心ZooKeeper jar文件名字為zookeeper-3.4.6.jar,位於主目錄下。
要開發Java的ZooKeeper應用程序,我們必須設置指向ZooKeeper jar的類路徑,以及ZooKeeper所依賴的所有第三方庫。在 bin 目錄下有一個 zkEnv.sh文件,可以用來設置CLASSPATH。
我們需要將腳本如下設置,在命令行中執行以下語句:
$ ZOOBINDIR=${ZK_HOME}/bin
$ source ${ZOOBINDIR}/zkEnv.sh
shell變量ZK_HOME
被設置為安裝ZooKeeper的路徑,在我的設置中,它是/usr/share/zookeeper
。 之后,CLASSPATH變量被正確設置,在我的系統中,如下所示:
$ echo $CLASSPATH
/usr/share/zookeeper-3.4.6/bin/../build/classes :/usr/share/zookeeper-3.4.6/bin/../build/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar :/usr/share/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar :/usr/share/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar :/usr/share/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar :/usr/share/zookeeper-3.4.6/bin/../src/java/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../conf:
在Windows操作系統中,需要運行zkEnv.cmd腳本。 現在可以使用CLASSPATH變量來編譯和運行使用ZooKeeper API編寫的Java程序。 可以在Uni/Linux中的主目錄的.bashrc文件中找到zkEnv.sh腳本,避免每次啟動shell會話時都采用它。
二 第一個ZooKeeper程序
為了引入ZooKeeper Java API,讓我們從一個非常簡單的程序開始,它可以連接到localhost中的ZooKeeper實例,如果連接成功,它將在ZooKeeper名稱空間的根路徑下打印znode的列表。
這個程序的代碼如下所示:
/*Our First ZooKeeper Program*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
public class HelloZooKeeper {
public static void main(String[] args) throws IOException {
String hostPort = "localhost:2181";
String zpath = "/";
List <String> zooChildren = new ArrayList<String>();
ZooKeeper zk = new ZooKeeper(hostPort, 2000, null);
if (zk != null) {
try {
zooChildren = zk.getChildren(zpath, false);
System.out.println("Znodes of '/': ");
for (String child: zooChildren) {
//print the children
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在構建和執行前面的代碼片段之前,讓我們來看看它具體做了什么。代碼從導入語句開始。使用這些語句,我們導入了程序各個組件所需的包。如前所述,org.apache.zookeeper包包含客戶端與ZooKeeper服務器進行交互所需的所有類和接口。在導入包之后,定義了一個名為HelloZooKeeper
的類。由於我們要連接到在同一系統中運行的ZooKeeper實例,在main
方法中將主機和端口字符串定義為localhost:2181
。代碼行zk = new ZooKeeper(hostPort, 2000, null)
調用ZooKeeper構造方法,該構造方法嘗試連接到ZooKeeper服務器並返回一個引用。對於連接到ZooKeeper服務器實例並維護該連接的客戶端程序,需要維護一個實時會話。在此例中,構造方法實例化的zk
對象返回的引用表示這個會話。 ZooKeeper API是圍繞這個引用構建的,每個方法調用都需要一個引用來執行。
ZooKeeper類的構造方法使用以下代碼創建ZooKeeper實例的引用:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
使用的參數含義如下:
- connectString:以逗號分隔的主機:端口號列表,每個對應一個ZooKeeper服務器。 例如,10.0.0.1:2001,10.0.0.2:2002和10.0.0.3:2003表示三個節點的ZooKeeper ensemble的有效的主機:端口匹配對。
- sessionTimeout:這是以毫秒為單位的會話超時時間。這是ZooKeeper在宣布session結束之前,沒有從客戶端獲得心跳的時間。
- watcher:一個watcher對象,如果創建,當狀態改變和發生節點事件時會收到通知。這個watcher對象需要通過一個用戶定義的類單獨創建,通過實現
Watcher
接口並將實例化的對象傳遞給ZooKeeper構造方法。客戶端應用程序可以收到各種類型的事件的通知,例如連接丟失、會話過期等。
ZooKeeper Java API定義了另外帶有三個參數的構造方法,以指定更高級的操作。代碼如下:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
在ZooKeeper類的上面的構造方法中,如果設置為true,boolean canBeReadOnly
參數允許創建的客戶端在網絡分區的情況下進入只讀模式。只讀模式是客戶端無法找到任何多數服務器的場景,但有一個可以到達的分區服務器,以只讀模式連接到它,這樣就允許對服務器的讀取請求,而寫入請求則不允許。客戶端繼續嘗試在后台連接到大多數服務器,同時仍然保持只讀模式。分區服務器僅僅是ZooKeeper組的一個子集,它是由於集群中的網絡分配而形成的。大多數服務器構成了ensemble中的大多數quorum。
以下構造方法顯示了兩個附加參數的定義:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
這個構造方法允許ZooKeeper客戶端對象創建兩個額外的參數:
- sessionId:在客戶端重新連接到ZooKeeper服務器的情況下,可以使用特定的會話ID來引用先前連接的會話
- sessionPasswd:如果指定的會話需要密碼,可以在這里指定
以下構造方法是前兩個調用的組合:
ZooKeeper(String connectString, int sessionTimeout,Watcher watcher, long sessionId, byte[] sessionPasswd,boolean canBeReadOnly)
此構造方法是前兩個調用的組合,允許在啟用只讀模式的情況下重新連接到指定的會話。
Note
ZooKeeper類的詳細Java API文檔可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上查詢。
現在,回到我們的ZooKeeper程序。 在調用構造方法后,如果連接成功,我們將得到ZooKeeper服務器的引用。 我們通過下面的代碼將引用傳遞給getChildren
方法:
zooChildren = zk.getChildren(zpath, false)
ZooKeeper類的getChildren(String path,boolean watch)
方法返回給定路徑上znode的子級列表。 我們只是迭代這個方法返回的列表,並將字符串打印到控制台。
將程序命名為HelloZooKeeper.java,並編譯我們的程序如下:
$ javac -cp $CLASSPATH HelloZooKeeper.java
在我們運行的程序之前,需要使用以下命令來啟動ZooKeeper服務器實例:
$ ${ZK_HOME}/bin/zkServer.sh start
運行程序如下:
$ java -cp $CLASSPATH HelloZooKeeper
執行程序會在控制台上打印日志消息,顯示所使用的ZooKeeper版本,Java版本,Java類路徑,服務器體系結構等等。 這里顯示了這些日志消息的一部分:
ZooKeeper Java API生成的日志消息對調試非常有用。 它為我們提供了關於客戶端連接到ZooKeeper服務器,建立會話等后台得信息。 上面顯示的最后三條日志消息告訴我們客戶端如何使用程序中指定的參數來啟動連接,以及在成功連接后,服務器如何為客戶端分配會話ID。
最后,程序執行最后在控制台中輸出以下內容:
我們可以使用ZooKeeper shell來驗證程序的正確性:
$ $ZK_HOME/bin/zkCli.sh -server localhost
恭喜! 我們剛剛成功編寫了我們的第一個ZooKeeper客戶端程序。
二 實現Watcher接口
ZooKeeper Watcher監視使客戶端能夠接收來自ZooKeeper服務器的通知,並在發生時處理這些事件。 ZooKeeper Java API提供了一個名為Watcher
的公共接口,客戶端事件處理程序類必須實現該接口才能接收有關來自ZooKeeper服務器的事件通知。 以編程方式,使用這種客戶端的應用程序通過向客戶端注冊回調(callback)對象來處理這些事件。
我們將實現Watcher
接口,處理與znode關聯的數據更改時由ZooKeeper生成的事件。
Watcher
接口在org.apache.zookeeper包中聲明如下:
public interface Watcher {
void process(WatchedEvent event);
}
為了演示znode數據監視器(Watcher),有兩個Java類:DataWatcher
和DataUpdater
。 DataWatcher
將一直運行,並在/MyConfig
指定znode路徑中偵聽來自ZooKeeper服務器的NodeDataChange
事件。 DataUpdate
r類將定期更新此znode路徑中的數據字段,這將生成事件,並且在接收到這些事件后,DataWatcher
類將把更改后的數據打印到控制台上。
以下是DataWatcher.java
類的代碼:
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class DataWatcher implements Watcher, Runnable {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
byte zoo_data[] = null;
ZooKeeper zk;
public DataWatcher() {
try {
zk = new ZooKeeper(hostPort, 2000, this);
if (zk != null) {
try {
//Create the znode if it doesn't exist, with the following code:
if (zk.exists(zooDataPath, this) == null) {
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void printData() throws InterruptedException, KeeperException {
zoo_data = zk.getData(zooDataPath, this, null);
String zString = new String(zoo_data);
// The following code prints the current content of the znode to the console:
System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString);
}
@Override
public void process(WatchedEvent event) {
System.out.printf(
"\nEvent Received: %s", event.toString());
//We will process only events of type NodeDataChanged
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
printData();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args)
throws InterruptedException, KeeperException {
DataWatcher dataWatcher = new DataWatcher();
dataWatcher.printData();
dataWatcher.run();
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
我們來看一下DataWatcher.java
類的代碼來理解一個ZooKeeper監視器的實現。 DataWatcher
公共類實現Watcher
接口以及Runnable
接口,打算將監視器作為線程運行。 main
方法創建DataWatcher
類的一個實例。 在前面的代碼中,DataWatcher
構造方法嘗試連接到在本地主機上運行的ZooKeeper實例。 如果連接成功,我們用下面的代碼檢查znode路徑/MyConfig
是否存在:
if (zk.exists(zooDataPath, this) == null) {
如果znode不存在ZooKeeper命名空間中,那么exists
方法調用將返回null,並且嘗試使用代碼將其創建為持久化znode,如下所示:
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
接下來是process
方法,它在org.apache.ZooKeeper的Watcher
接口中聲明,並由DataWatcher
類使用以下代碼實現:
public void process(WatchedEvent event) {
為了簡單起見,在process
方法中,打印從ZooKeeper實例接收的事件,並僅對NodeDataChanged
類型的事件進行進一步處理,如下所示:
if (event.getType() == Event.EventType.NodeDataChanged)
當znode路徑/MyConfig
的數據字段發生任何更新或更改而收到NodeDataChanged
類型的事件時,調用printData
方法來打印znode的當前內容。 在znode上執行一個getData
調用時,我們再次設置一個監視,這是該方法的第二個參數,如下面的代碼所示:
zoo_data = zk.getData(zooDataPath, this, null);
監視事件是發送給設置監視的客戶端的一次性觸發器,為了不斷接收進一步的事件通知,客戶端應該重置監視器。
DataUpdater.java
是一個簡單的類,它連接到運行本地主機的ZooKeeper實例,並用隨機字符串更新znode路徑/MyConfig
的數據字段。 在這里,我們選擇使用通用唯一標識符(UUID)字符串更新znode,因為后續的UUID生成器調用將保證生成唯一的字符串。
DataUpdater.java
類代碼如下:
import java.io.IOException;
import java.util.UUID;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DataUpdater implements Watcher {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
ZooKeeper zk;
public DataUpdater() throws IOException {
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
}
// updates the znode path /MyConfig every 5 seconds with a new UUID string.
public void run() throws InterruptedException, KeeperException {
while (true) {
String uuid = UUID.randomUUID().toString();
byte zoo_data[] = uuid.getBytes();
zk.setData(zooDataPath, zoo_data, -1);
try {
Thread.sleep(5000); // Sleep for 5 secs
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws
IOException, InterruptedException, KeeperException {
DataUpdater dataUpdater = new DataUpdater();
dataUpdater.run();
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}
上面的代碼使ZooKeeper服務器觸發一個NodeDataChanged
事件。 由於DataWatcher
為此znode路徑設置了監視,因此它會接收數據更改事件的通知。 然后它檢索更新的數據,重置監視,並在控制台上打印數據。
使用以下命令編譯DataWatcher
和DataUpdater
類:
$ javac –cp $CLASSPATH DataWatcher.java
$ javac –cp $CLASSPATH DataUpdater.java
要執行監視器和更新程序,需要打開兩個終端窗口。 我要先運行監視器,因為它創建了/MyConfig
的znode(如果還未在ZooKeeper的命名空間中創建的話)。 運行監視器之前,請確保ZooKeeper服務器在本地主機上已經運行。
在其中一個終端窗口中,通過運行以下命令來執行watcher類:
$ java –cp $CLASSPATH DataWatcher
輸出類似於以下屏幕截圖所示的消息:
如前面的截圖所示,znode路徑/MyConfig
是由DataWatcher
類創建的。 它也打印znode的內容,但沒有打印在控制台中,因為我們在創建znode時沒有設置任何數據。 當znode被創建時,類中的監視者收到了NodeCreated
類型的事件通知,這個通知被打印在控制台中。 DataWatcher
類繼續運行,並從ZooKeeper服務器偵聽/MyConfig
節點上的事件。
讓我們在另一個終端窗口中運行DataUpdater
類:
$ java -cp $CLASSPATH DataUpdater
將最初的ZooKeeper特定日志消息記錄到控制台后,DataUpdater
類運行時沒有提示。 它將一個新的UUID字符串設置到ZooKeeper路徑/MyConfig
的數據字段中。 因此,看到每隔5秒鍾,在下面的屏幕截圖中顯示的輸出內容打印在運行DataWatche
的終端窗口中:
DataWatcher
也可以使用ZooKeeper shell進行測試。 繼續像以前一樣在終端中運行DataWatcher
類,並在另一個終端中調用ZooKeeper shell並運行以下屏幕截圖中所示的命令:
在DataWatcher正在運行的終端中,將打印以下消息:
三 示例——群集監視器
通過互聯網提供的流行服務,如電子郵件,文件服務平台,在線游戲等,都是通過跨越多個數據中心的高度可用的成百上千台服務器來服務的,而這些服務器通常在地理位置上分開。 在這種集群中,設置了一些專用的服務器節點來監視生產網絡中承載服務或應用程序的服務器的活躍性。 在雲計算環境中,也用於管理雲環境的這種監控節點被稱為雲控制器。 這些控制器節點的一個重要工作是實時檢測生產服務器的故障,並相應地通知管理員,並采取必要的措施,例如將故障服務器上的應用程序故障轉移到另一個服務器,從而確保容錯性和高可用性。
在本節中,我們將使用ZooKeeper Java客戶端API開發一個簡約的分布式集群監視器模型。 使用ZooKeeper的ephemeral znode概念來構建這個監視模型相當簡單和優雅,如以下步驟所述:
- 每個生產服務器運行一個ZooKeeper客戶端作為守護進程。 這個過程連接到ZooKeeper服務器,並在
/ZooKeeper
命名空間的預定義路徑(比如/Members
)下創建一個帶有名稱(最好是其網絡名稱或主機名)的ephemeral znode。 - 雲控制器節點運行ZooKeeper監視器進程,該進程監視路徑
/Members
並監聽NodeChildrenChanged
類型的事件。 這個監視器進程作為服務或守護進程運行,並設置或重置路徑上的監視,並且實現其邏輯以調用適當的模塊來為監視事件采取必要的行動。 - 現在,如果生產服務器由於硬件故障或軟件崩潰而關閉,ZooKeeper客戶端進程就會被終止,導致服務器和ZooKeeper服務之間的會話被終止。 由於ephemeral znode的屬性唯一,每當客戶端連接關閉時,ZooKeeper服務會自動刪除路徑
/Members
中的znode。 - 路徑中znode的刪除引發了
NodeChildrenChanged
事件,因此雲控制器中的觀察器進程會收到通知。 通過調用路徑/Members
中的getChildren
方法,可以確定哪個服務器節點已經關閉。 - 然后,控制器節點可以采取適當的措施,比如執行恢復邏輯以重啟另一台服務器中的故障服務。
- 這個邏輯可以構建為實時工作,保證接近於零停機的時間和高度可用的服務。
為實現這個集群監控模型,我們將開發兩個Java類。 ClusterMonito
r類將持續運行監視器,以監視ZooKeeper樹中的路徑/Members
。 處理完引發事件后,我們將在控制台中打印znode列表並重置監視。 另一個類ClusterClient
將啟動到ZooKeeper服務器的連接,在/Members
下創建一個ephemeral znode。
要模擬具有多個節點的集群,我們在同一台計算機上啟動多個客戶端,並使用客戶端進程的進程ID創建ephemeral znode。 通過查看進程標識,ClusterMonito
r類可以確定哪個客戶進程已經關閉,哪些進程還在。 在實際情況中,客戶端進程通常會使用當前正在運行的服務器的主機名創建ephemeral znode。 下面顯示了這兩個類的源代碼。
ClusterMonitor.java
類定義如下:
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterMonitor implements Runnable {
private static String membershipRoot = "/Members";
private final Watcher connectionWatcher;
private final Watcher childrenWatcher;
private ZooKeeper zk;
boolean alive=true;
public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
connectionWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType()==Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}
};
childrenWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
//Get current list of child znode,
//reset the watch
List<String> children = zk.getChildren( membershipRoot, this);
wall("!!!Cluster Membership Change!!!");
wall("Members: " + children);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
alive = false;
throw new RuntimeException(e);
}
}
}
};
zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
// Ensure the parent znode exists
if(zk.exists(membershipRoot, false) == null) {
zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// Set a watch on the parent znode
List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
System.err.println("Members: " + children);
}
public synchronized void close() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void wall (String message) {
System.out.printf("\nMESSAGE: %s", message);
}
public void run() {
try {
synchronized (this) {
while (alive) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
if (args.length != 1) {
System.err.println("Usage: ClusterMonitor <Host:Port>");
System.exit(0);
}
String hostPort = args[0];
new ClusterMonitor(hostPort).run();
}
}
ClusterClient.java
類定義如下:
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterClient implements Watcher, Runnable {
private static String membershipRoot = "/Members";
ZooKeeper zk;
public ClusterClient(String hostPort, Long pid) {
String processId = pid.toString();
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
if (zk != null) {
try {
zk.create(membershipRoot + '/' + processId, processId.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (
KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void close() {
try {
zk.close();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("Usage: ClusterClient <Host:Port>");
System.exit(0);
}
String hostPort = args[0];
//Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0, index));
new ClusterClient(hostPort, processId).run();
}
}
使用下面命令編譯這兩個類:
$ javac -cp $CLASSPATH ClusterMonitor.java
$ javac -cp $CLASSPATH ClusterClient.java
要執行群集監控模型,打開兩個終端。 在其中一個終端中,運行ClusterMonitor
類。 在另一個終端中,通過在后台運行ClusterClient
類來執行多個實例。
在第一個終端中,執行ClusterMonitor
類:
$ java -cp $CLASSPATH ClusterMonitorlocalhost:2181
如前面的示例所示,看到來自客戶端API的調試日志消息,最后,ClusterMonitor
類開始監視事件,輸入如下內容:
現在,執行ClusterClient
類的五個實例來模擬一個集群的五個節點。ClusterClient
在ZooKeeper樹的/Members
路徑中使用自己的進程ID創建ephemeral znode:
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[1] 4028
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[2] 4045
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[3] 4057
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[4] 4072
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[5] 4084
與此相對應,將觀察到ClusterMonitor
類檢測到這些新的ClusterClient
類實例,因為它正在監視ZooKeeper樹的/Members
路徑上的事件。 這模擬了一個真正的集群中的節點加入事件。 可以在ClusterMonitor
類的終端中看到輸出,這與下面的截圖中顯示的類似:
現在,如果殺死一個ClusterClient.java
進程,那么它與ZooKeeper服務器一起維護的會話將被終止。因此,客戶端創建的ephemeral znode將被刪除。刪除將觸發NodeChildrenChanged
事件,該事件將被ClusterMonitor類捕獲。該模擬在集群中一個節點離開的場景。
讓我們用ID 4084終止ClusterClien
t進程:
$ kill -9 4084
以下屏幕截圖顯示了ClusterMonitor
類的終端中的輸出。 它列出了當前可用的進程及其進程ID,這些進程ID模擬了實時服務器:
上面的簡單而優雅的集群監控模型的示例實現展示了ZooKeeper的真正威力。 在沒有ZooKeeper的情況下,開發這樣一個能夠實時監控節點活躍度的模型將是一項真正的艱巨任務。