轉載:http://www.mamicode.com/info-detail-494364.html
標簽:
ZooKeeper原生的API支持通過註冊Watcher來進行事件監聽,但是Watcher通知是一次性的,因此開發過程中需要反覆註冊Watcher,比較繁瑣。Curator引入了Cache來監聽ZooKeeper服務端的事件。Cache對ZooKeeper事件監聽進行了封裝,能夠自動處理反覆註冊監聽,簡化了ZooKeeper原生API繁瑣的開發過程。
簡單的示例:
package com.huey.dream.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Demonstrates Curator event listening with the cache recipes.
 *
 * <p>The demo connects to ZooKeeper, makes sure the data node exists, attaches a
 * {@link NodeCache} listener to that node and a {@link PathChildrenCache}
 * listener to its parent, updates the node's data once, waits ten seconds so the
 * resulting events can be printed, and then releases every resource it opened.
 *
 * @author huey
 * @version 1.0
 * @created 2015-3-2
 */
public class CarutorDemo {

    /** Parent znode whose children are observed by the {@link PathChildrenCache}. */
    private static final String PARENT_PATH = "/zk-huey";

    /** Data znode observed by the {@link NodeCache}. */
    private static final String NODE_PATH = PARENT_PATH + "/cnode";

    /** Charset used for every String/byte[] conversion so results do not depend on the platform default. */
    private static final String CHARSET = "UTF-8";

    /**
     * Runs the demo against the ZooKeeper server at {@code 192.168.1.109:2181}.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper/Curator operation fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        final CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.1.109:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        // When this executor is passed while registering a listener, the listener
        // logic is run by the thread pool once an event fires.
        final ExecutorService pool = Executors.newFixedThreadPool(2);

        try {
            client.start();

            // Only create the node when it is missing, so the demo can be run
            // repeatedly without failing because the node already exists.
            if (client.checkExists().forPath(NODE_PATH) == null) {
                client.create()
                        .creatingParentsIfNeeded()
                        .forPath(NODE_PATH, "hello".getBytes(CHARSET));
            }

            final NodeCache nodeCache = new NodeCache(client, NODE_PATH, false);
            final PathChildrenCache childrenCache = new PathChildrenCache(client, PARENT_PATH, true);
            try {
                // Listen for changes to the data node.
                // The listener is registered BEFORE start() so no event can slip by unobserved.
                nodeCache.getListenable().addListener(
                        new NodeCacheListener() {
                            @Override
                            public void nodeChanged() throws Exception {
                                // Read the snapshot once: it is null when the node no longer exists.
                                ChildData current = nodeCache.getCurrentData();
                                if (current == null) {
                                    System.out.println("Node has been deleted");
                                    return;
                                }
                                System.out.println("Node data is changed, new data: "
                                        + new String(current.getData(), CHARSET));
                            }
                        },
                        pool
                );
                nodeCache.start(true);

                // Listen for changes to the child nodes.
                // Registered before start() for the same reason: with POST_INITIALIZED_EVENT
                // the initial events are delivered asynchronously and could otherwise be missed.
                childrenCache.getListenable().addListener(
                        new PathChildrenCacheListener() {
                            @Override
                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                                    throws Exception {
                                switch (event.getType()) {
                                    case CHILD_ADDED:
                                        System.out.println("CHILD_ADDED: " + event.getData().getPath());
                                        break;
                                    case CHILD_REMOVED:
                                        System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                                        break;
                                    case CHILD_UPDATED:
                                        System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                                        break;
                                    default:
                                        // Connection-state and initialization events are ignored here.
                                        break;
                                }
                            }
                        },
                        pool
                );
                childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

                // Trigger one data change so both listeners have something to report.
                client.setData().forPath(NODE_PATH, "world".getBytes(CHARSET));

                // Give the asynchronous listeners time to print before shutting down.
                Thread.sleep(10 * 1000);
            } finally {
                // Close the caches first so no further events are dispatched;
                // the nested finally guarantees the second close still runs.
                try {
                    childrenCache.close();
                } finally {
                    nodeCache.close();
                }
            }
        } finally {
            // Always release the executor and the client, even when something above failed.
            pool.shutdown();
            client.close();
        }
    }
}
