java中操作zookeeper,curator操作zookeeper


2.4 java中操作zookeeper

引入依賴

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.14</version>
    </dependency>
</

1

創建zookeeper會話

org.apache.zookeeper.ZooKeeper類的構造方法用於創建zookeeper客戶端與服務端之間的會話。該類提供了如下幾個構造方法:

 

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

構造方法參數說明:

 

connectString:指zk的服物器列表,以英文輸入法下逗號分割的host:port,比如?192.168.1.1:2181, 192.168.1.2:2181,也可以通過在后面跟着根目錄,表示此客戶端的操作都是在此根目錄下,比如:比如192.168.1.1:2181,192.168.1.2:2181/zk-book,表示此客戶端操作的節點都是在/zk-book根目錄下,比如創建/foo/bar,實際完整路徑為/zk-book/foo/bar;

sessionTimeout:會話超時時間,單位是毫秒,當在這個時間內沒有收到心跳檢測,會話就會失效;

watcher:注冊的watcher,null表示不設置;

canBeReadOnly:用於標識當前會話是否支持”read-only”模式? ”,“read-only”模式是指當zk集群中的某台機器與集群中過半以上的機器網絡端口不同,則此機器將不會接受客戶端的任何讀寫請求,但是,有時候,我們希望繼續提供讀請求,因此設置此參數為true, 即客戶端還以從與集群中半數以上節點網絡不通的機器節點中讀?數據;

sessionId和sessionPasswd:分別代表會話ID和會話密鑰,這兩個個參數一起可以唯一確定一個會話,客戶端通過這兩個參數可以實現客戶端會話復用;

創建zookeeper節點

org.apache.zookeeper.ZooKeeper類提供了如下創建zk節點的api:

 

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

1

2

第一個方法以同步的方式創建節點,第二個方法以異步的方式創建節點,需要注意不論同步或異步都不支持遞歸創建節點,當節點已經存在時,會拋出NodeExistsException異常。

 

create方法參數說明:

 

path:被創建的節點路徑,比如:/zk-book/foo;

data[]:節點中的數據,是一個字節數組;

acl:acl策略

createMode:節點類型,枚舉類型,有四種選擇:持久(PERSISTENT)、持久順序(PERSISTENT_SEQUENTIAL)、臨時(EPHEMERAL)、臨時順序(EPHEMERAL_SEQUENTIAL);

cb:異步回調函數,需要實現接StringCallback接口,當服物器端創建完成后,客戶端會自動調用 這個對象的方法processResult;

ctx:用於傳遞一個對象,可以在回調方法執行的時候使用,通常用於傳遞業務的上下文信息;

其中org.apache.zookeeper.data.ACL類中有兩個成員:

 

private int perms;

private org.apache.zookeeper.data.Id id;

1

2

perms成員是ACL組成Scheme:id:permission中的permission,zk中perms定義如下:

 

public interface Perms {

        int READ = 1 << 0;

 

        int WRITE = 1 << 1;

 

        int CREATE = 1 << 2;

 

        int DELETE = 1 << 3;

 

        int ADMIN = 1 << 4;

 

        int ALL = READ | WRITE | CREATE | DELETE | ADMIN;

    }

org.apache.zookeeper.data.Id類中定義了如下成員:

 

private String scheme;

private String id;

1

2

分別代表ACL組成Scheme:id:permission中的Scheme和id。(ACL參考文檔[zookeeper] zookeeper系列二:zookeeper持久節點、臨時節點及ACL )

其中org.apache.zookeeper.ZooDefs.Ids接口中有預先定義的幾種ACL策略:

 

public interface Ids {

        /**

         * This Id represents anyone.

         */

        public final Id ANYONE_ID_UNSAFE = new Id("world", "anyone");

 

        /**

         * This Id is only usable to set ACLs. It will get substituted with the

         * Id's the client authenticated with.

         */

        public final Id AUTH_IDS = new Id("auth", "");

 

        /**

         * This is a completely open ACL .

         * 相當於 world:anyone:cdrwa

         */

        public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(

                Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));

 

        /**

         * This ACL gives the creators authentication id's all permissions.

         * 相當於  相?於auth:用戶:密碼,但是需要通過ZooKeeper的addAuthInfo添加對應的用戶和密碼對

         */

        public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(

                Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));

 

        /**

         * This ACL gives the world the ability to read.

         * 相當於world:anyone:r,即所有人擁有讀權限

         */

        public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(

                Collections

                        .singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));

    }

刪除zookeeper節點

org.apache.zookeeper.ZooKeeper類提供了如下刪除zk節點的api:

 

// 以同步的方式刪除節點

public void delete(final String path, int version)

        throws InterruptedException, KeeperException

// 以異步的方式刪除節點,如果寫測試代碼,客戶端主線程不能退出,否則可能請求沒有發到服物器或者異步回調不成功

public void delete(final String path, int version, VoidCallback cb, Object ctx)

1

2

3

4

5

參數說明:

 

path:被刪除節點的路徑

version:節點的數據版本,如果指定的版本不是最新版本,將會報錯

cb:異步回調函數

ctx:傳遞的上下文信息,即操作之前的信息傳遞到刪除之后的異步回調函數里面

獲取zookeeper子節點

org.apache.zookeeper.ZooKeeper類提供了如下獲取zk子節點的api:

 

public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException

public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException

public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)

public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)

public List<String> getChildren(final String path, Watcher watcher, Stat stat)

public List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException

public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)

public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)

參數說明:

 

path:數據節點路徑,比如?/zk-book/foo,獲取該路徑下的子節點列表

watcher:給節點設置的watcher,如果path對應節點的子節點數量發生變化,將會得到通知,允許?null

watch:使用使用默認watch,true的話當刪除path節點或path子節點數量發生變化則默認watch或得到通知

stat:指定數據節點的狀態信息

cb:異步回調函數

ctx:用於傳遞一個對象,可以在回調方法執行的時候使用,通常用於傳遞業務的上文信息

獲取zookeeper節點數據

org.apache.zookeeper.ZooKeeper類提供了如下獲取zk節點數據的api:

 

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException

public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException

public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)

public void getData(String path, boolean watch, DataCallback cb, Object ctx)

參數說明:

 

path:數據節點的路徑,比如:/zk-book/foo,獲取該路徑節點的數據;

watcher:設置watcher后,如果path對應節點的數據發生變化(設置新的數據或刪除節點),將會得到通知,允許?null;

watch:是否使用默認的watcher;

stat:獲取到的數據節點的狀態信息將會保存到stat變量中,stat定義如下成員:

private long czxid;

private long mzxid;

private long ctime;

private long mtime;

private int version;

private int cversion;

private int aversion;

private long ephemeralOwner;

private int dataLength;

private int numChildren;

private long pzxid;

代表了數據節點的狀態信息;

cb:異步回調函數;

ctx:用於傳遞一個對象,可以在回調方法執行的時候用,通常用於傳遞業務的上下文信息;

修改zookeeper節點數據

org.apache.zookeeper.ZooKeeper類提供了如下修改zk節點數據的api:

 

public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException

public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)

參數說明:

 

path:被修改的節點路徑;

data:新的數據

version:指定的數據節點的版本,如果指定的版本不是最新版本,將會報錯;

cb:異步回調函數;

ctx:傳遞的上下文信息,即操作之前的信息傳遞到刪除之后的異步回調函數里面;

檢查zookeeper節點是否存在

org.apache.zookeeper.ZooKeeper類提供了如下檢查zk節點是否存在的api:

 

public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException

public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException

public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)

public void exists(String path, boolean watch, StatCallback cb, Object ctx)

參數說明:

 

path:數據節點的路徑;

watcher:需要注冊的watcher,當監聽的節點被創建、被刪除或者被更新時該watcher會得到通知;

watch:是否使用默認的watcher;

cb:異步回調函數;

ctx:用於傳遞一個對象,可以在回調方法執行的時候用,通常用於傳遞業務的上下文信息;

zookeeper API使用示例

如下示例演示了zookeeper api的使用:

 

package com.ctrip.flight.test.zookeeper;

 

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Id;

import org.apache.zookeeper.data.Stat;

import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

 

import java.util.ArrayList;

import java.util.List;

 

public class ZkClientTest {

 

    private static final String PARENT_PATH = "/zkClientTest";

 

    private static final String CHILD_PATH = "/zkClientTest/childNodeTest";

 

    private static final String IDENTITY = "zhangsan:123456";

 

    public static void main(String[] args) {

        try {

            DefaultWatcher defaultWatcher = new DefaultWatcher();

 

            ChildrenWatcher childrenWatcher = new ChildrenWatcher();

 

            ParentWatcher parentWatcher = new ParentWatcher();

 

            // 創建會話

            ZooKeeper client = new ZooKeeper("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183", 30000, defaultWatcher);

 

            client.addAuthInfo("digest", IDENTITY.getBytes());

 

            Stat stat = client.exists(PARENT_PATH, false);

            if (null != stat) {

                client.delete(PARENT_PATH, -1);

            }

 

            // 創建節點,臨時節點不能有子節點,所以父節點是持久節點

            client.create(PARENT_PATH, "zkClientTestData_v1".getBytes(), getAcl(), CreateMode.PERSISTENT);

 

            // 創建子節點

            client.create(CHILD_PATH, "childNodeData_v1".getBytes(), getAcl(), CreateMode.EPHEMERAL);

 

            // 獲取子節點信息

            Stat childStat = new Stat();

            List<String> childs = client.getChildren(PARENT_PATH, childrenWatcher, childStat);

            System.out.println(PARENT_PATH + "'s childs:" + childs);

            System.out.println(PARENT_PATH + "'s stat:" + childStat);

 

            Thread.sleep(1000);

 

            // 獲取父節點數據

            Stat parentStat = new Stat();

            byte[] parentData = client.getData(PARENT_PATH, parentWatcher, parentStat);

            System.out.println(PARENT_PATH + "'s data: " + new String(parentData));

            System.out.println(PARENT_PATH + "'s stat: " + parentStat);

 

            Thread.sleep(1000);

 

            // 設置子節點數據

            childStat = client.setData(CHILD_PATH, "childNodeData_v2".getBytes(), -1);

            System.out.println(CHILD_PATH + "'s stat:" + childStat);

            byte[] childData = client.getData(CHILD_PATH, false, childStat);

            System.out.println(CHILD_PATH + "'s data:" + new String(childData));

 

            Thread.sleep(1000);

 

            // 刪除子節點

            client.delete(CHILD_PATH, -1);

            // 判斷子節點是否存在

            childStat = client.exists(CHILD_PATH, false);

            System.out.println(CHILD_PATH + " is exist: " + (childStat != null));

 

            client.delete(PARENT_PATH, -1);

 

            client.close();

 

            Thread.sleep(1000);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

 

    /**

     * ACL格式為:schema:id:permission

     * @return

     */

    private static List<ACL> getAcl() throws Exception {

        List<ACL> acls = new ArrayList<>();

 

        // 指定schema

        String scheme = "auth";

        // 指定id

        // String identity = "zhangsan:123456";

        Id id = new Id(scheme, DigestAuthenticationProvider.generateDigest(IDENTITY));

 

        // Perms.ALL的權限為crdwa

        ACL acl = new ACL(ZooDefs.Perms.ALL, id);

 

        acls.add(acl);

 

        return acls;

    }

}

Watcher的使用

Watcher的原理

 

 

分別是zookeeper服務端、客戶端以及客戶端的watchManager。

如圖所示,客戶端向zk注冊watcher的同時,會將客戶端的watcher對象存儲在客戶端的WatchManager中;zk服務器觸發watch事件后,會向客戶端發送通知,客戶端線程從watchManager中取出對應watcher執行。

 

客戶端如何實現事件通知的動作

客戶端只需定義一個類實現org.apache.zookeeper.Watcher接口並實現接口中的如下方法:

 

abstract public void process(WatchedEvent event);

1

即可在得到通知后執行相應的動作。參數org.apache.zookeeper.WatchedEvent是zk服務端傳過來的事件,有三個成員:

 

final private KeeperState keeperState; // 通知狀態

final private EventType eventType; // 事件類型

private String path; // 哪個節點發生的時間

1

2

3

分別代表通知的狀態、事件類型和發生事件的節點。

 

keeperState是個枚舉對象,代表客戶端和zk服務器的鏈接狀態,定義如下:

 

/**

 * Enumeration of states the ZooKeeper may be at the event

 */

 public enum KeeperState {

      /** Unused, this state is never generated by the server */

      @Deprecated

      Unknown (-1),

 

      /** The client is in the disconnected state - it is not connected

       * to any server in the ensemble. */

       Disconnected (0),

 

      /** Unused, this state is never generated by the server */

       @Deprecated

       NoSyncConnected (1),

 

     /** The client is in the connected state - it is connected

      * to a server in the ensemble (one of the servers specified

      * in the host connection parameter during ZooKeeper client

      * creation).

      * /

      SyncConnected (3),

 

      /**

       * Auth failed state

       */

       AuthFailed (4),

 

      /**

       * The client is connected to a read-only server, that is the

       * server which is not currently connected to the majority.

       * The only operations allowed after receiving this state is

       * read operations.

       * This state is generated for read-only clients only since

       * read/write clients aren't allowed to connect to r/o servers.

       */

       ConnectedReadOnly (5),

 

       /**

        * SaslAuthenticated: used to notify clients that they are SASL-authenticated,

        * so that they can perform Zookeeper actions with their SASL-authorized permissions.

        */

        SaslAuthenticated(6),

 

       /** The serving cluster has expired this session. The ZooKeeper

        * client connection (the session) is no longer valid. You must

        * create a new client connection (instantiate a new ZooKeeper

        * instance) if you with to access the ensemble.

        */

        Expired (-112);

 

        private final int intValue;     // Integer representation of value

                                        // for sending over wire

 

        KeeperState(int intValue) {

            this.intValue = intValue;

        }

 

        public int getIntValue() {

            return intValue;

        }

 

        public static KeeperState fromInt(int intValue) {

              switch(intValue) {

                  case   -1: return KeeperState.Unknown;

                  case    0: return KeeperState.Disconnected;

                  case    1: return KeeperState.NoSyncConnected;

                  case    3: return KeeperState.SyncConnected;

                  case    4: return KeeperState.AuthFailed;

                  case    5: return KeeperState.ConnectedReadOnly;

                  case    6: return KeeperState.SaslAuthenticated;

                  case -112: return KeeperState.Expired;

 

                  default:

                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");

               }

        }

 }

eventType也是個枚舉類型,代表節點發生的事件類型,比如創建新的子節點、改變節點數據等,定義如下:

 

/**

 * Enumeration of types of events that may occur on the ZooKeeper

 */

 public enum EventType {

       None (-1),

       NodeCreated (1),

       NodeDeleted (2),

       NodeDataChanged (3),

       NodeChildrenChanged (4),

       DataWatchRemoved (5),

       ChildWatchRemoved (6);

 

       private final int intValue;     // Integer representation of value

                                       // for sending over wire

 

       EventType(int intValue) {

            this.intValue = intValue;

       }

 

       public int getIntValue() {

            return intValue;

       }

 

       public static EventType fromInt(int intValue) {

            switch(intValue) {

                case -1: return EventType.None;

                case  1: return EventType.NodeCreated;

                case  2: return EventType.NodeDeleted;

                case  3: return EventType.NodeDataChanged;

                case  4: return EventType.NodeChildrenChanged;

                case  5: return EventType.DataWatchRemoved;

                case  6: return EventType.ChildWatchRemoved;

 

                default:

                         throw new RuntimeException("Invalid integer value for conversion to EventType");

            }

       }          

}

keeperState和eventType對應關系如下所示:

 

 

 

對於NodeDataChanged事件:無論節點數據發生變化還是數據版本發生變化都會觸發(即使被更新數據與新數據一樣,數據版本都會發生變化)。

 

對於NodeChildrenChanged事件:新增和刪除子節點會觸發該事件類型。

 

需要注意的是:WatchedEvent只是事件相關的通知,並沒有對應數據節點的原始數據內容及變更后的新數據內容,因此如果需要知道變更前的數據或變更后的新數據,需要業務保存變更前的數據和調用接口獲取新的數據

 

如何注冊watcher

watcher注冊api

可以在創建zk客戶端實例的時候注冊watcher(構造方法中注冊watcher):

 

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,ZKClientConfig conf)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

ZooKeeper的構造方法中傳入的watcher將會作為整個zk會話期間的默認watcher,該watcher會一直保存為客戶端ZKWatchManager的defaultWatcher成員,如果有其他的設置,這個watcher會被覆蓋。

 

除了可以通過ZooKeeper類的構造方法注冊watcher外,還可以通過ZooKeeper類中其他一些api來注冊watcher,只不過這些api注冊的watcher就不是默認watcher了(以下每個注冊watcher的方法有很多個重載的方法,就不一一列舉出來)。

 

public List<String> getChildren(final String path, Watcher watcher)

// boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher

public List<String> getChildren(String path, boolean watch)

// boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher

public byte[] getData(String path, boolean watch, Stat stat)

public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)

// boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher

public Stat exists(String path, boolean watch)

public Stat exists(final String path, Watcher watcher)

watcher注冊示例代碼

本示例中使用zookeeper自帶客戶端演示watcher的使用,zookeeper自帶客戶端有一點需要注意:

 

Watcher設置后,一旦觸發一次即會失效,如果需要一直監聽,則需要再注冊

 

定義默認watcher:

 

/**

 * 測試默認watcher

 */

public class DefaultWatcher implements Watcher {

 

    @Override

    public void process(WatchedEvent event) {

 

        System.out.println("==========DefaultWatcher start==============");

 

        System.out.println("DefaultWatcher state: " + event.getState().name());

 

        System.out.println("DefaultWatcher type: " + event.getType().name());

 

        System.out.println("DefaultWatcher path: " + event.getPath());

 

        System.out.println("==========DefaultWatcher end==============");

    }

}

定義監聽子節點變化的watcher:

 

/**

 * 用於監聽子節點變化的watcher

 */

public class ChildrenWatcher implements Watcher {

 

    @Override

    public void process(WatchedEvent event) {

 

        System.out.println("==========ChildrenWatcher start==============");

 

        System.out.println("ChildrenWatcher state: " + event.getState().name());

 

        System.out.println("ChildrenWatcher type: " + event.getType().name());

 

        System.out.println("ChildrenWatcher path: " + event.getPath());

 

        System.out.println("==========ChildrenWatcher end==============");

    }

}

定義監聽節點變化的watcher:

 

public class DataWatcher implements Watcher {

 

    @Override

    public void process(WatchedEvent event) {

 

        System.out.println("==========DataWatcher start==============");

 

        System.out.println("DataWatcher state: " + event.getState().name());

 

        System.out.println("DataWatcher type: " + event.getType().name());

 

        System.out.println("DataWatcher path: " + event.getPath());

 

        System.out.println("==========DataWatcher end==============");

    }

}

watcher測試代碼:

 

public class WatcherTest {

 

    /**

     * 鏈接zk服務端的地址

     */

    private static final String CONNECT_STRING = "192.168.0.113:2181";

 

    public static void main(String[] args) {

 

        // 除了默認watcher外其他watcher一旦觸發就會失效,需要充新注冊,本示例中因為

        // 還未想到比較好的重新注冊watcher方式(考慮到如果在Watcher中持有一個zk客戶端的

        // 實例可能存在循環引用的問題),因此暫不實現watcher失效后重新注冊watcher的問題,

        // 后續可以查閱curator重新注冊watcher的實現方法。

 

        // 默認watcher

        DefaultWatcher defaultWatcher = new DefaultWatcher();

        // 監聽子節點變化的watcher

        ChildrenWatcher childrenWatcher = new ChildrenWatcher();

        // 監聽節點數據變化的watcher

        DataWatcher dataWatcher = new DataWatcher();

        try {

            // 創建zk客戶端,並注冊默認watcher

            ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, 100000, defaultWatcher);

 

            // 讓默認watcher監聽 /GetChildren 節點的子節點變化

            // zooKeeper.getChildren("/GetChildren", true);

 

            // 讓childrenWatcher監聽 /GetChildren 節點的子節點變化(默認watcher不再監聽該節點子節點變化)

            zooKeeper.getChildren("/GetChildren", childrenWatcher);

 

            // 讓dataWatcher監聽 /GetChildren 節點本省的變化(默認watcher不再監聽該節點變化)

            zooKeeper.getData("/GetChildren", dataWatcher, null);

 

            TimeUnit.SECONDS.sleep(1000000);

        } catch (Exception ex) {

            ex.printStackTrace();

        }

    }

}

 

  1. 使用開源curator操作zookeeper

zookeeper原生api的不足

zookeeper原生api存在以下不足之處:

 

連接的創建是異步的,需要開發人員自行編碼實現等待;

連接沒有自動的超時重連機制;

Zk本身不提供序列化機制,需要開發人員自行指定,從而實現數據的序列化和反序列化;

Watcher注冊一次只會生效一次,需要不斷的重復注冊;

Watcher本身的使用方式不符合java本身的術語,如果采用監聽器的方式,更容易理解;

不支持遞歸創建樹形節點;

zookeeper第三方開源客戶端

zookeeper的第三方開源客戶端主要有zkClient和Curator。其中zkClient解決了session會話超時重連、Watcher反復注冊等問題,提供了更加簡潔的api,但zkClient社區不活躍,文檔不夠完善。而Curator是Apache基金會的頂級項目之一,它解決了session會話超時重連、Watcher反復注冊、NodeExitsException異常等問題,Curator具有更加完善的文檔,因此我們這里只學習Curator的使用。

 

Curator客戶端api介紹

Curator包含了如下幾個包:

 

curator-framework:對zookeeper底層api的一些封裝;

curator-client:提供一些客戶端的操作,如重試策略等;

curator-recipes:封裝了一些高級特性,如Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等

首先我們在gradle中引入curator的依賴就行:

 

 

curator提供了一種類似jdk8中stream一樣的流式操作。

 

創建zookeeper會話

Curator中org.apache.curator.framework.CuratorFrameworkFactory類提供了如下兩個創建zookeeper會話的方法:

 

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

該方法返回一個org.apache.curator.framework.CuratorFramework類型的對象,參數說明如下:

 

connectString:逗號分開的ip:port對;

sessionTimeoutMs:會話超時時間,單位為毫秒,默認是60000ms,指連接建立完后多久沒有收到心跳檢測,超過該時間即為會話超時;

connectionTimeoutMs:連接創建超時時間,單位為毫秒,默認是15000ms,指客戶端與服務端建立連接時多長時間沒連接上就算超時;

retryPolicy:重試策略,retryPolicy的類型定義如下

   /**

    * Abstracts the policy to use when retrying connections

    */

    public interface RetryPolicy

    {

             /**

             * Called when an operation has failed for some reason. This method should return

             * true to make another attempt.

             *

               *

              * @param retryCount the number of times retried so far (0 the first time),第幾次重試

              * @param elapsedTimeMs the elapsed time in ms since the operation was attempted,到當前重試時刻總的重試時間

              * @param sleeper use this to sleep - DO NOT call Thread.sleep,重試策略

              * @return true/false

              */

              public boolean      allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);

    }

allowRetry返回true繼續重試,返回false不再重試

可以通過實現該接口來自定義策略,curator已經為我們提供了若干重試策略:

ExponentialBackoffRetry:該重試策略隨着重試次數的增加,sleep的時間呈指數增長,該提供了兩個構造方法

  public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

  public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

第retryCount次重試的sleep時間計算方式為:baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),如果該值大於maxSleepMs,則sleep時間為maxSleepMs,如果重試次數大於maxRetries,則不再重試;

RetryNTimes:該重試策略重試指定次數,每次sleep固定的時間,構造方法如下

public RetryNTimes(int n, int sleepMsBetweenRetries)

n是重試的次數,sleepMsBetweenRetries是sleep的時間;

RetryOneTime:該重試策略只重試一次

RetryUntilElapsed:該重試策略對重試次數不做限制,但對總的重試時間做限制,構造方法如下

public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

1

maxElapsedTimeMs是最大的重試時間,sleepMsBetweenRetries是sleep的時間間隔;

通過newClient獲得CuratorFramework對象后我們就可以進行各種操作了。

除了newClient,CuratorFrameworkFactory還提供了一種Builder的方式來創建CuratorFramework對象,如下的示例所示:

 

RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);

CuratorFramework client =  CuratorFrameworkFactory.builder()

                .connectString("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183")

                .sessionTimeoutMs(30000).connectionTimeoutMs(15000)

                .retryPolicy(retryPolicy)

                .namespace("curatorTest")

                .build();

創建zookeeper節點

在curator中無論執行何種操作都必須先獲得一個構建該操作的包裝類(Builder對象),創建zookeeper節點需要先獲得一個org.apache.curator.framework.api.CreateBuilder(實際上是CreateBuilder的實現類CreateBuilderImpl)對象,然后用這個對象來構建創建節點的操作,CreateBuilderImpl中常見的操作如下:

 

// 遞歸創建(持久)父目錄

public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded()

// 設置創建節點的屬性

public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)

// 設置節點的acl屬性

public ACLBackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)

// 指定創建節點的路徑和節點上的數據

public String forPath(final String givenPath, byte[] data) throws Exception

如下所示為創建一個節點的示例:

 

String test1Data = client.create()

                .creatingParentsIfNeeded()

                .withMode(CreateMode.PERSISTENT)

                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                .forPath("/curatorTest/test1", "test1".getBytes());

刪除zookeeper節點

同理先調用CuratorFramework的delete()獲取構建刪除操作的DeleteBuilder(實際上為DeleteBuilderImpl),DeleteBuilderImpl提供了如下方法來構建刪除操作:

 

// 指定要刪除數據的版本號

public BackgroundPathable<Void> withVersion(int version)

// 確保數據被刪除,本質上就是重試,當刪除失敗時重新發起刪除操作

public ChildrenDeletable guaranteed()

// 指定刪除的節點

public Void forPath(String path) throws Exception

// 遞歸刪除子節點

public BackgroundVersionable deletingChildrenIfNeeded()

讀取zookeeper節點數據

同理先調用CuratorFramework的getData()獲取構建獲取數據操作的GetDataBuilder(實際上為GetDataBuilderImpl),GetDataBuilderImpl提供了如下方法來構建讀取操作:

 

// 將節點狀態信息保存到stat

public WatchPathable<byte[]> storingStatIn(Stat stat)

// 指定節點路徑

public byte[] forPath(String path) throws Exception

如下示例為獲取節點數據:

 

Stat test1Stat = new Stat();

byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");

System.out.println("test1 data: " + new String(test1DataBytes));

更新zookeeper節點數據

同理先調用CuratorFramework的setData()獲取構建獲取數據操作的SetDataBuilder(實際上為SetDataBuilderImpl),SetDataBuilderImpl提供了如下方法來構建更新操作:

 

// 指定版本號

public BackgroundPathAndBytesable<Stat> withVersion(int version)

// 指定節點路徑和要更新的數據

public Stat forPath(String path, byte[] data) throws Exception

示例程序:

 

test1Stat = client.setData()

                .withVersion(-1)

                .forPath("/curatorTest/test1", "test1DataV2".getBytes());

讀取zookeeper子節點

同理先調用CuratorFramework的getChildren()獲取構建獲取子節點數據操作的GetChildrenBuilder(實際上為GetChildrenBuilderImpl),GetChildrenBuilderImpl提供了如下方法來構建更新操作:

 

// 把服務器端獲取到的狀態數據存儲到stat對象中

public WatchPathable<List<String>> storingStatIn(Stat stat)

// 指定獲取子節點數據的節點路徑

public List<String> forPath(String path) throws Exception

// 設置watcher,類似於zookeeper本身的api,也只能使用一次

public BackgroundPathable<List<String>> usingWatcher(Watcher watcher)

public BackgroundPathable<List<String>> usingWatcher(CuratorWatcher watcher)

示例程序:

 

Stat childStat = new Stat();

List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");

curator中關於異步操作

curator為所有操作都提供了異步執行的版本,只需要在構建操作的方法鏈中添加如下操作之一即可:

 

public ErrorListenerPathable<List<String>> inBackground()

public ErrorListenerPathable<List<String>> inBackground(Object context)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Executor executor)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context, Executor executor)

如下示例程序為使用異步執行刪除操作:

 

client.delete()

          .guaranteed()

          .withVersion(-1)

          .inBackground(((client1, event) -> {

                    System.out.println(event.getPath() + ", data=" + event.getData());

                    System.out.println("event type=" + event.getType());

                    System.out.println("event code=" + event.getResultCode());

           }))

           .forPath("/curatorTest/test1");

curator中的NodeCache

NodeCache會將某一路徑的節點(節點本身)在本地緩存一份,當zookeeper中相應路徑的節點發生更新、創建或者刪除操作時,NodeCache將會得到響應,並且會將最新的數據拉到本地緩存中,NodeCache只會監聽路徑本身的變化,並不會監聽子節點的變化。我們可以通過NodeCache注冊一個監聽器來獲取發生變化的通知。NodeCache提供了如下構造函數:

 

public NodeCache(CuratorFramework client, String path)

public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

1

2

參數說明:

 

client: curator客戶端;

path: 需要緩存的節點路徑;

dataIsCompressed:是否壓縮節點下的數據;

NodeCache提供了一個如下類型的監聽器容器,只要往容器中添加監聽器,當節點發生變更時,容器中的監聽器都將得到通知。

 

private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();

1

NodeCache緩存數據及添加Listener的示例代碼如下:

 

NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");

// 是否立即拉取/curatorTest/test1節點下的數據緩存到本地

nodeCache.start(true);

// 添加listener

nodeCache.getListenable().addListener(() -> {

        ChildData childData = nodeCache.getCurrentData();

        if (null != childData) {

             System.out.println("path=" + childData.getPath() + ", data=" + childData.getData() + ";");

        }

});

Note: NodeCache只會緩存節點本身的數據和狀態,並不會緩存節點下的子節點信息,所以如果我們在節點下創建子節點,NodeCache中的Listener是不會得到通知的*

 

curator中的PathChildrenCache

PathChildrenCache會將指定路徑節點下的所有子節點緩存在本地,但不會緩存節點本身的信息,當執行新增(CHILD_ADDED)、刪除(CHILD_REMOVED)、更新(CHILD_UPDATED)指定節點下的子節點等操作時,PathChildrenCache中的Listener將會得到通知,PathChildrenCache提供了如下幾個構造函數:

 

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

參數說明:

 

client:curator客戶端;

path:緩存的節點路徑;

cacheData:除了緩存節點狀態外是否緩存節點數據,如果為true,那么客戶端在接收到節點列表變更的同時,也能夠獲取到節點的數據內容,如果為false,則無法獲取到數據內容;

threadFactory:線程池工廠,當內部需要開啟新的線程執行時,使用該線程池工廠來創建線程;

dataIsCompressed:是否壓縮節點數據;

executorService:線程池;

PathChildrenCache通過start方法可以傳入三種啟動模式,這三種啟動模式定義在org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode中:

 

NORMAL:異步初始化cache;

BUILD_INITIAL_CACHE:同步初始化cache,以及創建cache后,就從服務器拉取對應的數據;

POST_INITIALIZED_EVENT:異步初始化cache,初始化完成觸發PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener會收到該事件的通知;

PathChildrenCache示例代碼如下:

 

PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);

// startMode為BUILD_INITIAL_CACHE,cache是初始化完成會發送INITIALIZED事件

pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

System.out.println(pathChildrenCache.getCurrentData().size());

pathChildrenCache.getListenable().addListener(((client1, event) -> {

            ChildData data = event.getData();

            switch (event.getType()) {

                case INITIALIZED:

                    System.out.println("子節點cache初始化完成(StartMode為POST_INITIALIZED_EVENT的情況)");

                    System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());

                    break;

                case CHILD_ADDED:

                    System.out.println("添加子節點,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_UPDATED:

                    System.out.println("更新子節點,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_REMOVED:

                    System.out.println("刪除子節點,path=" + data.getPath());

                    break;

                default:

                    System.out.println(event.getType());

            }

}));

curator完整示例代碼

如下所示為演示curator使用的完整示例代碼:

public class CuratorTest {

    public static void main(String[] args) throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);

        CuratorFramework client =  CuratorFrameworkFactory.builder()

                .connectString("192.168.0.104:2181,192.168.0.104:2182,192.168.0.104:2183")

                .sessionTimeoutMs(30000).connectionTimeoutMs(15000)

                .retryPolicy(retryPolicy)

                //.namespace("curatorTest")

                .build();

        client.start();

 

        // 判斷節點是否存在,存在則先刪除節點

        Stat test1Stat = client.checkExists().forPath("/curatorTest/test1");

        if (null != test1Stat) {

            client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/curatorTest/test1");

        }

 

        // 創建節點

        String test1Data = client.create()

                .creatingParentsIfNeeded()

                .withMode(CreateMode.PERSISTENT)

                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                .forPath("/curatorTest/test1", "test1DataV1".getBytes());

 

        // 獲取節點信息

        test1Stat = new Stat();

        byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");

        System.out.println("test1 stat: " + test1Stat);

        System.out.println("test1 data: " + new String(test1DataBytes));

 

        // 更新節點數據

        test1Stat = client.setData()

                .withVersion(-1)

                .forPath("/curatorTest/test1", "test1DataV2".getBytes());

        System.out.println("test1 stat: " + test1Stat);

 

        // 獲取所有子節點

        Stat childStat = new Stat();

        List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");

        System.out.println("curatorTest childs: " + childs);

 

//        client.delete()

//                .guaranteed()

//                .withVersion(-1)

//                .inBackground(((client1, event) -> {

//                    System.out.println(event.getPath() + ", data=" + event.getData());

//                    System.out.println("event type=" + event.getType());

//                    System.out.println("event code=" + event.getResultCode());

//                }))

//                .forPath("/curatorTest/test1");

 

        // 緩存節點

        NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");

        nodeCache.start(true);

        nodeCache.getListenable().addListener(() -> {

            System.out.println("NodeCache:");

            ChildData childData = nodeCache.getCurrentData();

            if (null != childData) {

                System.out.println("path=" + childData.getPath() + ", data=" + new String(childData.getData()) + ";");

            }

        });

 

 

        // 緩存子節點

        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);

        // startMode為BUILD_INITIAL_CACHE,cache是初始化完成會發送INITIALIZED事件

        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        System.out.println(pathChildrenCache.getCurrentData().size());

        pathChildrenCache.getListenable().addListener(((client1, event) -> {

            ChildData data = event.getData();

            switch (event.getType()) {

                case INITIALIZED:

                    System.out.println("子節點cache初始化完成(StartMode為POST_INITIALIZED_EVENT的情況)");

                    System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());

                    break;

                case CHILD_ADDED:

                    System.out.println("添加子節點,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_UPDATED:

                    System.out.println("更新子節點,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_REMOVED:

                    System.out.println("刪除子節點,path=" + data.getPath());

                    break;

                default:

                    System.out.println(event.getType());

            }

        }));

 

        Thread.sleep(20000000);

    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM