一、添加項目所需依賴:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <!-- Apache Curator 包含了幾個包: curator-client:提供一些客戶端的操作,例如重試策略等 curator-framework:對zookeeper的底層api的一些封裝 curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.13.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> </dependencies>
二、連接zooKeeper 服務,使用 Client API:
1、 application.properties自定義配置: 使用 @ConfigurationProperties 、@EnableConfigurationProperties 注解用來屬性映射類
apache.zookeeper.connect-url=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 #連接地址,此地址為單機集群;
apache.zookeeper.session-timeout=60000 #會話超時時間,默認60000ms
apache.zookeeper.connection-timeout=15000 #連接創建超時時間,默認15000ms
apache.zookeeper.scheme=digest #訪問控制 驗證策略
apache.zookeeper.auth-id=username:password #權限 Id
apache.retrypolicy.base-sleep-time=1000 #重連策略,初始化間隔時間
apache.retrypolicy.max-retries=3 #重連次數
apache.retrypolicy.max-sleep=2147483647 #重連最長時間
2、配置類:
@ConfigurationProperties(prefix = "apache.zookeeper") @Configuration public class ApacheZooKeeperProperties { private String connectUrl; private int sessionTimeout; private int connectionTimeout; private String scheme; private String authId; public String getConnectUrl() { return connectUrl; } public void setConnectUrl(String connectUrl) { this.connectUrl = connectUrl; } public int getSessionTimeout() { return sessionTimeout; } public void setSessionTimeout(int sessionTimeout) { this.sessionTimeout = sessionTimeout; } public int getConnectionTimeout() { return connectionTimeout; } public void setConnectionTimeout(int connectionTimeout) { this.connectionTimeout = connectionTimeout; } public String getScheme() { return scheme; } public void setScheme(String scheme) { this.scheme = scheme; } public String getAuthId() { return authId; } public void setAuthId(String authId) { this.authId = authId; } }
@ConfigurationProperties(prefix = "apache.retrypolicy") @Configuration public class ApacheRetryPolicy { private int baseSleepTime = 1000; private int maxRetries = 29; private int maxSleep = 2147483647; public int getBaseSleepTime() { return baseSleepTime; } public void setBaseSleepTime(int baseSleepTime) { this.baseSleepTime = baseSleepTime; } public int getMaxRetries() { return maxRetries; } public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } public int getMaxSleep() { return maxSleep; } public void setMaxSleep(int maxSleep) { this.maxSleep = maxSleep; } }
3、演示代碼:
/** * 演示 Apache Curator API * 1、增刪查改 * 2、ACL 訪問權限控制 * 3、注冊 watch 事件的三個接口 */ @RestController public class CuratorClientApiText { private Logger logger = LoggerFactory.getLogger(CuratorClientApiText.class); @Autowired private CuratorFramework zkClient; @Autowired private ApacheZooKeeperProperties apacheZooKeeperProperties; private String userParentPath = "/user"; private String userPersistent = "/user/persistent"; private String userEphemeral = "/ephemeral"; private String userPersistentSequential = "/user/persistent_sequential"; private String userEphemeralSequential = "/ephemeral_sequential"; private String result = ""; /** * Curator API 是鏈式調用風格,遇到 forPath 接口就觸發ZooKeeper 調用 * * 將演示創建 ZooKeeper 四種數據模型 * * @return */ @GetMapping("/create/node") public String createNode(){ try { //添加 acl 用戶 List<ACL> aclList = new ArrayList<>(); aclList.add(new ACL(ZooDefs.Perms.ALL, new Id(apacheZooKeeperProperties.getScheme(), DigestAuthenticationProvider.generateDigest(apacheZooKeeperProperties.getAuthId())))); /** * CuratorListener監聽,此監聽主要針對background通知和錯誤通知; * 使用 watched() 只會觀察一次,只對該節點本身 create、delete、setData 有效; * 使用 inBackground() 會異步監聽到返回信息,一旦使用該接口,就不會有返回值 */ zkClient.getCuratorListenable().addListener(new CuratorListenerImpl()); /** * NodeCache可以監聽節點本身創建、刪除,以及內容的變化,但對於子節點的變化不會監聽 */ NodeCache nodeCache = new NodeCache(zkClient,userParentPath); NodeCacheListenerImpl nodeCacheListener = new NodeCacheListenerImpl(); nodeCacheListener.setNodeCache(nodeCache); nodeCache.start(true); // 設置為 true 把該節點數據存儲到本地 nodeCache.getListenable().addListener(nodeCacheListener); /** * PathChildrenCache用於監聽所有子節點的變化 */ PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, userParentPath, true); /** * StartMode: 初始化方式 * POST_INITIALIZED_EVENT : 異步初始化之后觸發事件 * NORMAL:異步初始化 * BUILD_INITIAL_CACHE:同步初始化 */ pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListenerImpl()); // 注冊 watch 事件 /*Stat stat = zkClient.checkExists() .watched() .forPath(userParentPath); logger.info("/userParentPath 路徑狀態..." + stat); stat = zkClient.checkExists() .watched() .forPath(userPersistent); logger.info("/userPersistent 路徑狀態..." + stat); stat = zkClient.checkExists() .watched() .forPath(userEphemeral); logger.info("/userEphemeral 路徑狀態..." + stat);*/ //持久節點 creatingParentContainersIfNeeded() 接口自動遞歸創建所需節點的父節點 result = zkClient.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(aclList) .forPath(userPersistent, "userPersistentData".getBytes()); Thread.sleep(1000 * 5); logger.info("持久節點..." + result); //臨時節點 不能有子節點 result = zkClient.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(userEphemeral,"userEphemeralData".getBytes()); Thread.sleep(1000 * 5); logger.info("臨時節點..." + result); //持久序列節點 result = zkClient.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT_SEQUENTIAL) .forPath(userPersistentSequential,"userPersistentSequentialData".getBytes()); Thread.sleep(1000 * 5); logger.info("持久有序節點..." + result); //臨時序列節點 result = zkClient.create() .creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(userEphemeralSequential,"userEphemeralSequentialData".getBytes()); Thread.sleep(1000 * 5); logger.info("臨時有序節點..." + result); }catch(Exception e){ logger.info("創建節點失敗..."); e.printStackTrace(); } return "curator api 創建節點"; } @GetMapping("/node/children") public String nodeChildren(){ try{ Stat stat = zkClient.checkExists().watched().forPath(userParentPath); Thread.sleep(1000 * 5); logger.info("判斷節點是否存在..." + stat); List<String> userPaths = zkClient.getChildren().forPath(userParentPath); Thread.sleep(1000 * 5); logger.info("獲取所有子節點..." + userPaths); }catch(Exception e){ logger.info("失敗..."); e.printStackTrace(); } return "curator api 獲取所有子節點"; } @GetMapping("/data/node") public String dataNode(){ try{ //獲取一個節點的內容 byte[] bytes = zkClient.getData().forPath(userPersistent); Thread.sleep(1000 * 5); logger.info("獲取節點數據..." + new String(bytes)); //修改一個節點的內容 Stat stat = zkClient.setData().forPath(userPersistent, "updateUserPersistentData".getBytes()); Thread.sleep(1000 * 5); logger.info("修改節點數據..." + stat); }catch(Exception e){ logger.info("失敗..."); e.printStackTrace(); } return "curator api 獲取、修改節點數據"; } @GetMapping("/delete/node") public String deleteNode(){ try{ //刪除一個節點,強制指定版本進行刪除 Stat stat = new Stat(); zkClient.getData().storingStatIn(stat).forPath(userPersistent); zkClient.delete().withVersion(stat.getVersion()).forPath(userPersistent); Thread.sleep(1000 * 5); //刪除一個節點,並且遞歸刪除其所有子節點 zkClient.delete().deletingChildrenIfNeeded().forPath(userParentPath); Thread.sleep(1000 * 5); }catch(Exception e){ logger.info("刪除節點失敗..."); e.printStackTrace(); } return "curator api 刪除節點"; } }
/** * 連接zooKeeper server,獲得zkClient */ @Configuration @EnableConfigurationProperties(value = {ApacheZooKeeperProperties.class, ApacheRetryPolicy.class}) public class ApacheCuratorConfig { private Logger logger = LoggerFactory.getLogger(ApacheCuratorConfig.class); @Autowired private ApacheZooKeeperProperties apacheZooKeeperProperties; @Autowired private ApacheRetryPolicy apacheRetryPolicy; CuratorFramework client = null; @Bean public CuratorFramework getCuratorFramework(){ logger.info("zooKeeper client init..."); try { //當zk連接時失敗的重連策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(apacheRetryPolicy.getBaseSleepTime(), apacheRetryPolicy.getMaxRetries()); //獲得實例對象,拿到ZK client //CuratorFramework client = CuratorFrameworkFactory.newClient(apacheZooKeeperProperties.getConnectUrl(), apacheZooKeeperProperties.getSessionTimeout(), apacheZooKeeperProperties.getConnectionTimeout(), retryPolicy); List<AuthInfo> authInfos = new ArrayList<>(); authInfos.add(new AuthInfo(apacheZooKeeperProperties.getScheme(), apacheZooKeeperProperties.getAuthId().getBytes())); client = CuratorFrameworkFactory.builder() .authorization(authInfos) .connectString(apacheZooKeeperProperties.getConnectUrl()) .sessionTimeoutMs(apacheZooKeeperProperties.getSessionTimeout()) .connectionTimeoutMs(apacheZooKeeperProperties.getConnectionTimeout()) .retryPolicy(retryPolicy) .namespace("workspace") .build(); client.start(); logger.info("zooKeeper client start..."); }catch (Exception e){ logger.info("zooKeeper connect error..."); e.printStackTrace(); } return client; } }
