java 操作zookeeper(一)
首先要使用java操作zookeeper,zookeeper的javaclient 使我們更輕松的去對zookeeper進行各種操作,我們引入zookeeper-3.4.5.jar 和 zkclient-0.1.jar即可。
zookeeper-3.3.4.jar 為官方提供的javaApi,zkClient-0.1jar為在源生api基礎上進行擴展的開源JAVA客戶端
創建會話方法:客戶端可以通過創建一個zookeeper實例來鏈接zookeeper服務器。
Zookeeper(Arguments)方法(一共4個構造方法,根據參數不同)
參數說明如下:
connectString:連接服務器列表,已“,”分割。
sessionTimeOut:心跳檢測時間周期(毫秒)
watcher:事件處理通知器。
canBereadOnly:標識當前會話是否支持只讀。
sessionId和sessionPasswd:提供連接zookeeper的sessionId和密碼,通過這兩個確定唯一一台客戶端,目的是可以提供重復會話
注意:zookeeper客戶端和服務器端會話的建立是一個異步的過程,也就是說在程序中,我們程序方法在處理完客戶端初始化后,立即返回(程序往下執行代碼,這樣,大多數情況下我們並沒有真正構建好一個可用會話,在會話的聲明周期處於"CONNECTING"時才算真正建立完畢,所以我們需要使用多線程中的一個工具類)
java 操作zookeeper(二)
創建節點(znode)方法,create
提供了兩套創建節點的方法,同步和異步創建節點方式。
同步方式:
參數1,節點路徑(名稱):/nodeName (不允許遞歸創建節點,也就是說在父節點不存在的情況下,不允許創建子節點)
參數2,節點內容:要求類型是字節數組 (不支持序列化方式,如果需要實現序列化,可使用java相關的序列化框架,如Hession,Kryo框架)
參數3,節點權限:使用Ids.OPEN_ACL_UNSAFE開發權限即可(一般在權限沒有太高要求的場景下,沒必要關注)
參數4,節點類型 創建節點的類型:CreateMode.* 提供四種節點類型
PERSISTENT (持久節點)
PERSISTENT_SEQUENTIAL(持久順序節點)
EPHEMRAL(臨時節點)
EPHEMRAL_SEQUENTIAL(臨時順序節點)
java 操作zookeeper(三)
異步方式:(在同步基礎上增加兩個參數)
參數5:注冊一個異步回調函數,要實現AsynCallBack.StringCallBack接口。重寫processResult(int rc,String path,Object ctx,String name) 方法,當節點創建完畢后,執行此方法。
1.rc:為服務端響應碼, 0表示調用成功,-4表示端口連接,-110表示指定節點存在,-112表示會話已經過期
2.path:接口調用時傳入API的數據節點的路徑參數
3.ctx:為調用接口傳入API的ctx的值
4.name:實際在服務器端創建節點的名稱
參數6傳遞給回調函數的參數,一般為上下文(Context)信息
java 操作zookeeper(四)
刪除節點:delete方法(api提供了兩個接口,同步刪除和異步刪除方式)
同步方式:
參數1,節點名稱 /deletePath
參數2,版本號,即表示本次刪除操作是針對該數據的某個版本進行操作。
異步方式:
參數3:一個異步回調函數
參數4:用於傳遞上下文信息的對象。
注意:在zookeeper中,只允許刪除葉子節點信息,也就是說如果當前節點不是葉子節點則無法刪除,或必須先刪除其下所有的子節點,
java 操作zookeeper(五)
getChildren讀取數據方法:包括子節點列表獲取和子節點數據的獲取。
參數1 path:獲取指定節點下的數據(獲取子節點列表)
參數2 watcher: 注冊的watcher ,一旦在本次子節點獲取后,子節點列表發生變更的話,那么就會向客戶端發送通知,該參數允許為null
參數3 wath:表明是否需要注冊一個watcher:如果為true,則會使用到zookeeper客戶端上下文中提到的那個默認watcher,如果false,則表明不需要注冊Watcher,
參數4 cb: 回調函數
參數5 ctx:上下文信息
參數6 stat :指定數據節點的節點狀態信息
注意:當我們獲取指定節點的子節點列表后,還需要訂閱這個子節點列表的變化通知,這時候就可以通過注冊一個watcher來實現,當子節點被添加或刪除時,服務器端就會觸發一個“NodeChildrenChanged”類型的時間通知,需要注意的是服務端發送給客戶端的事件通知中,是不包含最新的節點列表的,客戶端必須主動重新進行獲取,通常在客戶端收到這個事件通知后,就可以再次主動獲取最新的子節點列表,也就是說,zookeeper服務端在想客戶端發送watcher“NodeChildrenChanged”事件通知的時候,僅僅只發了一個通知,不會把節點的變化情況發給客戶端,需要客戶端自己重新獲取,另外Watcher通知是一次性的,即觸發后失效,因此客戶端需要反復注冊Watcher才行。
java 操作zookeeper(六)
exists方法:檢測節點是都存在
參數1 path:路徑
參數2 watcher :注冊的watcher對象。一旦之后節點內容有變更,則會向客戶端發送通知,該參數允許為null (用於三類事件監聽:節點的創建,刪除,更新)
參數3 watch :是否使用watcher,如果true 則使用默認上文中的watcher,false則不使用watcher
參數4 cb:回調函數
參數5 ctx:用於傳遞的下文信息對象
注意:exists 方法意義在與無論節點是都存在,都可以進行注冊watcher,能夠對節點的創建,刪除和修改進行監聽,但是其子節點發生各種變化,都不會通知客戶端。
1 package bjsxt.zookeeper.base; 2 3 import java.util.concurrent.CountDownLatch; 4 5 import org.apache.zookeeper.WatchedEvent; 6 import org.apache.zookeeper.Watcher; 7 import org.apache.zookeeper.Watcher.Event.EventType; 8 import org.apache.zookeeper.Watcher.Event.KeeperState; 9 import org.apache.zookeeper.ZooKeeper; 10 11 /** 12 * Zookeeper base學習筆記 13 */ 14 public class ZookeeperBase { 15 16 /** zookeeper地址 */ 17 static final String CONNECT_ADDR = "192.168.2.2:2181"; 18 /** session超時時間 */ 19 static final int SESSION_OUTTIME = 2000;// ms 20 /** 信號量,阻塞程序執行,用於等待zookeeper連接成功,發送成功信號 */ 21 static final CountDownLatch connectedSemaphore = new CountDownLatch(1); 22 23 public static void main(String[] args) throws Exception { 24 25 ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, 26 new Watcher() { 27 @Override 28 public void process(WatchedEvent event) { 29 // 獲取事件的狀態 30 KeeperState keeperState = event.getState(); 31 EventType eventType = event.getType(); 32 // 如果是建立連接 33 if (KeeperState.SyncConnected == keeperState) { 34 if (EventType.None == eventType) { 35 // 如果建立連接成功,則發送信號量,讓后續阻塞程序向下執行 36 System.out.println("zk 建立連接"); 37 connectedSemaphore.countDown(); 38 } 39 } 40 } 41 }); 42 43 // 進行阻塞 44 connectedSemaphore.await(); 45 46 System.out.println(".."); 47 // 創建父節點 48 // zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, 49 // CreateMode.PERSISTENT); 50 51 // 創建子節點 52 // zk.create("/testRoot/children", "children data".getBytes(), 53 // Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 54 55 // 獲取節點洗信息 56 // byte[] data = zk.getData("/testRoot", false, null); 57 // System.out.println(new String(data)); 58 // System.out.println(zk.getChildren("/testRoot", false)); 59 60 // 修改節點的值 61 // zk.setData("/testRoot", "modify data root".getBytes(), -1); 62 // byte[] data = zk.getData("/testRoot", false, null); 63 // System.out.println(new String(data)); 64 65 // 判斷節點是否存在 66 // System.out.println(zk.exists("/testRoot/children", false)); 67 // 刪除節點 68 // zk.delete("/testRoot/children", -1); 69 // System.out.println(zk.exists("/testRoot/children", false)); 70 71 zk.close(); 72 73 } 74 75 }