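The previous chapter covered what ZooKeeper is, how to start it, how to set nodes, and its structure and performance. In this section we play with the API and fetch some data.
PHP version: http://anykoro.sinaapp.com/2013/04/05/%E4%BD%BF%E7%94%A8apache-zookeeper%E5%88%86%E5%B8%83%E5%BC%8F%E9%83%A8%E7%BD%B2php%E5%BA%94%E7%94%A8%E7%A8%8B%E5%BA%8F/
Go version: http://mmcgrana.github.io/2014/05/getting-started-with-zookeeper-and-go.html
Then read the Java example: http://zookeeper.apache.org/doc/trunk/javaExample.html
My reaction: what the fuck is this?
I just want to read one piece of data. Does it really need to be this complicated?
So let's rewrite it.
Version 1: just fetch the data and ignore everything else: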
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

public class ZkReader {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hostPort = "192.168.1.2,192.168.1.3,192.168.1.4";
        String znode = "/test";
        ZooKeeper zk = new ZooKeeper(hostPort, 3000, null);
        System.out.println(new String(zk.getData(znode, false, null)));
    }
}
Create /test in zkCli and set its value to 123, then run the program. Output:
123
The result comes back, but an error is logged as well:
14/10/17 11:51:58 ERROR zookeeper.ClientCnxn: Error while calling watcher
java.lang.NullPointerException
    at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:521)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:497)
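A look at the source shows that the client expects a watcher to be registered. Does that mean that without one ZooKeeper is nothing more than a plain config store? OK.
Version 2: zk get data + watcher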
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

public class ZkReader {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        ZooKeeper zk = new ZooKeeper(hostPort, 3000, new MyWatcher());
        System.out.println(new String(zk.getData(znode, false, null)));
    }
}

class MyWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        System.out.println("hello zookeeper");
        System.out.println(String.format("hello event! type=%s, stat=%s, path=%s",
                event.getType(), event.getState(), event.getPath()));
    }
}
But the output is:
hello zookeeper
123
hello event! type=None, stat=SyncConnected, path=null
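The data always lands in the middle? I could not figure it out, so I asked on the mailing list and got a reply a few days later (it is not a very active list):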
“
Zookeeper works asynchronously in several threads. Therefore the sequence of execution in different threads is not generally predictable. It could therefore happen that when the connection status change is detected, the Watcher is executed, but only the first "hello zookeeper" gets echoed, then the main thread gets some cycles again and prints "123", after which the second print statement "hello event!..." is executed. If you don't want this to happen, use a CountDownLatch to make the main thread wait until the Zookeeper connection is established and properly recognized in your program. The main thread creates the CountDownLatch(1), opens the Zk connection and waits latch.await(). The Watcher does its job and then counts the latch down by one, causing the main thread to leave the await and continue doing its job.
”
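So it is considered a multithreading issue: establishing a zk connection starts several threads, including sendThread and eventThread.
The eventThread is halfway through the watcher when the main thread gets the CPU and prints the result; then the eventThread carries on with the rest of watcher.process.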
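The CountDownLatch idea from the reply can be tried without a ZooKeeper server. The following is only a sketch under that assumption: a plain Java thread stands in for the eventThread, the string "123" stands in for the getData result, and the names LatchOrdering and run are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LatchOrdering {

    // Runs the scenario once and returns the lines in the order they were produced.
    public static List<String> run() {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch connected = new CountDownLatch(1);

        // Stand-in for ZooKeeper's event thread delivering the connection event.
        Thread eventThread = new Thread(() -> {
            out.add("hello zookeeper");
            out.add("hello event!");
            connected.countDown(); // signal only after the watcher body has finished
        });
        eventThread.start();

        try {
            connected.await();     // main thread blocks until the "watcher" is done
            out.add("123");        // stand-in for printing the getData result
            eventThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the main thread only continues after countDown(), the data line can no longer slip in between the two watcher lines. In the real program the latch would be counted down at the end of MyWatcher.process and awaited right after new ZooKeeper(...).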
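These two versions only fetch the data once. What if the data changes and we want to pick up the update automatically? OK, following the example from the zk docs, here is a simplified third version:
DataMonitor.java: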
/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Watcher, StatCallback {

    ZooKeeper zk;
    String znode;
    boolean dead;
    DataMonitorListener listener;
    byte prevData[];

    public DataMonitor(ZooKeeper zk, String znode, DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.listener = listener;
        // Get things started by checking if the node exists. We are going
        // to be completely event driven
        zk.exists(znode, true, this, null);
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * The existence status of the node has changed.
         */
        void showData(byte data[]);
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: " + event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc : " + rc);
        byte b[] = null;
        try {
            b = zk.getData(znode, false, null);
        } catch (KeeperException e) {
            // We don't need to worry about recovering now. The watch
            // callbacks will kick off any exception handling
            e.printStackTrace();
        } catch (InterruptedException e) {
            return;
        }
        if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) {
            listener.showData(b);
            prevData = b;
        }
    }
}
Executor.java:
import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {

    DataMonitor dm;
    ZooKeeper zk;

    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        dm = new DataMonitor(zk, znode, this);
    }

    public static void main(String[] args) {
        String hostPort = "192.168.1.22,192.168.1.12,192.168.1.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * We do not process any events ourselves, we just need to forward them on.
     *
     * @see org.apache.zookeeper.Watcher#
     */
    public void process(WatchedEvent event) {
        System.out.println("Executor process event: " + event.getType());
        dm.process(event);
    }

    // Keep the main thread alive; all the work happens in the callbacks.
    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void showData(byte[] data) {
        // data is null when the read failed, e.g. because the node was deleted
        System.out.println("data changes: " + (data == null ? "<no data>" : new String(data)));
    }
}
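One executor and one monitor: the Executor registers itself with zk as the watcher and forwards every event to the DataMonitor. When an event fires for the node, the DataMonitor hands itself to ZooKeeper as the StatCallback, and processResult is then called to read and show the new data. After the value of /test is changed to abcd, the output is: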
Executor process event: NodeDataChanged
watch event type: NodeDataChanged
rc : 0
data changes: abcd
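That is still a bit complicated. On closer inspection DataMonitor does not really need to exist: all I need is one class that starts the zk client and listens for data changes. Hence the fourth, single-object version: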
Executor.java
import java.io.IOException;
import java.util.Arrays;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class Executor implements Watcher, Runnable, AsyncCallback.StatCallback {

    ZooKeeper zk;
    String znode;
    byte prevData[];

    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        this.znode = znode;
        zk.exists(znode, true, this, null);
    }

    public static void main(String[] args) {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles watch events: on any change to the watched node, call exists
     * again, which re-arms the watch and triggers processResult.
     */
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: " + event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
    }

    // Keep the main thread alive; all the work happens in the callbacks.
    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc : " + rc);
        byte b[] = null;
        try {
            b = zk.getData(znode, false, null);
        } catch (KeeperException e) {
            // We don't need to worry about recovering now. The watch
            // callbacks will kick off any exception handling
            e.printStackTrace();
        } catch (InterruptedException e) {
            return;
        }
        if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) {
            // b is null when the read failed, e.g. because the node was deleted
            System.out.println("data changes: " + (b == null ? "<no data>" : new String(b)));
            prevData = b;
        }
    }
}
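The class acts as its own watcher and registers itself as the callback with zk, which is more concise.
In testing: with one of the three zk servers stopped, leaving one leader and one follower, the service kept working normally. With only one server left, the client reported errors and could not connect. After zk was brought back up to two servers the client still could not recover; it only came back after being restarted.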
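The first half of that test result matches ZooKeeper's majority rule: the ensemble only serves requests while more than half of its servers are up. Here is a small sketch of the arithmetic; the class and method names are made up for illustration, and it says nothing about why the client failed to reconnect afterwards.

```java
public class QuorumCheck {

    // Smallest number of servers that forms a strict majority of the ensemble.
    public static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // True when enough servers are alive for the ensemble to keep serving.
    public static boolean hasQuorum(int ensembleSize, int aliveServers) {
        return aliveServers >= quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        int ensemble = 3;
        for (int alive = ensemble; alive >= 1; alive--) {
            System.out.println(alive + " of " + ensemble + " alive, quorum: "
                    + hasQuorum(ensemble, alive));
        }
    }
}
```

For a three-server ensemble the quorum is two, so losing one server is tolerated and losing two is not.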
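After reading the PHP API I understood better what zk.exists is doing. Both exists and getData can leave a watch on the node; in the code above the asynchronous exists also takes a StatCallback that receives the result, while the synchronous getData can be given a Watcher directly. When an event fires the watch is consumed, so it has to be set again inside the callback. Hence the fifth version: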
import java.io.IOException;

import org.apache.zookeeper.*;

public class Executor implements Watcher, Runnable {

    ZooKeeper zk;
    String znode;

    public Executor(String hostPort, String znode) throws KeeperException, IOException, InterruptedException {
        zk = new ZooKeeper(hostPort, 30000, this);
        this.znode = znode;
        // Read once and leave a watch on the node
        zk.getData(znode, this, null);
    }

    public static void main(String[] args) {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: " + event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out.
                // Passing this again re-arms the watch for the next change.
                try {
                    System.out.println(new String(zk.getData(znode, this, null)));
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Keep the main thread alive; all the work happens in process().
    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }
}
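The "a watch is consumed once it fires" behaviour that motivated this version can be modelled without a server. The following is a rough sketch and not ZooKeeper code: FakeNode, observe and the reRegister flag are all hypothetical names, and the in-memory node only imitates the one-shot behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class OneShotWatchDemo {

    // Hypothetical in-memory stand-in for a single znode; not part of ZooKeeper.
    static class FakeNode {
        private String data = "";
        private final List<Consumer<FakeNode>> watches = new ArrayList<>();

        // Read the data and optionally leave a one-shot watch behind.
        String getData(Consumer<FakeNode> watch) {
            if (watch != null) {
                watches.add(watch);
            }
            return data;
        }

        void setData(String newData) {
            data = newData;
            List<Consumer<FakeNode>> fired = new ArrayList<>(watches);
            watches.clear(); // every pending watch is consumed by this change
            for (Consumer<FakeNode> w : fired) {
                w.accept(this);
            }
        }
    }

    // Applies three updates and returns the values the watcher actually saw.
    public static List<String> observe(boolean reRegister) {
        FakeNode node = new FakeNode();
        List<String> seen = new ArrayList<>();
        Consumer<FakeNode> watcher = new Consumer<FakeNode>() {
            @Override
            public void accept(FakeNode n) {
                // Reading with `this` re-arms the watch; reading with null does not.
                seen.add(n.getData(reRegister ? this : null));
            }
        };
        node.getData(watcher); // the initial read registers the first watch
        node.setData("a");
        node.setData("b");
        node.setData("c");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("re-registering: " + observe(true));
        System.out.println("not re-registering: " + observe(false));
    }
}
```

With re-registration the watcher sees all three updates; without it only the first one arrives. That is why process() passes this to getData again on every event.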
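Versions 4 and 5 can already detect changes to a zk data node, but they handle neither error conditions nor close events. Feel free to take the hard-to-follow example at http://zookeeper.apache.org/doc/trunk/javaExample.html and rework it yourself.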
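As a starting point for that missing handling, here is one possible way to decide what to do per connection state. It is only a sketch: State is a local enum that mirrors three names from Watcher.Event.KeeperState so the snippet runs without the ZooKeeper jar, and Action and onState are hypothetical names.

```java
public class SessionStatePolicy {

    // Local stand-in mirroring three names from Watcher.Event.KeeperState.
    public enum State { SyncConnected, Disconnected, Expired }

    // Hypothetical reactions a client might choose from.
    public enum Action { PROCEED, WAIT_FOR_RECONNECT, RECREATE_CLIENT }

    public static Action onState(State state) {
        switch (state) {
            case SyncConnected:
                return Action.PROCEED;            // safe to read and set watches
            case Disconnected:
                return Action.WAIT_FOR_RECONNECT; // the client library retries on its own
            case Expired:
                return Action.RECREATE_CLIENT;    // the old handle can no longer be used
            default:
                throw new IllegalArgumentException("unhandled state: " + state);
        }
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + " -> " + onState(s));
        }
    }
}
```

In the Executor above, such a check would sit in the branch of process() where event.getType() is None, using event.getState(); on Expired the program would build a new ZooKeeper object and read the node again to restore the watch.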
More Java API operations (creating, deleting and modifying nodes, and so on): http://www.cnblogs.com/haippy/archive/2012/07/19/2600032.html