tickTime=2000 dataDir=D:/devtools/zookeeper-3.2.2/build clientPort=2181 |
-
tickTime:這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
-
dataDir:顧名思義就是 Zookeeper 保存數據的目錄,默認情況下,Zookeeper 將寫數據的日志文件也保存在這個目錄里。
- clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。
initLimit=5 syncLimit=2 server.1=192.168.211.1:2888:3888 server.2=192.168.211.2:2888:3888 |
-
initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 10 個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒
-
syncLimit:這個配置項標識 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 2*2000=4 秒
-
server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;B 是這個服務器的 ip 地址;C 表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;D 表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。如果是偽集群的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 實例通信端口號不能一樣,所以要給它們分配不同的端口號。
|
|
|
| String create(String path, byte[] data, List<ACL> acl,CreateMode createMode) | 創建一個給定的目錄節點 path, 並給它設置數據,CreateMode 標識有四種形式的目錄節點,分別是 PERSISTENT:持久化目錄節點,這個目錄節點存儲的數據不會丟失;PERSISTENT_SEQUENTIAL:順序自動編號的目錄節點,這種目錄節點會根據當前已近存在的節點數自動加 1,然后返回給客戶端已經成功創建的目錄節點名;EPHEMERAL:臨時目錄節點,一旦創建這個節點的客戶端與服務器端口也就是 session 超時,這種節點會被自動刪除;EPHEMERAL_SEQUENTIAL:臨時自動編號節點 |
| Stat exists(String path, boolean watch) | 判斷某個 path 是否存在,並設置是否監控這個目錄節點,這里的 watcher 是在創建 ZooKeeper 實例時指定的 watcher,exists方法還有一個重載方法,可以指定特定的watcher |
| Stat exists(String path,Watcher watcher) | 重載方法,這里給某個目錄節點設置特定的 watcher,Watcher 在 ZooKeeper 是一個核心功能,Watcher 可以監控目錄節點的數據變化以及子目錄的變化,一旦這些狀態發生變化,服務器就會通知所有設置在這個目錄節點上的 Watcher,從而每個客戶端都很快知道它所關注的目錄節點的狀態發生變化,而做出相應的反應 |
| void delete(String path, int version) | 刪除 path 對應的目錄節點,version 為 -1 可以匹配任何版本,也就刪除了這個目錄節點所有數據 |
| List<String>getChildren(String path, boolean watch) | 獲取指定 path 下的所有子目錄節點,同樣 getChildren方法也有一個重載方法可以設置特定的 watcher 監控子節點的狀態 |
| Stat setData(String path, byte[] data, int version) | 給 path 設置數據,可以指定這個數據的版本號,如果 version 為 -1 怎可以匹配任何版本 |
| byte[] getData(String path, boolean watch, Stat stat) | 獲取這個 path 對應的目錄節點存儲的數據,數據的版本等信息可以通過 stat 來指定,同時還可以設置是否監控這個目錄節點數據的狀態 |
| voidaddAuthInfo(String scheme, byte[] auth) | 客戶端將自己的授權信息提交給服務器,服務器將根據這個授權信息驗證客戶端的訪問權限。 |
| Stat setACL(String path,List<ACL> acl, int version) | 給某個目錄節點重新設置訪問權限,需要注意的是 Zookeeper 中的目錄節點權限不具有傳遞性,父目錄節點的權限不能傳遞給子目錄節點。目錄節點 ACL 由兩部分組成:perms 和 id。 Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 幾種 而 id 標識了訪問目錄節點的身份列表,默認情況下有以下兩種: ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分別表示任何人都可以訪問和創建者擁有訪問權限。 |
| List<ACL>getACL(String path,Stat stat) | 獲取某個目錄節點的訪問權限列表 |
// 創建一個與服務器的連接
ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT,
ClientBase.CONNECTION_TIMEOUT, new Watcher() {
// 監控所有被觸發的事件
public void process(WatchedEvent event) {
System.out.println("已經觸發了" + event.getType() + "事件!");
}
});
// 創建一個目錄節點
zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// 創建一個子目錄節點
zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath",false,null)));
// 取出子目錄節點列表
System.out.println(zk.getChildren("/testRootPath",true));
// 修改子目錄節點數據
zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1);
System.out.println("目錄節點狀態:["+zk.exists("/testRootPath",true)+"]");
// 創建另外一個子目錄節點
zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null)));
// 刪除子目錄節點
zk.delete("/testRootPath/testChildPathTwo",-1);
zk.delete("/testRootPath/testChildPathOne",-1);
// 刪除父目錄節點
zk.delete("/testRootPath",-1);
// 關閉連接
zk.close();
|
已經觸發了 None 事件! testRootData [testChildPathOne] 目錄節點狀態:[5,5,1281804532336,1281804532336,0,1,0,0,12,1,6] 已經觸發了 NodeChildrenChanged 事件! testChildDataTwo 已經觸發了 NodeDeleted 事件! 已經觸發了 NodeDeleted 事件! |
- I0Itec-zkClient
zkClient簡單的使用樣例如下:
下面表格列出了寫操作與ZK內部產生的事件的對應關系:
event For “/path” event For “/path/child”|
|
||
| create(“/path”) | EventType.NodeCreated | NA |
| delete(“/path”) | EventType.NodeDeleted | NA |
| setData(“/path”) | EventType.NodeDataChanged | NA |
| create(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeCreated |
| delete(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeDeleted |
| setData(“/path/child”) | NA | EventType.NodeDataChanged |
而ZK內部的寫事件與所觸發的watcher的對應關系如下:
event For “/path” defaultWatcher exists(“/path”) getData
(“/path”) getChildren
(“/path”)
|
|
||||
| EventType.None | √ | √ | √ | √ |
| EventType.NodeCreated | √ | √ | ||
| EventType.NodeDeleted | √(不正常) | √ | ||
| EventType.NodeDataChanged | √ | √ | ||
| EventType.NodeChildrenChanged | √ |
綜合上面兩個表,我們可以總結出各種寫操作可以觸發哪些watcher,如下表所示:
“/path” “/path/child” exists getData getChildren exists getData getChildren|
|
||||||
|
|
||||||
| create(“/path”) | √ | √ | ||||
| delete(“/path”) | √ | √ | √ | |||
| setData(“/path”) | √ | √ | ||||
| create(“/path/child”) | √ | √ | √ | |||
| delete(“/path/child”) | √ | √ | √ | √ | ||
| setData(“/path/child”) | √ | √ |
如果發生session close、authFail和invalid,那么所有類型的wather都會被觸發。
zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。比如subscribeChildChanges實際上是通過exists和getChildren關注了兩個事件。這樣當create(“/path”)時,對應path上通過getChildren注冊的listener也會被調用。另外subscribeDataChanges實際上只是通過exists注冊了事件。因為從上表可以看到,對於一個更新,通過exists和getData注冊的watcher要么都會觸發,要么都不會觸發。
zkClient地址:https://github.com/sgroschupf/zkclient
Maven工程中使用zkClient需要加的依賴:
<dependency>
<groupId>zkclient</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
- menagerie
基於分布式鎖,還實現了其他業務場景,比如leader選舉:
public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(“ZK-host-ip:2181″, 5000);
LeaderElector elector = new ZkLeaderElector(“/leaderElectionTest”, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(“Try to become the leader success!”);
}
}
java.util.concurrent包下面的其他接口實現,也主要是基於ReentrantZkLock的,比如ZkHashMap實現了ConcurrentMap。具體請參見menagerie的API文檔
menagerie地址:https://github.com/openUtility/menagerie
Maven工程中使用menagerie需要加的依賴:
<dependency>
<groupId>org.menagerie</groupId>
<artifactId>menagerie</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
void findLeader() throws InterruptedException {
byte[] leader = null;
try {
leader = zk.getData(root + "/leader", true, null);
} catch (Exception e) {
logger.error(e);
}
if (leader != null) {
following();
} else {
String newLeader = null;
try {
byte[] localhost = InetAddress.getLocalHost().getAddress();
newLeader = zk.create(root + "/leader", localhost,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
logger.error(e);
}
if (newLeader != null) {
leading();
} else {
mutex.wait();
}
}
}
|
void getLock() throws KeeperException, InterruptedException{
List<String> list = zk.getChildren(root, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if(myZnode.equals(root+"/"+nodes[0])){
doAction();
}
else{
waitForLock(nodes[0]);
}
}
void waitForLock(String lower) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
if(stat != null){
mutex.wait();
}
else{
getLock();
}
}
|
void addQueue() throws KeeperException, InterruptedException{
zk.exists(root + "/start",true);
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
synchronized (mutex) {
List<String> list = zk.getChildren(root, false);
if (list.size() < size) {
mutex.wait();
} else {
zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
}
|
public void process(WatchedEvent event) {
if(event.getPath().equals(root + "/start") &&
event.getType() == Event.EventType.NodeCreated){
System.out.println("得到通知");
super.process(event);
doAction();
}
}
|
boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
|
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
if(tempValue < min) min = tempValue;
}
byte[] b = zk.getData(root + "/element" + min,false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
|







