Zookeeper系列三:Zookeeper客戶端的使用(Zookeeper原生API如何進行調用、ZKClient、Curator)和Zookeeper會話


一、Zookeeper原生API如何進行調用

准備工作:

首先在新建一個maven項目ZK-Demo,然后在pom.xml里面引入zk的依賴

    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.10</version>
    </dependency>

1. 連接zk並監聽事件

package com.study.demo.zk;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

//連接zk並監聽事件
public class ZKDemo implements Watcher {
    private static final CountDownLatch cdl = new CountDownLatch(1);

    public static void main(String[] args) throws IOException {
        ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDemo());
        System.out.println(zk.getState());

        try {
            cdl.await();
        } catch (Exception e) {
            System.out.println("ZK Session established.");
        }
    }


    //監聽到事件時進行處理
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (KeeperState.SyncConnected == event.getState()) {
            cdl.countDown();
        }
    }
}

輸出結果:

CONNECTING
Receive watched event:WatchedEvent state:SyncConnected type:None path:null

2. 創建znode並監聽事件

package com.study.demo.zk;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

//創建znode並監聽事件
public class ZKOperateDemo implements Watcher {
    private static final CountDownLatch cdl = new CountDownLatch(1);

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKOperateDemo());
        cdl.await();

        String path1 = zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("Success create path: " + path1);
        String path2 = zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Success create path: " + path2);
    }

    //監聽到事件時進行處理
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (KeeperState.SyncConnected == event.getState()) {
            cdl.countDown();
        }
    }
}

輸出結果:

Receive watched event:WatchedEvent state:SyncConnected type:None path:null
Success create path: /zk-test-
Success create path: /zk-test-0000000011

3. 改變znode數據並監聽事件

package com.study.demo.zk;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

//改變znode數據並監聽事件
public class ZKDataDemo implements Watcher {
    private static final CountDownLatch cdl = new CountDownLatch(1);
    private static ZooKeeper zk = null;
    private static Stat stat = new Stat();

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDataDemo());
        cdl.await();

        zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(new String(zk.getData("/zk-test", true, stat)));

        zk.getData("/zk-test", true, stat);
        System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
        zk.setData("/zk-test", "123".getBytes(), -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    //監聽到事件時進行處理
    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                cdl.countDown();
            } else if (event.getType() == EventType.NodeDataChanged) {
                try {
                    System.out.println(new String(zk.getData(event.getPath(), true, stat)));
                    System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                } catch (Exception e) {
                }
            }
        }
    }
}

輸出結果:

123
4294967354, 4294967354, 0
123
4294967354, 4294967355, 1

4. 改變子節點並監聽事件

package com.study.demo.zk;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

//改變子節點並監聽事件
public class ZKChildrenDemo implements Watcher {
    private static final CountDownLatch cdl = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenDemo());
        cdl.await();

        zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        List<String> list = zk.getChildren("/zk-test", true);
        for (String str : list)
            System.out.println(str);

        zk.create("/zk-test/c2", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        Thread.sleep(Integer.MAX_VALUE);
    }

    //監聽到事件時進行處理
    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState())
            if (EventType.None == event.getType() && null == event.getPath()) {
                cdl.countDown();
            } else if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    System.out.println("Child: " + zk.getChildren(event.getPath(), true));
                } catch (Exception e) {
                }
            }
    }
}

輸出結果:

c1
Child: [c1, c2]

5. 異步調用並完成回調

package com.study.demo.zk;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

//異步調用並完成回調
class ChildrenCallback implements AsyncCallback.Children2Callback {
    
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        System.out.println(
                "Child: " + rc + ", path: " + path + ", ctx: " + ctx + ", children: " + children + ", stat: " + stat);
    }
}

public class ZKChildrenAsyncDemo implements Watcher {
    private static final CountDownLatch cdl = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenAsyncDemo());
        cdl.await();

        zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        zk.getChildren("/zk-test", true, new ChildrenCallback(), "ok");

        Thread.sleep(Integer.MAX_VALUE);
    }

    
    //監聽到事件時進行處理
    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState())
            if (EventType.None == event.getType() && null == event.getPath()) {
                cdl.countDown();
            } else if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    System.out.println("Child: " + zk.getChildren(event.getPath(), true));
                } catch (Exception e) {
                }
            }
    }
}

輸出結果:

Child: 0, path: /zk-test, ctx: ok, children: [c1], stat: 4294967369,4294967369,1535536716381,1535536716381,0,1,0,0,3,1,4294967370

6. 連接后創建回調

package com.study.demo.zk;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

//連接后創建回調
class IStringCallback implements AsyncCallback.StringCallback {
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("create path result: [" + rc + ", " + path + "," + ctx + ", real path name: " + name);
    }
}

public class ZKAsyncDemo implements Watcher {
    private static final CountDownLatch cdl = new CountDownLatch(1);

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKAsyncDemo());
        cdl.await();

        zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(),
                new String("I am context"));

        zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), new String("I am context"));

        zk.create("/zk-test-", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallback(), new String("I am context"));

        Thread.sleep(Integer.MAX_VALUE);
    }

    //監聽到事件時進行處理
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (KeeperState.SyncConnected == event.getState()) {
            cdl.countDown();
        }
    }
}

輸出結果:

Receive watched event:WatchedEvent state:SyncConnected type:None path:null
create path result: [0, /zk-test-,I am context, real path name: /zk-test-
create path result: [-110, /zk-test-,I am context, real path name: null
create path result: [0, /zk-test-,I am context, real path name: /zk-test-0000000016

Chroot命名空間:
主要為了對業務進行隔離性

示例:
Zookeeper client=new Zookeeper(“192.168.56.101:2181/zk-client”, ........)
/zk-client就是Chroot命名空間。Chroot命名空間可以多級
后續的操作都只能在/zk-client及它的子節點下進行,由此進行了業務隔離

二、ZKClient

ZKClient的優點:

1)可以遞歸創建。在zookeeper命令行和zookeeper的原生API里面得先創建父節點才能創建子節點
2)可以遞歸刪除。在zookeeper命令行和zookeeper的原生API里面得先刪除子節點才能刪除父節點
3)避免不存在的異常

准備工作:

首先在新建一個maven項目ZK-Demo,然后在pom.xml里面引入ZKClient的依賴

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

1. ZkClient遞歸創建順序節點

package com.study.demo.client;

import org.I0Itec.zkclient.ZkClient;

/**
 * 
* @Description: ZkClient遞歸創建順序節點
* @author leeSmall
* @date 2018年9月2日
*
 */
public class CreateNodeDemo {
    public static void main(String[] args) {
        ZkClient client = new ZkClient("192.168.152.130:2181", 5000);
        String path = "/zk-client/c1";
        // 遞歸創建順序節點 true:先創建父節點/zk-client
        client.createPersistent(path, true);
    }
}

 創建成功:

2. ZkClient獲取數據並監聽事件

package com.study.demo.client;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

/**
* 
* @Description: ZkClient獲取數據
* @author leeSmall
* @date 2018年9月2日
*
*/
public class GetDataDemo {
    public static void main(String[] args) throws InterruptedException {
        String path = "/zk-client";
        ZkClient client = new ZkClient("192.168.152.130:2181", 5000);
        //創建臨時節點
        client.createEphemeral(path, "123");

        //注冊父節點數據改變的事件
        client.subscribeDataChanges(path, new IZkDataListener() {
            
            //父節點數據改變事件
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(dataPath + " changed: " + data);
            }

            //父節點數據刪除事件
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(dataPath + " deleted");
            }
        });

        System.out.println(client.readData(path).toString());
        client.writeData(path, "456");
        Thread.sleep(1000);
        client.delete(path);
        //sleep的目的是為了更好的觀察事件變化
        Thread.sleep(Integer.MAX_VALUE);
    }
}

 輸出結果:

123
/zk-client changed: 456
/zk-client deleted

3. ZkClient獲取子節點數據並監聽事件

package com.study.demo.client;

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

/**
* 
* @Description: ZkClient獲取子節點數據
* @author leeSmall
* @date 2018年9月2日
*
*/
public class GetChildrenDemo {
    public static void main(String[] args) throws InterruptedException {
        String path = "/zk-client";
        ZkClient client = new ZkClient("192.168.152.130:2181", 5000);
        //注冊子節點數據改變的事件
        client.subscribeChildChanges(path, new IZkChildListener() {
            
            //子節點數據改變事件
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + "的子發生變化: " + currentChilds);
            }
        });

        //創建順序節點
        client.createPersistent(path);
        Thread.sleep(1000);
        //獲取子節點數據 此時還沒有創建獲取不到
        System.out.println(client.getChildren(path));
        //在前面的父節點 /zk-client下創建子節點c1
        client.createPersistent(path + "/c1");
        Thread.sleep(1000);
        //刪除子節點
        client.delete(path + "/c1");
        Thread.sleep(1000);
        //刪除父節點
        client.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

 輸出結果:

/zk-client的子發生變化: []
[]
/zk-client的子發生變化: [c1]
/zk-client的子發生變化: []
/zk-client的子發生變化: null

三、Curator

curator是連接ZK應用最廣泛的工具

原因如下:

1)zk應用場景(分布式鎖,Master選舉等等),curator包含了這些場景。
2)應用場景出現極端的情況下,curator考慮到處理了。

准備工作:

首先在新建一個maven項目ZK-Demo,然后在pom.xml里面引入curator的依賴

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

1. curator創建連接session

package com.study.demo.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
* 
* @Description: curator創建連接session
* @author leeSmall
* @date 2018年9月2日
*
*/
public class CreateSessionDemo {
    public static void main(String[] args) throws InterruptedException {
        RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(policy).build();
        client.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

這里介紹一種算法:Backoff退避算法

有這樣一種場景,有多個請求,如果網絡出現阻塞,每1分鍾重試一次。
20:25 request1(block)
20:26 request2(block)
20:27 request3(block)
當網絡通順的時候,請求都累在一起來發送
20:28 request4(通順)request2、3、4
那么前面的請求就沒有意義了,所以就有了退避算法,按照指數間隔重試,比如第一次1分鍾,第二次2分鍾......隨着時間的推移,重試間隔越長。

2. curator遞歸創建順序節點

package com.study.demo.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
* 
* @Description: curator遞歸創建順序節點
* @author leeSmall
* @date 2018年9月2日
*
*/
public class CreateNodeDemo {
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes());
    }
}

創建成功:

3. curator異步創建臨時節點

package com.study.demo.curator;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
* 
* @Description: curator異步創建臨時節點
* @author leeSmall
* @date 2018年9月2日
*
*/
public class CreateNodeAsyncDemo {
    static CountDownLatch cdl = new CountDownLatch(2);
    static ExecutorService es = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        String path = "/zk-curator";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        
        //創建臨時節點
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            //回調事件處理
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
                cdl.countDown();
            }
        }, es).forPath(path, "test".getBytes());

        //創建臨時節點
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
                cdl.countDown();
            }
        }).forPath(path, "test".getBytes());

        cdl.await();
        es.shutdown();
    }
}

 輸出結果:

event code: 0, type: CREATE
event code: -110, type: CREATE

4. curator更新節點數據

package com.study.demo.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
* 
* @Description: curator更新節點數據
* @author leeSmall
* @date 2018年9月2日
*
*/
public class UpdateDataDemo {
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("Current data: " + stat.getVersion());
        System.out.println("Update data: "
                + client.setData().withVersion(stat.getVersion()).forPath(path, "some".getBytes()).getVersion());
    }
}

 輸出結果:

Current data: 0
Update data: 1

5. curator刪除節點數據

package com.study.demo.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
* 
* @Description: curator刪除節點數據
* @author leeSmall
* @date 2018年9月2日
*
*/
public class DeleteNodeDemo {
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
    }
}

6. curator事件監聽

package com.study.demo.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
* 
* @Description: curator事件監聽
* @author leeSmall
* @date 2018年9月2日
*
*/
public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator/nodecache";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes());

        final NodeCache nc = new NodeCache(client, path, false);
        nc.start();
        //通過回調函數監聽事件
        nc.getListenable().addListener(new NodeCacheListener() {
            
            public void nodeChanged() throws Exception {
                System.out.println("update--current data: " + new String(nc.getCurrentData().getData()));
            }
        });
        
        client.setData().forPath(path, "test123".getBytes());
        Thread.sleep(1000);
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(5000);
        nc.close();
    }
}

 輸出結果:

update--current data: test123

Curator事件監聽:

NodeCache:節點處理監聽(會使用緩存)。回調接口NodeCacheListener

PathChildrenCache:子節點緩存,處理子節點變化。回調接口PathChildrenCacheListener

TreeCache:NodeCache和PathChildrenCache的結合體。回調接口TreeCacheListener

四、zookeeper會話

1. zookeeper連接的幾種狀態

CONNECTING 正在連接
CONNECTED 已經連接
RECONNECTING 正在重新連接
RECONNECTED 重新連接上
CLOSE 會話關閉

2. session

2.1 session主要由幾個類控制:

SessionTracker, LearnerSessionTracker, SessionTrackerImpl

session初始化的方法:

org.apache.zookeeper.server.SessionTrackerImpl.initializeNextSession(long)

public static long initializeNextSession(long id) {
        long nextSid = 0;
        nextSid = (System.currentTimeMillis() << 24) >>> 8;
        nextSid =  nextSid | (id <<56);
        return nextSid;
    }

 說明:

SessionID的分配(初始化)函數,策略如下:
1)取時間,並且左移24位得到的結果再右移8位(高8位,低16位都是0)
2)sid拿出來進行左移56位
3)和第一步的結果做或運算

2.2 Session分桶(zookeeper的一個特性)

按照Session會話過期時間進行分區塊保存。
這樣設計的好處:可以快速清理過期的session

2.3 session激活過程:

1)檢測會話是否過期
2)計算會話下一次超時時間
3)定位會話的所在區塊
4)遷移會話

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM