一、Zookeeper原生API如何進行調用
准備工作:
首先在新建一個maven項目ZK-Demo,然后在pom.xml里面引入zk的依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency>
1. 連接zk並監聽事件
package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; //連接zk並監聽事件 public class ZKDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); public static void main(String[] args) throws IOException { ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDemo()); System.out.println(zk.getState()); try { cdl.await(); } catch (Exception e) { System.out.println("ZK Session established."); } } //監聽到事件時進行處理 public void process(WatchedEvent event) { System.out.println("Receive watched event:" + event); if (KeeperState.SyncConnected == event.getState()) { cdl.countDown(); } } }
輸出結果:
CONNECTING
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
2. 創建znode並監聽事件
package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //創建znode並監聽事件 public class ZKOperateDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKOperateDemo()); cdl.await(); String path1 = zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create path: " + path1); String path2 = zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Success create path: " + path2); } //監聽到事件時進行處理 public void process(WatchedEvent event) { System.out.println("Receive watched event:" + event); if (KeeperState.SyncConnected == event.getState()) { cdl.countDown(); } } }
輸出結果:
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
Success create path: /zk-test-
Success create path: /zk-test-0000000011
3. 改變znode數據並監聽事件
package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; //改變znode數據並監聽事件 public class ZKDataDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); private static ZooKeeper zk = null; private static Stat stat = new Stat(); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDataDemo()); cdl.await(); zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(new String(zk.getData("/zk-test", true, stat))); zk.getData("/zk-test", true, stat); System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); zk.setData("/zk-test", "123".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } //監聽到事件時進行處理 public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { cdl.countDown(); } else if (event.getType() == EventType.NodeDataChanged) { try { System.out.println(new String(zk.getData(event.getPath(), true, stat))); System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); } catch (Exception e) { } } } } }
輸出結果:
123
4294967354, 4294967354, 0
123
4294967354, 4294967355, 1
4. 改變子節點並監聽事件
package com.study.demo.zk; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //改變子節點並監聽事件 public class ZKChildrenDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenDemo()); cdl.await(); zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); List<String> list = zk.getChildren("/zk-test", true); for (String str : list) System.out.println(str); zk.create("/zk-test/c2", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(Integer.MAX_VALUE); } //監聽到事件時進行處理 public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) if (EventType.None == event.getType() && null == event.getPath()) { cdl.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { System.out.println("Child: " + zk.getChildren(event.getPath(), true)); } catch (Exception e) { } } } }
輸出結果:
c1
Child: [c1, c2]
5. 異步調用並完成回調
package com.study.demo.zk; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; //異步調用並完成回調 class ChildrenCallback implements AsyncCallback.Children2Callback { public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { System.out.println( "Child: " + rc + ", path: " + path + ", ctx: " + ctx + ", children: " + children + ", stat: " + stat); } } public class ZKChildrenAsyncDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenAsyncDemo()); cdl.await(); zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zk.getChildren("/zk-test", true, new ChildrenCallback(), "ok"); Thread.sleep(Integer.MAX_VALUE); } //監聽到事件時進行處理 public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) if (EventType.None == event.getType() && null == event.getPath()) { cdl.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { System.out.println("Child: " + zk.getChildren(event.getPath(), true)); } catch (Exception e) { } } } }
輸出結果:
Child: 0, path: /zk-test, ctx: ok, children: [c1], stat: 4294967369,4294967369,1535536716381,1535536716381,0,1,0,0,3,1,4294967370
6. 連接后創建回調
package com.study.demo.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //連接后創建回調 class IStringCallback implements AsyncCallback.StringCallback { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("create path result: [" + rc + ", " + path + "," + ctx + ", real path name: " + name); } } public class ZKAsyncDemo implements Watcher { private static final CountDownLatch cdl = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKAsyncDemo()); cdl.await(); zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), new String("I am context")); zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), new String("I am context")); zk.create("/zk-test-", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new IStringCallback(), new String("I am context")); Thread.sleep(Integer.MAX_VALUE); } //監聽到事件時進行處理 public void process(WatchedEvent event) { System.out.println("Receive watched event:" + event); if (KeeperState.SyncConnected == event.getState()) { cdl.countDown(); } } }
輸出結果:
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
create path result: [0, /zk-test-,I am context, real path name: /zk-test-
create path result: [-110, /zk-test-,I am context, real path name: null
create path result: [0, /zk-test-,I am context, real path name: /zk-test-0000000016
Chroot命名空間:
主要為了對業務進行隔離性
示例:
Zookeeper client=new Zookeeper(“192.168.56.101:2181/zk-client”, ........)
/zk-client就是Chroot命名空間。Chroot命名空間可以多級
后續的操作都只能在/zk-client及它的子節點下進行,由此進行了業務隔離
二、ZKClient
ZKClient的優點:
1)可以遞歸創建。在zookeeper命令行和zookeeper的原生API里面得先創建父節點才能創建子節點
2)可以遞歸刪除。在zookeeper命令行和zookeeper的原生API里面得先刪除子節點才能刪除父節點
3)避免不存在的異常
准備工作:
首先在新建一個maven項目ZK-Demo,然后在pom.xml里面引入ZKClient的依賴
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
1. ZkClient遞歸創建順序節點
package com.study.demo.client; import org.I0Itec.zkclient.ZkClient; /** * * @Description: ZkClient遞歸創建順序節點 * @author leeSmall * @date 2018年9月2日 * */ public class CreateNodeDemo { public static void main(String[] args) { ZkClient client = new ZkClient("192.168.152.130:2181", 5000); String path = "/zk-client/c1"; // 遞歸創建順序節點 true:先創建父節點/zk-client client.createPersistent(path, true); } }
創建成功:
2. ZkClient獲取數據並監聽事件
package com.study.demo.client; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; /** * * @Description: ZkClient獲取數據 * @author leeSmall * @date 2018年9月2日 * */ public class GetDataDemo { public static void main(String[] args) throws InterruptedException { String path = "/zk-client"; ZkClient client = new ZkClient("192.168.152.130:2181", 5000); //創建臨時節點 client.createEphemeral(path, "123"); //注冊父節點數據改變的事件 client.subscribeDataChanges(path, new IZkDataListener() { //父節點數據改變事件 public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println(dataPath + " changed: " + data); } //父節點數據刪除事件 public void handleDataDeleted(String dataPath) throws Exception { System.out.println(dataPath + " deleted"); } }); System.out.println(client.readData(path).toString()); client.writeData(path, "456"); Thread.sleep(1000); client.delete(path); //sleep的目的是為了更好的觀察事件變化 Thread.sleep(Integer.MAX_VALUE); } }
輸出結果:
123
/zk-client changed: 456
/zk-client deleted
3. ZkClient獲取子節點數據並監聽事件
package com.study.demo.client; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; /** * * @Description: ZkClient獲取子節點數據 * @author leeSmall * @date 2018年9月2日 * */ public class GetChildrenDemo { public static void main(String[] args) throws InterruptedException { String path = "/zk-client"; ZkClient client = new ZkClient("192.168.152.130:2181", 5000); //注冊子節點數據改變的事件 client.subscribeChildChanges(path, new IZkChildListener() { //子節點數據改變事件 public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println(parentPath + "的子發生變化: " + currentChilds); } }); //創建順序節點 client.createPersistent(path); Thread.sleep(1000); //獲取子節點數據 此時還沒有創建獲取不到 System.out.println(client.getChildren(path)); //在前面的父節點 /zk-client下創建子節點c1 client.createPersistent(path + "/c1"); Thread.sleep(1000); //刪除子節點 client.delete(path + "/c1"); Thread.sleep(1000); //刪除父節點 client.delete(path); Thread.sleep(Integer.MAX_VALUE); } }
輸出結果:
/zk-client的子發生變化: []
[]
/zk-client的子發生變化: [c1]
/zk-client的子發生變化: []
/zk-client的子發生變化: null
三、Curator
curator是連接ZK應用最廣泛的工具
原因如下:
1)zk應用場景(分布式鎖,Master選舉等等),curator包含了這些場景。
2)應用場景出現極端的情況下,curator考慮到處理了。
准備工作:
首先在新建一個maven項目ZK-Demo,然后在pom.xml里面引入curator的依賴
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
1. curator創建連接session
package com.study.demo.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; /** * * @Description: curator創建連接session * @author leeSmall * @date 2018年9月2日 * */ public class CreateSessionDemo { public static void main(String[] args) throws InterruptedException { RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181") .sessionTimeoutMs(5000).retryPolicy(policy).build(); client.start(); Thread.sleep(Integer.MAX_VALUE); } }
這里介紹一種算法:Backoff退避算法
有這樣一種場景,有多個請求,如果網絡出現阻塞,每1分鍾重試一次。
20:25 request1(block)
20:26 request2(block)
20:27 request3(block)
當網絡通順的時候,請求都累在一起來發送
20:28 request4(通順)request2、3、4
那么前面的請求就沒有意義了,所以就有了退避算法,按照指數間隔重試,比如第一次1分鍾,第二次2分鍾......隨着時間的推移,重試間隔越長。
2. curator遞歸創建順序節點
package com.study.demo.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; /** * * @Description: curator遞歸創建順序節點 * @author leeSmall * @date 2018年9月2日 * */ public class CreateNodeDemo { public static void main(String[] args) throws Exception { String path = "/zk-curator/c1"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes()); } }
創建成功:
3. curator異步創建臨時節點
package com.study.demo.curator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; /** * * @Description: curator異步創建臨時節點 * @author leeSmall * @date 2018年9月2日 * */ public class CreateNodeAsyncDemo { static CountDownLatch cdl = new CountDownLatch(2); static ExecutorService es = Executors.newFixedThreadPool(2); public static void main(String[] args) throws Exception { String path = "/zk-curator"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); //創建臨時節點 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { //回調事件處理 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType()); cdl.countDown(); } }, es).forPath(path, "test".getBytes()); //創建臨時節點 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType()); cdl.countDown(); } }).forPath(path, "test".getBytes()); cdl.await(); es.shutdown(); } }
輸出結果:
event code: 0, type: CREATE
event code: -110, type: CREATE
4. curator更新節點數據
package com.study.demo.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** * * @Description: curator更新節點數據 * @author leeSmall * @date 2018年9月2日 * */ public class UpdateDataDemo { public static void main(String[] args) throws Exception { String path = "/zk-curator/c1"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes()); Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path); System.out.println("Current data: " + stat.getVersion()); System.out.println("Update data: " + client.setData().withVersion(stat.getVersion()).forPath(path, "some".getBytes()).getVersion()); } }
輸出結果:
Current data: 0
Update data: 1
5. curator刪除節點數據
package com.study.demo.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** * * @Description: curator刪除節點數據 * @author leeSmall * @date 2018年9月2日 * */ public class DeleteNodeDemo { public static void main(String[] args) throws Exception { String path = "/zk-curator/c1"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes()); Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path); client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path); } }
6. curator事件監聽
package com.study.demo.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; /** * * @Description: curator事件監聽 * @author leeSmall * @date 2018年9月2日 * */ public class NodeCacheDemo { public static void main(String[] args) throws Exception { String path = "/zk-curator/nodecache"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes()); final NodeCache nc = new NodeCache(client, path, false); nc.start(); //通過回調函數監聽事件 nc.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { System.out.println("update--current data: " + new String(nc.getCurrentData().getData())); } }); client.setData().forPath(path, "test123".getBytes()); Thread.sleep(1000); client.delete().deletingChildrenIfNeeded().forPath(path); Thread.sleep(5000); nc.close(); } }
輸出結果:
update--current data: test123
Curator事件監聽:
NodeCache:節點處理監聽(會使用緩存)。回調接口NodeCacheListener
PathChildrenCache:子節點緩存,處理子節點變化。回調接口PathChildrenCacheListener
TreeCache:NodeCache和PathChildrenCache的結合體。回調接口TreeCacheListener
四、zookeeper會話
1. zookeeper連接的幾種狀態
CONNECTING 正在連接
CONNECTED 已經連接
RECONNECTING 正在重新連接
RECONNECTED 重新連接上
CLOSE 會話關閉
2. session
2.1 session主要由幾個類控制:
SessionTracker, LearnerSessionTracker, SessionTrackerImpl
session初始化的方法:
org.apache.zookeeper.server.SessionTrackerImpl.initializeNextSession(long)
public static long initializeNextSession(long id) { long nextSid = 0; nextSid = (System.currentTimeMillis() << 24) >>> 8; nextSid = nextSid | (id <<56); return nextSid; }
說明:
SessionID的分配(初始化)函數,策略如下:
1)取時間,並且左移24位得到的結果再右移8位(高8位,低16位都是0)
2)sid拿出來進行左移56位
3)和第一步的結果做或運算
2.2 Session分桶(zookeeper的一個特性)
按照Session會話過期時間進行分區塊保存。
這樣設計的好處:可以快速清理過期的session
2.3 session激活過程:
1)檢測會話是否過期
2)計算會話下一次超時時間
3)定位會話的所在區塊
4)遷移會話