兩年前寫過ZooKeeper編程(一),那時候還在實習。近期組內做了個zookeeper編程的分享,就又把各種問題整理了一下。以下只是簡單地copy了幻燈片中的內容,寫得不夠連貫,讀者見諒。
ZooKeeper的輪廓
/---root
    |
    \----child1
    |
    \----child2
    |
    \----child3
            |
            \----grandson1
            |
            \----grandson2
采用了簡化的Paxos算法來確保zookeeper集群節點的數據一致性
只要Quorum的成員有一半以上處於正常狀態,就能對外提供服務
任何修改命令都需要leader協調。 在leader的協調過程中,需要3次leader與Follower之間的來回請求響應。並且在此過程中還會涉及事務日志的記錄,更糟糕的情況是還有take snapshot的操作。因此此過程可能比較耗時。但Zookeeper的通信中最大特點是異步的,如果請求是連續不斷的,Zookeeper的處理是集中處理邏輯,然后批量發送,批量的大小也是有控制的。如果請求量不大,則即刻發送。這樣當負載很大時也能保證很大的吞吐量,時效性也在一定程度上進行了保證。
Zookeeper特性
服務端配置
# 心跳間隔時間
tickTime=2000
# 最小SessionTimeout
minSessionTimeout=4000
# 最大SessionTimeout
maxSessionTimeout=100000
# 允許 follower (相對於 leader 而言的“客戶端”)連接並同步到 leader 的初始化連接時間為tickTime的多少倍,超過這個時間則連接失敗
initLimit=10
# leader 與 follower 之間發送消息,請求和應答時間長度。如果 follower 在設置的時間內不能與 leader 進行通信,那么此 follower 將被丟棄
syncLimit=5
# 客戶端連接最大數
maxClientCnxns=30
客戶端配置
ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181, 192.168.119.97:2181 , 192.168.119.98:2181 /app/learn", TIMEOUT,null));
import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * * @description ZooKeeper基本讀寫操作演示類 * @author zhangchaoyang * @date 2014-6-22 */ public class SimplestDemo { private static final int TIMEOUT = 3000; public static void main(String[] args) throws IOException, KeeperException, InterruptedException { // Client向zookeeper發送連接請求 ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", // 指定zookeeper // server的IP、端口列表(當client連接不上server時會按照此列表嘗試連接下一台server),以及默認的根目錄 TIMEOUT,// Session Timeout null// 是否設置監聽器 ); zkp.create("/znodename", // 節點名稱 "znodedata".getBytes(), // 節點上的數據 Ids.OPEN_ACL_UNSAFE,// ACL CreateMode.EPHEMERAL// 節點類型,有三種:PERSISTENT、EPHEMERAL、SEQUENTIAL。EPHEMERAL節點不允許有子節點 ); Stat stat = zkp.exists("/znodename",// 節點名,如果節點不存在,則exists()返回null false// 是否設置監聽器 ); if (zkp.exists("/znodename", false) != null) { System.out.println("znodename exists now."); } // 修改節點上存儲的數據,需要提供version,version設為-1表示強制修改 zkp.setData("/znodename", "newdata".getBytes(), stat.getVersion()); // 讀取節點上的數據 String data = new String(zkp.getData("/znodename", false, stat)); System.out.println(data); // client端主動斷開連接 zkp.close(); } }
Session
import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.ZooKeeper; /** * * @description Zookeeper Session演示類 * @author zhangchaoyang * @date 2014-6-22 */ public class SessionDemo { /** * zoo.cfg中的配置: * * <pre> * tickTime=2000 * minSessionTimeout=4000(至少是tickTime的2倍) * maxSessionTimeout=40000(最大是tickTime的20倍) * </pre> * * 如果客戶端建立連接時指定的TIMEOUT不在[minSessionTimeout,maxSessionTimeout]區間內, * 服務端會強制把它修改到該區間內 */ private static final int TIMEOUT = 40000; // Session // Timeout設為40秒,因為心跳周期為2秒,所以如果server向client連續發送20個心跳都收不到回應,則Session過期失效 private static ZooKeeper zkp = null; private static void connect() throws IOException { zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null); } private static void createNode() throws KeeperException, InterruptedException { if (zkp != null) { zkp.create("/znodename", "znodedata".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } } private static String getData() throws KeeperException, InterruptedException { if (zkp != null) { Stat stat = zkp.exists("/znodename", false); return new String(zkp.getData("/znodename", false, stat)); } return null; } private static void disconnect() throws InterruptedException { if (zkp != null) { zkp.close(); } } /** * 休息,在此期間我們有三種選擇:<br> * <ol> * <li>永久性斷開網絡連接 * <li>斷開網絡連接一段時間timespan后再連上,其中timespan<{@code TIMEOUT} * <li>斷開網絡連接一段時間timespan后再連上,其中timespan>{@code TIMEOUT} * </ol> */ private static void sleepForNetworkDisturbances() { try { Thread.sleep(2 * TIMEOUT); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { try { connect(); } catch (IOException e) { System.err .println("Can't create zookeeper client, please check the network."); } 
System.out.println("Session build."); try { createNode(); } catch (Exception e) { System.err.println("Create znode failed."); } System.out.println("znode created."); sleepForNetworkDisturbances(); try { String data = getData(); if (data != null) { // 在“休息”期間做了第2件事情,Sesion沒有過期,EPHEMERAL節點依然存在 System.out.println("data=" + data); } } catch (KeeperException e) { e.printStackTrace(); // 在“休息”期間做了第1件事情 if (e instanceof ConnectionLossException) { System.err .println("Oops, network is disconnected. Retry getData()."); // 如果session沒有失效,而僅僅是網絡異常,則可以重新嘗試獲取數據,可能在重試時網絡已經正常了 try { Thread.sleep(1000); String data = getData(); if (data != null) { System.out.println("data=" + data); } else { System.out.println("can't get data."); } } catch (Exception e1) { e1.printStackTrace(); } } // 在“休息”期間做了第3件事情,則session會過期 else if (e instanceof SessionExpiredException) { System.err .println("Session Expired, client will reconnect and create znode again."); // 當發再Session Expired時,必須重新建立連接,即new一個ZooKeeper try { connect(); createNode(); String data = getData(); if (data != null) { System.out.println("data=" + data); } else { System.out.println("can't get data."); } } catch (Exception e1) { e1.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } try { disconnect(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Client disconnected."); } }
Watcher
什么樣的操作會產生什么類型的事件:
| | event For “/path” | event For “/path/child” |
|---|---|---|
| create(“/path”) | EventType.NodeCreated | -- |
| delete(“/path”) | EventType.NodeDeleted | -- |
| setData(“/path”) | EventType.NodeDataChanged | -- |
| create(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeCreated |
| delete(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeDeleted |
| setData(“/path/child”) | -- | EventType.NodeDataChanged |
什么操作會觸發EventType.None?
事件類型與watcher的對應關系:
| event For “/path” | Default Watcher | exists | getData | getChildren |
|---|---|---|---|---|
| EventType.None | √ | √ | √ | √ |
| EventType.NodeCreated | | √ | √ | |
| EventType.NodeDeleted | | √ | √ | |
| EventType.NodeDataChanged | | √ | √ | |
| EventType.NodeChildrenChanged | | | | √ |
操作與watcher的對應關系:
| | "/path" exists | "/path" getData | "/path" getChildren | "/path/child" exists | "/path/child" getData | "/path/child" getChildren |
|---|---|---|---|---|---|---|
| create(“/path”) | √ | √ | | | | |
| delete(“/path”) | √ | √ | √ | | | |
| setData(“/path”) | √ | √ | | | | |
| create(“/path/child”) | | | √ | √ | √ | |
| delete(“/path/child”) | | | √ | √ | √ | √ |
| setData(“/path/child”) | | | | √ | √ | |
import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * * @description Zookeeper Watcher演示類 * @author zhangchaoyang * @date 2014-6-22 */ public class WatcherDemo { private static ZooKeeper zkp = null; private static final int TIMEOUT = 6000; private static Watcher getWatcher(final String msg) { return new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(msg + "上的監聽被觸發\t事件類型" + event.getType() + "\t發生變化的節點" + event.getPath()); } }; } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { System.out.println("--------------1----------------"); zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, getWatcher("CONNECT")); Thread.sleep(1000); System.out.println("--------------2----------------"); zkp.create("/znodename", "znodedata".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zkp.create("/znodename/childnode", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zkp.exists("/znodename", getWatcher("EXISTS")); zkp.getChildren("/", getWatcher("GETCHILDREN")); zkp.getData("/znodename", getWatcher("GETDATA"), stat); stat = zkp.exists("/znodename/childnode", getWatcher("EXISTS")); zkp.getChildren("/znodename", getWatcher("GETCHILDREN")); zkp.getData("/znodename/childnode", getWatcher("GETDATA"), stat); // zkp.close(); zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, getWatcher("CONNECT")); Thread.sleep(1000); System.out.println("--------------3----------------"); zkp.delete("/znodename/childnode", -1); zkp.delete("/znodename", -1); zkp.close(); } }
import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * * @description 自定義持久性的zookeeper watcher * @author zhangchaoyang * @date 2014-6-22 */ public class PersistWatcher { private static final int TIMEOUT = 6000; private static final String znode = "/globalconfnode"; private static String globalConfData = ""; private static Watcher getConnectWatcher() { return new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType().equals(EventType.None)) { System.out.println("連接狀態發生變化。"); } } }; } private static Watcher getExistsWatcher(final ZooKeeper zkp) { return new Watcher() { @Override public void process(WatchedEvent event) { try { if (event.getType().equals(EventType.NodeDataChanged) || event.getType().equals(EventType.NodeCreated)) { // 節點被創建或修改時更新緩存中的值 Stat stat = zkp.exists(znode, this);// 再次注冊監聽 String data = new String( zkp.getData(znode, false, stat)); globalConfData = data; } else if (event.getType().equals(EventType.NodeDeleted)) { // 節點被刪除時報警 System.out .println("global configuration node have been deleted!"); try { // 再次注冊監聽 zkp.exists(znode, this); } catch (KeeperException e) { if (e instanceof ConnectionLossException) { System.out.println("連接已斷開。"); } } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }; } public static void main(String[] args) { try { ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, getConnectWatcher()); zkp.exists(znode, getExistsWatcher(zkp)); zkp.create(znode, 
"config_value".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(500);// 修改節點后必須sleep,等待watcher回調完成 System.out.println(globalConfData); for (int i = 0; i < 4; i++) { zkp.setData(znode, ("config_value" + i).getBytes(), -1); Thread.sleep(500);// 修改節點后必須sleep,等待watcher回調完成 System.out.println(globalConfData); } zkp.close();// EPHEMERAL節點會被刪除,但Session並不會馬上失效(只不過ConnectionLoss了),所以還是會觸發watcher try { // 此時Session已失效 zkp.exists(znode, false); } catch (KeeperException e) { if (e instanceof SessionExpiredException) System.out.println("Session已失效。"); } } catch (IOException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
ACL
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; /** * * @description Zookeeper ACL演示類 * @author zhangchaoyang * @date 2014-6-22 */ public class AclDemo { private static final int TIMEOUT = 6000; public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null); String schema = "digest";// schema類型有:world,auth,digest,ip,super String auth = "username:password"; zkp.addAuthInfo(schema, auth.getBytes()); List<ACL> acls = new ArrayList<ACL>(); for (ACL id : Ids.CREATOR_ALL_ACL) { acls.add(id); } zkp.create("/znodename", "znodedata".getBytes(), acls, CreateMode.PERSISTENT); ZooKeeper zoo = null; try { zoo = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null); System.out.println("采用不合法的認證方式:"); String badAuthentication = "username:wrongpass"; zoo.addAuthInfo(schema, badAuthentication.getBytes()); zoo.getData("/znodename", null, null); } catch (KeeperException e) { if (e instanceof NoAuthException) { System.out.println("認證失敗:" + e.getMessage()); } System.out.println("采用合法的認證方式:"); zoo.addAuthInfo(schema, auth.getBytes()); String data = new String(zoo.getData("/znodename", null, null)); if (data != null) { System.out.println("認證成功:data=" + data); } } finally { if (zoo != null && zoo.getState().isAlive()) { zoo.close(); } } zkp.delete("/znodename", -1); zkp.close(); } }
開源工具menagerie
import java.io.IOException; import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; /** * * @description 使用ZooKeeper實現分布式鎖 * @author zhangchaoyang * @date 2014-6-22 */ public class ZooKeeperLock { private static Logger logger = Logger.getLogger(ZooKeeperLock.class); private static ZooKeeper zk = null; private static final int TIMEOUT = 1000 * 60; private static String connStr = null; public static void setServerPath(String path) { connStr = path + "/app/bqas/lock"; logger.info("ZooKeeperLock zookeeper node:" + connStr); } public static boolean getLock(String lockname) throws KeeperException, InterruptedException, IOException { connect(connStr, TIMEOUT); if (lockname.contains("-")) { throw new RuntimeException("鎖名稱不能包含'-'"); } boolean lock = false; String path = zk.create("/" + lockname + "-", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); int selfIndex = getIndex(path); List<String> children = zk.getChildren("/", false); int min = getMinIndex(children); if (min == selfIndex) { lock = true; } return lock; } public static void releaseLock(String lockname) throws InterruptedException, KeeperException { disconnect(); } private static int getIndex(String str) { int index = -1; int pos = str.lastIndexOf("-"); if (pos >= 0) { try { index = Integer.parseInt(str.substring(pos + 1)); } catch (NumberFormatException e) { e.printStackTrace(); } } return index; } private static int getMinIndex(List<String> list) { int min = Integer.MAX_VALUE; for (String ele : list) { int index = getIndex(ele); if (index < 0) { throw new 
RuntimeException("SEQUENTIAL節點名中不包含數字:" + ele); } if (index < min) { min = index; } } return min; } private static void waitUntilConnected(CountDownLatch connectedLatch) { if (States.CONNECTING == zk.getState()) { try { connectedLatch.await(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } } public static boolean connect(String hostPath, int sessionTimeout) { if (zk == null || zk.getState() == States.CLOSED) { try { CountDownLatch connectedLatch = new CountDownLatch(1); Watcher watcher = new ConnectedWatcher(connectedLatch); zk = new ZooKeeper(hostPath, sessionTimeout, watcher); waitUntilConnected(connectedLatch); } catch (Exception e) { logger.error("Connect to Zookeeper failed:", e); return false; } } return true; } public static boolean disconnect() { if (zk != null) { if (States.CLOSED != zk.getState()) { try { zk.close(); } catch (InterruptedException e) { logger.error("Disconnect from Zookeeper failed:", e); return false; } } } return true; } static class ConnectedWatcher implements Watcher { private CountDownLatch connectedLatch; ConnectedWatcher(CountDownLatch connectedLatch) { this.connectedLatch = connectedLatch; } @Override public void process(WatchedEvent event) { // 事件狀態為SyncConnected時,說明與服務端的連接已建立好 if (event.getState() == KeeperState.SyncConnected) { connectedLatch.countDown(); } } } public static void main(String[] args) { String lockname = "writeHitCount2DBlock"; System.out.println("begin to run."); ZooKeeperLock.setServerPath("192.168.119.96:2181"); try { boolean havelock = ZooKeeperLock.getLock(lockname); if (havelock) { Date date = new Date(); System.out .println("I got the lock,and I will write DB!" + date); Thread.sleep(1000);// 休息一段時間之后再釋放鎖 } System.out.println("Job done, I will release the lock."); ZooKeeperLock.releaseLock(lockname); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }