Curator框架使用鏈式編程風格,易讀性更強,使用工廠方法創建連接對象。
1.使用CuratorFrameworkFactory的兩個靜態工廠方法(參數不同)來實現
參數1:connectString,連接信息
參數2:RetryPolicy,重試連接策略,常用的實現有以下四種
ExponentialBackoffRetry、RetryNTimes、RetryOneTime、RetryUntilElapsed
參數3:sessionTimeoutMs會話超時時間,默認為60s
參數4:connectionTimeoutMs連接超時時間,默認為15s
注意:RetryPolicy本身是一個接口,用戶也可以通過實現該接口來自定義重試策略
1 import java.util.List; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 5 import org.apache.curator.RetryPolicy; 6 import org.apache.curator.framework.CuratorFramework; 7 import org.apache.curator.framework.CuratorFrameworkFactory; 8 import org.apache.curator.framework.api.BackgroundCallback; 9 import org.apache.curator.framework.api.CuratorEvent; 10 import org.apache.curator.retry.ExponentialBackoffRetry; 11 import org.apache.zookeeper.CreateMode; 12 import org.apache.zookeeper.ZooKeeper.States; 13 import org.apache.zookeeper.data.Stat; 14 15 public class CuratorBase { 16 17 /** zookeeper地址 */ 18 static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; 19 /** session超時時間 */ 20 static final int SESSION_OUTTIME = 5000;//ms 21 22 public static void main(String[] args) throws Exception { 23 24 //1 重試策略:初試時間為1s 重試10次 25 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 26 //2 通過工廠創建連接 27 CuratorFramework cf = CuratorFrameworkFactory.builder() 28 .connectString(CONNECT_ADDR) 29 .sessionTimeoutMs(SESSION_OUTTIME) 30 .retryPolicy(retryPolicy) 31 // .namespace("super") 32 .build(); 33 //3 開啟連接 34 cf.start(); 35 36 // System.out.println(States.CONNECTED); 37 // System.out.println(cf.getState()); 38 39 // 新加、刪除 40 /** 41 //4 建立節點 指定節點類型(不加withMode默認為持久類型節點)、路徑、數據內容 42 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1內容".getBytes()); 43 //5 刪除節點 44 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); 45 */ 46 47 // 讀取、修改 48 /** 49 //創建節點 50 // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1內容".getBytes()); 51 // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2內容".getBytes()); 52 //讀取節點 53 // String ret1 = new String(cf.getData().forPath("/super/c2")); 54 // System.out.println(ret1); 55 //修改節點 56 // cf.setData().forPath("/super/c2", 
"修改c2內容".getBytes()); 57 // String ret2 = new String(cf.getData().forPath("/super/c2")); 58 // System.out.println(ret2); 59 */ 60 61 // 綁定回調函數 62 /** 63 ExecutorService pool = Executors.newCachedThreadPool(); 64 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) 65 .inBackground(new BackgroundCallback() { 66 @Override 67 public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { 68 System.out.println("code:" + ce.getResultCode()); 69 System.out.println("type:" + ce.getType()); 70 System.out.println("線程為:" + Thread.currentThread().getName()); 71 } 72 }, pool) 73 .forPath("/super/c3","c3內容".getBytes()); 74 Thread.sleep(Integer.MAX_VALUE); 75 */ 76 77 78 // 讀取子節點getChildren方法 和 判斷節點是否存在checkExists方法 79 /** 80 List<String> list = cf.getChildren().forPath("/super"); 81 for(String p : list){ 82 System.out.println(p); 83 } 84 85 Stat stat = cf.checkExists().forPath("/super/c3"); 86 System.out.println(stat); 87 88 Thread.sleep(2000); 89 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); 90 */ 91 92 93 //cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); 94 95 } 96 }