java 操作zookeeper


java 操作zookeeper(一)

首先要使用java操作zookeeper,zookeeper的javaclient 使我們更輕松的去對zookeeper進行各種操作,我們引入zookeeper-3.4.5.jar 和 zkclient-0.1.jar即可。

zookeeper-3.3.4.jar 為官方提供的javaApi,zkClient-0.1jar為在源生api基礎上進行擴展的開源JAVA客戶端

創建會話方法:客戶端可以通過創建一個zookeeper實例來鏈接zookeeper服務器。

Zookeeper(Arguments)方法(一共4個構造方法,根據參數不同)

參數說明如下:

connectString:連接服務器列表,已“,”分割。

sessionTimeOut:心跳檢測時間周期(毫秒)

watcher:事件處理通知器。

canBereadOnly:標識當前會話是否支持只讀。

sessionId和sessionPasswd:提供連接zookeeper的sessionId和密碼,通過這兩個確定唯一一台客戶端,目的是可以提供重復會話

注意:zookeeper客戶端和服務器端會話的建立是一個異步的過程,也就是說在程序中,我們程序方法在處理完客戶端初始化后,立即返回(程序往下執行代碼,這樣,大多數情況下我們並沒有真正構建好一個可用會話,在會話的聲明周期處於"CONNECTING"時才算真正建立完畢,所以我們需要使用多線程中的一個工具類) 

java 操作zookeeper(二)

創建節點(znode)方法,create

提供了兩套創建節點的方法,同步和異步創建節點方式。

同步方式:

  參數1,節點路徑(名稱):/nodeName (不允許遞歸創建節點,也就是說在父節點不存在的情況下,不允許創建子節點)

  參數2,節點內容:要求類型是字節數組 (不支持序列化方式,如果需要實現序列化,可使用java相關的序列化框架,如Hession,Kryo框架)

  參數3,節點權限:使用Ids.OPEN_ACL_UNSAFE開發權限即可(一般在權限沒有太高要求的場景下,沒必要關注)

  參數4,節點類型 創建節點的類型:CreateMode.* 提供四種節點類型

    PERSISTENT (持久節點)

    PERSISTENT_SEQUENTIAL(持久順序節點)

    EPHEMRAL(臨時節點)

    EPHEMRAL_SEQUENTIAL(臨時順序節點)

java 操作zookeeper(三)  

    異步方式:(在同步基礎上增加兩個參數)

    參數5:注冊一個異步回調函數,要實現AsynCallBack.StringCallBack接口。重寫processResult(int rc,String path,Object ctx,String name) 方法,當節點創建完畢后,執行此方法。

      1.rc:為服務端響應碼, 0表示調用成功,-4表示端口連接,-110表示指定節點存在,-112表示會話已經過期

      2.path:接口調用時傳入API的數據節點的路徑參數

      3.ctx:為調用接口傳入API的ctx的值

      4.name:實際在服務器端創建節點的名稱

    參數6傳遞給回調函數的參數,一般為上下文(Context)信息

    

java 操作zookeeper(四)

    刪除節點:delete方法(api提供了兩個接口,同步刪除和異步刪除方式)

    同步方式:

      參數1,節點名稱 /deletePath

      參數2,版本號,即表示本次刪除操作是針對該數據的某個版本進行操作。

    異步方式:

      參數3:一個異步回調函數

      參數4:用於傳遞上下文信息的對象。

    注意:在zookeeper中,只允許刪除葉子節點信息,也就是說如果當前節點不是葉子節點則無法刪除,或必須先刪除其下所有的子節點,

 

java 操作zookeeper(五)

    getChildren讀取數據方法:包括子節點列表獲取和子節點數據的獲取。

    參數1 path:獲取指定節點下的數據(獲取子節點列表)

    參數2 watcher: 注冊的watcher ,一旦在本次子節點獲取后,子節點列表發生變更的話,那么就會向客戶端發送通知,該參數允許為null

    參數3 wath:表明是否需要注冊一個watcher:如果為true,則會使用到zookeeper客戶端上下文中提到的那個默認watcher,如果false,則表明不需要注冊Watcher,

    參數4 cb: 回調函數

    參數5 ctx:上下文信息

    參數6 stat :指定數據節點的節點狀態信息

  注意:當我們獲取指定節點的子節點列表后,還需要訂閱這個子節點列表的變化通知,這時候就可以通過注冊一個watcher來實現,當子節點被添加或刪除時,服務器端就會觸發一個“NodeChildrenChanged”類型的時間通知,需要注意的是服務端發送給客戶端的事件通知中,是不包含最新的節點列表的,客戶端必須主動重新進行獲取,通常在客戶端收到這個事件通知后,就可以再次主動獲取最新的子節點列表,也就是說,zookeeper服務端在想客戶端發送watcher“NodeChildrenChanged”事件通知的時候,僅僅只發了一個通知,不會把節點的變化情況發給客戶端,需要客戶端自己重新獲取,另外Watcher通知是一次性的,即觸發后失效,因此客戶端需要反復注冊Watcher才行。

java 操作zookeeper(六)

    exists方法:檢測節點是都存在

    參數1 path:路徑

    參數2 watcher :注冊的watcher對象。一旦之后節點內容有變更,則會向客戶端發送通知,該參數允許為null (用於三類事件監聽:節點的創建,刪除,更新)

    參數3 watch :是否使用watcher,如果true 則使用默認上文中的watcher,false則不使用watcher

    參數4 cb:回調函數

    參數5 ctx:用於傳遞的下文信息對象

  注意:exists 方法意義在與無論節點是都存在,都可以進行注冊watcher,能夠對節點的創建,刪除和修改進行監聽,但是其子節點發生各種變化,都不會通知客戶端。

 

 

 

 1 package bjsxt.zookeeper.base;
 2 
 3 import java.util.concurrent.CountDownLatch;
 4 
 5 import org.apache.zookeeper.WatchedEvent;
 6 import org.apache.zookeeper.Watcher;
 7 import org.apache.zookeeper.Watcher.Event.EventType;
 8 import org.apache.zookeeper.Watcher.Event.KeeperState;
 9 import org.apache.zookeeper.ZooKeeper;
10 
11 /**
12  * Zookeeper base學習筆記
13  */
14 public class ZookeeperBase {
15 
16     /** zookeeper地址 */
17     static final String CONNECT_ADDR = "192.168.2.2:2181";
18     /** session超時時間 */
19     static final int SESSION_OUTTIME = 2000;// ms
20     /** 信號量,阻塞程序執行,用於等待zookeeper連接成功,發送成功信號 */
21     static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
22 
23     public static void main(String[] args) throws Exception {
24 
25         ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME,
26                 new Watcher() {
27                     @Override
28                     public void process(WatchedEvent event) {
29                         // 獲取事件的狀態
30                         KeeperState keeperState = event.getState();
31                         EventType eventType = event.getType();
32                         // 如果是建立連接
33                         if (KeeperState.SyncConnected == keeperState) {
34                             if (EventType.None == eventType) {
35                                 // 如果建立連接成功,則發送信號量,讓后續阻塞程序向下執行
36                                 System.out.println("zk 建立連接");
37                                 connectedSemaphore.countDown();
38                             }
39                         }
40                     }
41                 });
42 
43         // 進行阻塞
44         connectedSemaphore.await();
45 
46         System.out.println("..");
47         // 創建父節點
48         // zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE,
49         // CreateMode.PERSISTENT);
50 
51         // 創建子節點
52         // zk.create("/testRoot/children", "children data".getBytes(),
53         // Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
54 
55         // 獲取節點洗信息
56         // byte[] data = zk.getData("/testRoot", false, null);
57         // System.out.println(new String(data));
58         // System.out.println(zk.getChildren("/testRoot", false));
59 
60         // 修改節點的值
61         // zk.setData("/testRoot", "modify data root".getBytes(), -1);
62         // byte[] data = zk.getData("/testRoot", false, null);
63         // System.out.println(new String(data));
64 
65         // 判斷節點是否存在
66         // System.out.println(zk.exists("/testRoot/children", false));
67         // 刪除節點
68         // zk.delete("/testRoot/children", -1);
69         // System.out.println(zk.exists("/testRoot/children", false));
70 
71         zk.close();
72 
73     }
74 
75 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM