ZooKeeper framework: Netflix Curator (with full support for persistent watches)
Introduction
Curator is Netflix's open-source ZooKeeper client library.
Official site: https://github.com/Netflix/curator/wiki/Recipes
Detailed introduction 1: http://macrochen.iteye.com/blog/1366136/
Detailed introduction 2: http://blog.csdn.net/alivetime/article/details/7101014
Curator solves three main classes of problems:
- It encapsulates connection handling between the ZooKeeper client and the ZooKeeper server;
- It provides a fluent-style API;
- It wraps common ZooKeeper usage scenarios (recipes, e.g. shared locks and leader election) in ready-made abstractions.
Curator's components
- Client: a replacement for the raw ZooKeeper client, providing low-level handling and related utility methods.
- Framework: simplifies use of ZooKeeper's higher-level features and adds new ones, such as managing connections to the ZooKeeper ensemble and retry handling.
- Recipes: implementations of common ZooKeeper recipes, built on top of the Framework.
- Utilities: assorted ZooKeeper utility classes.
- Errors: exception handling, connection problems, recovery, etc.
- Extensions: recipe extensions.
Maven dependency
<dependency>
    <groupId>com.netflix.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>1.3.3</version>
</dependency>
Using the Curator framework
String path = "/test_path";
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("test:2181").namespace("/test1")
        .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
        .connectionTimeoutMs(5000).build();
// start the client
client.start();
// create a node
client.create().forPath("/head", new byte[0]);
// delete a node in the background
client.delete().inBackground().forPath("/head");
// create an EPHEMERAL_SEQUENTIAL node
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
// get the data and set a watch, in the background
client.getData().watched().inBackground().forPath("/test");
// check whether the path exists
client.checkExists().forPath(path);
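The inBackground() calls above return immediately and discard the server's reply. To inspect the result, a BackgroundCallback can be passed instead; a minimal sketch, reusing the client and /head path from above (imports: com.netflix.curator.framework.api.BackgroundCallback and CuratorEvent):

// delete in the background and examine the reply when it arrives
client.delete().inBackground(new BackgroundCallback() {
    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
        // getResultCode() is the raw ZooKeeper result code; 0 means OK
        System.out.println("op=" + event.getType()
                + " path=" + event.getPath()
                + " rc=" + event.getResultCode());
    }
}).forPath("/head");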
InterProcessMutex (inter-process mutual exclusion lock)
String lockName = "/lock1";
InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);
InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);
lock1.acquire();
boolean result = lock2.acquire(1, TimeUnit.SECONDS);
assertFalse(result);
lock1.release();
result = lock2.acquire(1, TimeUnit.SECONDS);
assertTrue(result);
How it works: each acquire call creates an ephemeral node under the lock path with CreateMode.EPHEMERAL_SEQUENTIAL, then calls getChildren to list all children and checks whether the node it just created sorts first. If it does, the lock has been acquired; if not, the just-created ephemeral node is deleted.
Note: each acquire that succeeds sends 2 requests to the ZooKeeper server (one write plus one getChildren); one that fails sends 3 (one write, one getChildren, and one delete).
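In practice the acquire/release pair belongs in try/finally so the lock is always released even if the critical section throws; a minimal usage sketch, assuming a started CuratorFramework named curator:

InterProcessMutex lock = new InterProcessMutex(curator, "/lock1");
lock.acquire();          // blocks until the lock is held
try {
    // critical section: only one process in the cluster runs this at a time
} finally {
    lock.release();      // removes our ephemeral node so the next waiter can proceed
}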
InterProcessReadWriteLock (distributed read-write lock)
Example
@Test
public void testReadWriteLock() throws Exception {
    String readWriteLockPath = "/RWLock";
    InterProcessReadWriteLock readWriteLock1 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
    InterProcessMutex writeLock1 = readWriteLock1.writeLock();
    InterProcessMutex readLock1 = readWriteLock1.readLock();
    InterProcessReadWriteLock readWriteLock2 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
    InterProcessMutex writeLock2 = readWriteLock2.writeLock();
    InterProcessMutex readLock2 = readWriteLock2.readLock();
    writeLock1.acquire();
    // same instance as the write lock, so reading is allowed
    assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
    // a different lock instance can't read while another is writing
    assertFalse(readLock2.acquire(1, TimeUnit.SECONDS));
    // a different write lock can't write either
    assertFalse(writeLock2.acquire(1, TimeUnit.SECONDS));
    // release the write lock
    writeLock1.release();
    // now both read locks can be acquired
    assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
    assertTrue(readLock2.acquire(1, TimeUnit.SECONDS));
}
How it works: the same as InterProcessMutex, with a trick in how the ephemeral sequential nodes are sorted: write-lock nodes sort ahead of read-lock nodes.
Note: if an InterProcessReadWriteLock instance already holds the write lock, acquiring its read lock also succeeds.
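This makes the classic write-to-read downgrade pattern possible; a minimal sketch, assuming a started CuratorFramework named curator:

InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(curator, "/RWLock");
rwLock.writeLock().acquire();      // exclusive: blocks all other readers and writers
try {
    // mutate the shared state
    rwLock.readLock().acquire();   // succeeds because this instance holds the write lock
} finally {
    rwLock.writeLock().release();  // downgrade: the read lock is still held
}
// ... read the shared state under the read lock ...
rwLock.readLock().release();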
LeaderSelector (leader election)
@Test
public void testLeader() throws Exception {
    LeaderSelectorListener listener = new LeaderSelectorListener() {
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            System.out.println("i'm leader");
        }

        @Override
        public void handleException(CuratorFramework client, Exception exception) {
        }

        @Override
        public void notifyClientClosing(CuratorFramework client) {
        }
    };
    String leaderPath = "/leader";
    LeaderSelector selector1 = new LeaderSelector(this.curator, leaderPath, listener);
    selector1.start();
    LeaderSelector selector2 = new LeaderSelector(this.curator, leaderPath, listener);
    selector2.start();
    assertFalse(selector2.hasLeadership());
}
How it works: implemented internally on top of InterProcessMutex.
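One caveat the test above glosses over: leadership lasts only as long as takeLeadership() blocks; once it returns, the underlying mutex is released and another candidate can become leader. A sketch of a listener that holds leadership until interrupted (the structure is illustrative, not the only way to write it):

LeaderSelectorListener holdUntilInterrupted = new LeaderSelectorListener() {
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // we are the leader for as long as this method blocks
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // ... leader-only work ...
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag, fall through, and give up leadership
            Thread.currentThread().interrupt();
        }
        // returning relinquishes leadership
    }

    @Override
    public void handleException(CuratorFramework client, Exception exception) {
    }

    @Override
    public void notifyClientClosing(CuratorFramework client) {
    }
};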
NodeCache (watching a node's data)
/**
 * If this executor is passed when the listener is registered,
 * the callback logic runs on the thread pool when an event fires.
 */
ExecutorService pool = Executors.newFixedThreadPool(2);
/**
 * Watch the node's data for changes.
 */
final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false);
nodeCache.start(true);
nodeCache.getListenable().addListener(
    new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("Node data changed, new data: " +
                    new String(nodeCache.getCurrentData().getData()));
        }
    },
    pool
);
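Writing to the watched node exercises the listener. Unlike a raw ZooKeeper watch, which fires once and must be re-registered, the cache re-arms itself, so every change is reported (the "persistent watches" from the title). A short sketch, assuming /zk-huey/cnode already exists:

// each setData fires nodeChanged(), not just the first one
client.setData().forPath("/zk-huey/cnode", "v1".getBytes());
client.setData().forPath("/zk-huey/cnode", "v2".getBytes());
// NodeCache implements Closeable; shut it down when no longer needed
nodeCache.close();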
PathChildrenCache (watching changes to a node's children)
/**
 * Watch for changes among the child nodes.
 */
final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true);
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(
    new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                throws Exception {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED: " + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                    break;
                default:
                    break;
            }
        }
    },
    pool
);
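Creating, updating, and deleting children under /zk-huey then drives the listener; a short sketch, assuming the /zk-huey parent already exists:

// fires CHILD_ADDED, CHILD_UPDATED, and CHILD_REMOVED in turn
client.create().forPath("/zk-huey/child1", "hello".getBytes());
client.setData().forPath("/zk-huey/child1", "world".getBytes());
client.delete().forPath("/zk-huey/child1");
// PathChildrenCache implements Closeable; release it and the executor when done
childrenCache.close();
pool.shutdown();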