兩年前寫過ZooKeeper編程(一),那時候還在實習。近期組內做了個zookeeper編程的分享,就又把各種問題整理了一下。以下只是簡單地copy了幻燈片中的內容,寫得不夠連貫,讀者見諒。
ZooKeeper的輪廓

/---root
|
\----child1
|
\----child2
|
\----child3
|
\----grandson1
|
\----grandson2
采用了簡化的praxos算法來確保zookeeper集群節點的數據一致性
只要Quorum的成員有一半以上處於正常狀態,就能對外提供服務
任何修改命令都需要leader協調。 在leader的協調過程中,需要3次leader與Follower之間的來回請求響應。並且在此過程中還會涉及事務日志的記錄,更糟糕的情況是還有take snapshot的操作。因此此過程可能比較耗時。但Zookeeper的通信中最大特點是異步的,如果請求是連續不斷的,Zookeeper的處理是集中處理邏輯,然后批量發送,批量的大小也是有控制的。如果請求量不大,則即刻發送。這樣當負載很大時也能保證很大的吞吐量,時效性也在一定程度上進行了保證。
Zookeeper特性
服務端配置
# 心跳間隔時間
tickTime=2000
# 最小SessionTimeou
minSessionTimeout=4000
# 最大SessionTimeou
maxSessionTimeout=100000
# 允許 follower (相對於 leader 而言的“客戶端”)連接並同步到 leader 的初始化連接時間為tickTime的多少倍,超過這個時間則連接失敗
initLimit=10
# eader 與 follower 之間發送消息,請求和應答時間長度。如果 follower 在設置的時間內不能與 leader 進行通信,那么此 follower 將被丟棄
syncLimit=5
# 客戶端連接最大數
maxClientCnxns=30
客戶端配置
ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181, 192.168.119.97:2181 , 192.168.119.98:2181 /app/learn", TIMEOUT,null));
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
*
* @description ZooKeeper基本讀寫操作演示類
* @author zhangchaoyang
* @date 2014-6-22
*/
public class SimplestDemo {
private static final int TIMEOUT = 3000;
public static void main(String[] args) throws IOException, KeeperException,
InterruptedException {
// Client向zookeeper發送連接請求
ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", // 指定zookeeper
// server的IP、端口列表(當client連接不上server時會按照此列表嘗試連接下一台server),以及默認的根目錄
TIMEOUT,// Session Timeout
null// 是否設置監聽器
);
zkp.create("/znodename", // 節點名稱
"znodedata".getBytes(), // 節點上的數據
Ids.OPEN_ACL_UNSAFE,// ACL
CreateMode.EPHEMERAL// 節點類型,有三種:PERSISTENT、EPHEMERAL、SEQUENTIAL。EPHEMERAL節點不允許有子節點
);
Stat stat = zkp.exists("/znodename",// 節點名,如果節點不存在,則exists()返回null
false// 是否設置監聽器
);
if (zkp.exists("/znodename", false) != null) {
System.out.println("znodename exists now.");
}
// 修改節點上存儲的數據,需要提供version,version設為-1表示強制修改
zkp.setData("/znodename", "newdata".getBytes(), stat.getVersion());
// 讀取節點上的數據
String data = new String(zkp.getData("/znodename", false, stat));
System.out.println(data);
// client端主動斷開連接
zkp.close();
}
}
Session
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
/**
*
* @description Zookeeper Session演示類
* @author zhangchaoyang
* @date 2014-6-22
*/
public class SessionDemo {
/**
* zoo.cfg中的配置:
*
* <pre>
* tickTime=2000
* minSessionTimeout=4000(至少是tickTime的2倍)
* maxSessionTimeout=40000(最大是tickTime的20倍)
* </pre>
*
* 如果客戶端建立連接時指定的TIMEOUT不在[minSessionTimeout,maxSessionTimeout]區間內,
* 服務端會強制把它修改到該區間內
*/
private static final int TIMEOUT = 40000; // Session
// Timeout設為40秒,因為心跳周期為2秒,所以如果server向client連續發送20個心跳都收不到回應,則Session過期失效
private static ZooKeeper zkp = null;
private static void connect() throws IOException {
zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null);
}
private static void createNode() throws KeeperException,
InterruptedException {
if (zkp != null) {
zkp.create("/znodename", "znodedata".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
private static String getData() throws KeeperException,
InterruptedException {
if (zkp != null) {
Stat stat = zkp.exists("/znodename", false);
return new String(zkp.getData("/znodename", false, stat));
}
return null;
}
private static void disconnect() throws InterruptedException {
if (zkp != null) {
zkp.close();
}
}
/**
* 休息,在此期間我們有三種選擇:<br>
* <ol>
* <li>永久性斷開網絡連接
* <li>斷開網絡連接一段時間timespan后再連上,其中timespan<{@code TIMEOUT}
* <li>斷開網絡連接一段時間timespan后再連上,其中timespan>{@code TIMEOUT}
* </ol>
*/
private static void sleepForNetworkDisturbances() {
try {
Thread.sleep(2 * TIMEOUT);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
connect();
} catch (IOException e) {
System.err
.println("Can't create zookeeper client, please check the network.");
}
System.out.println("Session build.");
try {
createNode();
} catch (Exception e) {
System.err.println("Create znode failed.");
}
System.out.println("znode created.");
sleepForNetworkDisturbances();
try {
String data = getData();
if (data != null) {
// 在“休息”期間做了第2件事情,Sesion沒有過期,EPHEMERAL節點依然存在
System.out.println("data=" + data);
}
} catch (KeeperException e) {
e.printStackTrace();
// 在“休息”期間做了第1件事情
if (e instanceof ConnectionLossException) {
System.err
.println("Oops, network is disconnected. Retry getData().");
// 如果session沒有失效,而僅僅是網絡異常,則可以重新嘗試獲取數據,可能在重試時網絡已經正常了
try {
Thread.sleep(1000);
String data = getData();
if (data != null) {
System.out.println("data=" + data);
} else {
System.out.println("can't get data.");
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
// 在“休息”期間做了第3件事情,則session會過期
else if (e instanceof SessionExpiredException) {
System.err
.println("Session Expired, client will reconnect and create znode again.");
// 當發再Session Expired時,必須重新建立連接,即new一個ZooKeeper
try {
connect();
createNode();
String data = getData();
if (data != null) {
System.out.println("data=" + data);
} else {
System.out.println("can't get data.");
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
disconnect();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Client disconnected.");
}
}
Watcher
什么樣的操作會產生什么類型的事件:
|
|
event For “/path” |
event For “/path/child” |
| create(“/path”) |
EventType.NodeCreated |
-- |
| delete(“/path”) |
EventType.NodeDeleted |
-- |
| setData(“/path”) |
EventType.NodeDataChanged |
-- |
| create(“/path/child”) |
EventType.NodeChildrenChanged |
EventType.NodeCreated |
| delete(“/path/child”) |
EventType.NodeChildrenChanged |
EventType.NodeDeleted |
| setData(“/path/child”) |
-- |
EventType.NodeDataChanged |
什么操作會觸發EventType.None?
事件類型與watcher的對應關系:
| event For “/path” |
Default Watcher |
exists |
getData |
getChildren |
| EventType.None |
√ |
√ |
√ |
√ |
| EventType.NodeCreated |
|
√ |
√ |
|
| EventType.NodeDeleted |
|
√ |
√ |
|
| EventType.NodeDataChanged |
|
√ |
√ |
|
| EventType.NodeChildrenChanged |
|
|
|
√ |
操作與watcher的對應關系:
| "/path" |
"/path/child" |
|||||
|
|
exists |
getData |
getChildren |
exists |
getData |
getChildren |
| create(“/path”) |
√ |
√ |
|
|
|
|
| delete(“/path”) |
√ |
√ |
√ |
|
|
|
| setData(“/path”) |
√ |
√ |
|
|
|
|
| create(“/path/child”) |
|
|
√ |
√ |
√ |
|
| delete(“/path/child”) |
|
|
√ |
√ |
√ |
√ |
| setData(“/path/child”) |
|
|
|
√ |
√ |
|
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
*
* @description Zookeeper Watcher演示類
* @author zhangchaoyang
* @date 2014-6-22
*/
public class WatcherDemo {
private static ZooKeeper zkp = null;
private static final int TIMEOUT = 6000;
private static Watcher getWatcher(final String msg) {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(msg + "上的監聽被觸發\t事件類型" + event.getType()
+ "\t發生變化的節點" + event.getPath());
}
};
}
public static void main(String[] args) throws IOException, KeeperException,
InterruptedException {
System.out.println("--------------1----------------");
zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,
getWatcher("CONNECT"));
Thread.sleep(1000);
System.out.println("--------------2----------------");
zkp.create("/znodename", "znodedata".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zkp.create("/znodename/childnode", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Stat stat = zkp.exists("/znodename", getWatcher("EXISTS"));
zkp.getChildren("/", getWatcher("GETCHILDREN"));
zkp.getData("/znodename", getWatcher("GETDATA"), stat);
stat = zkp.exists("/znodename/childnode", getWatcher("EXISTS"));
zkp.getChildren("/znodename", getWatcher("GETCHILDREN"));
zkp.getData("/znodename/childnode", getWatcher("GETDATA"), stat);
// zkp.close();
zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,
getWatcher("CONNECT"));
Thread.sleep(1000);
System.out.println("--------------3----------------");
zkp.delete("/znodename/childnode", -1);
zkp.delete("/znodename", -1);
zkp.close();
}
}
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
*
* @description 自定義持久性的zookeeper watcher
* @author zhangchaoyang
* @date 2014-6-22
*/
public class PersistWatcher {
private static final int TIMEOUT = 6000;
private static final String znode = "/globalconfnode";
private static String globalConfData = "";
private static Watcher getConnectWatcher() {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(EventType.None)) {
System.out.println("連接狀態發生變化。");
}
}
};
}
private static Watcher getExistsWatcher(final ZooKeeper zkp) {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
if (event.getType().equals(EventType.NodeDataChanged)
|| event.getType().equals(EventType.NodeCreated)) {
// 節點被創建或修改時更新緩存中的值
Stat stat = zkp.exists(znode, this);// 再次注冊監聽
String data = new String(
zkp.getData(znode, false, stat));
globalConfData = data;
} else if (event.getType().equals(EventType.NodeDeleted)) {
// 節點被刪除時報警
System.out
.println("global configuration node have been deleted!");
try {
// 再次注冊監聽
zkp.exists(znode, this);
} catch (KeeperException e) {
if (e instanceof ConnectionLossException) {
System.out.println("連接已斷開。");
}
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
public static void main(String[] args) {
try {
ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn",
TIMEOUT,
getConnectWatcher());
zkp.exists(znode, getExistsWatcher(zkp));
zkp.create(znode, "config_value".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Thread.sleep(500);// 修改節點后必須sleep,等待watcher回調完成
System.out.println(globalConfData);
for (int i = 0; i < 4; i++) {
zkp.setData(znode, ("config_value" + i).getBytes(), -1);
Thread.sleep(500);// 修改節點后必須sleep,等待watcher回調完成
System.out.println(globalConfData);
}
zkp.close();// EPHEMERAL節點會被刪除,但Session並不會馬上失效(只不過ConnectionLoss了),所以還是會觸發watcher
try {
// 此時Session已失效
zkp.exists(znode, false);
} catch (KeeperException e) {
if (e instanceof SessionExpiredException)
System.out.println("Session已失效。");
}
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ACL
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoAuthException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
/**
*
* @description Zookeeper ACL演示類
* @author zhangchaoyang
* @date 2014-6-22
*/
public class AclDemo {
private static final int TIMEOUT = 6000;
public static void main(String[] args) throws IOException, KeeperException,
InterruptedException {
ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,
null);
String schema = "digest";// schema類型有:world,auth,digest,ip,super
String auth = "username:password";
zkp.addAuthInfo(schema, auth.getBytes());
List<ACL> acls = new ArrayList<ACL>();
for (ACL id : Ids.CREATOR_ALL_ACL) {
acls.add(id);
}
zkp.create("/znodename", "znodedata".getBytes(), acls,
CreateMode.PERSISTENT);
ZooKeeper zoo = null;
try {
zoo = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null);
System.out.println("采用不合法的認證方式:");
String badAuthentication = "username:wrongpass";
zoo.addAuthInfo(schema, badAuthentication.getBytes());
zoo.getData("/znodename", null, null);
} catch (KeeperException e) {
if (e instanceof NoAuthException) {
System.out.println("認證失敗:" + e.getMessage());
}
System.out.println("采用合法的認證方式:");
zoo.addAuthInfo(schema, auth.getBytes());
String data = new String(zoo.getData("/znodename", null, null));
if (data != null) {
System.out.println("認證成功:data=" + data);
}
} finally {
if (zoo != null && zoo.getState().isAlive()) {
zoo.close();
}
}
zkp.delete("/znodename", -1);
zkp.close();
}
}
開源工具menagerie
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
/**
*
* @description 使用ZooKeeper實現分布式鎖
* @author zhangchaoyang
* @date 2014-6-22
*/
public class ZooKeeperLock {
private static Logger logger = Logger.getLogger(ZooKeeperLock.class);
private static ZooKeeper zk = null;
private static final int TIMEOUT = 1000 * 60;
private static String connStr = null;
public static void setServerPath(String path) {
connStr = path + "/app/bqas/lock";
logger.info("ZooKeeperLock zookeeper node:" + connStr);
}
public static boolean getLock(String lockname) throws KeeperException,
InterruptedException, IOException {
connect(connStr, TIMEOUT);
if (lockname.contains("-")) {
throw new RuntimeException("鎖名稱不能包含'-'");
}
boolean lock = false;
String path = zk.create("/" + lockname + "-", new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
int selfIndex = getIndex(path);
List<String> children = zk.getChildren("/", false);
int min = getMinIndex(children);
if (min == selfIndex) {
lock = true;
}
return lock;
}
public static void releaseLock(String lockname)
throws InterruptedException, KeeperException {
disconnect();
}
private static int getIndex(String str) {
int index = -1;
int pos = str.lastIndexOf("-");
if (pos >= 0) {
try {
index = Integer.parseInt(str.substring(pos + 1));
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
return index;
}
private static int getMinIndex(List<String> list) {
int min = Integer.MAX_VALUE;
for (String ele : list) {
int index = getIndex(ele);
if (index < 0) {
throw new RuntimeException("SEQUENTIAL節點名中不包含數字:" + ele);
}
if (index < min) {
min = index;
}
}
return min;
}
private static void waitUntilConnected(CountDownLatch connectedLatch) {
if (States.CONNECTING == zk.getState()) {
try {
connectedLatch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
public static boolean connect(String hostPath, int sessionTimeout) {
if (zk == null || zk.getState() == States.CLOSED) {
try {
CountDownLatch connectedLatch = new CountDownLatch(1);
Watcher watcher = new ConnectedWatcher(connectedLatch);
zk = new ZooKeeper(hostPath, sessionTimeout, watcher);
waitUntilConnected(connectedLatch);
} catch (Exception e) {
logger.error("Connect to Zookeeper failed:", e);
return false;
}
}
return true;
}
public static boolean disconnect() {
if (zk != null) {
if (States.CLOSED != zk.getState()) {
try {
zk.close();
} catch (InterruptedException e) {
logger.error("Disconnect from Zookeeper failed:", e);
return false;
}
}
}
return true;
}
static class ConnectedWatcher implements Watcher {
private CountDownLatch connectedLatch;
ConnectedWatcher(CountDownLatch connectedLatch) {
this.connectedLatch = connectedLatch;
}
@Override
public void process(WatchedEvent event) {
// 事件狀態為SyncConnected時,說明與服務端的連接已建立好
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
}
}
}
public static void main(String[] args) {
String lockname = "writeHitCount2DBlock";
System.out.println("begin to run.");
ZooKeeperLock.setServerPath("192.168.119.96:2181");
try {
boolean havelock = ZooKeeperLock.getLock(lockname);
if (havelock) {
Date date = new Date();
System.out
.println("I got the lock,and I will write DB!" + date);
Thread.sleep(1000);// 休息一段時間之后再釋放鎖
}
System.out.println("Job done, I will release the lock.");
ZooKeeperLock.releaseLock(lockname);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

