1、篇首語
curator是zookeeper的一個高級api開發包。封裝了zookeeper眾多的recipes,並且實現了一些新的recipes原語,最重要的是基於zookeeper提供的各種機制實現了更健壯的連接和異常處理。
本文將其中比較常用的一種recipe,就是cache。
2、各種Caches
cache是一種緩存機制,可以借助cache實現監聽。
簡單來說,cache在客戶端緩存了znode的各種狀態,當感知到zk集群的znode狀態變化,會觸發event事件,注冊的監聽器會處理這些事件。是不是很簡單。
curator支持的cache種類有3種Path Cache,Node Cache,Tree Cache
1)Path Cache
Path Cache用來觀察ZNode的子節點並緩存狀態,如果ZNode的子節點被創建,更新或者刪除,那么Path Cache會更新緩存,並且觸發事件給注冊的監聽器。
Path Cache是通過PathChildrenCache類來實現的,監聽器注冊是通過PathChildrenCacheListener。
2)Node Cache
Node Cache用來觀察ZNode自身,如果ZNode節點本身被創建,更新或者刪除,那么Node Cache會更新緩存,並觸發事件給注冊的監聽器。
Node Cache是通過NodeCache類來實現的,監聽器對應的接口為NodeCacheListener。
3)Tree Cache
可以看做是上兩種的合體,Tree Cache觀察的是所有節點的所有數據。
3、下面給出一個例子。
1)這是在springboot中使用curator,先給出curator依賴pom
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.9.1</version> </dependency>
2)三種cache的實現
package com.dqa.prometheus.client.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.util.ArrayList; import java.util.List; public class ZkClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private CuratorFramework client; private NodeCache nodeCache; private PathChildrenCache pathChildrenCache; private TreeCache treeCache; private String zookeeperServer; private int sessionTimeoutMs; private int connectionTimeoutMs; private int baseSleepTimeMs; private int maxRetries; public void setZookeeperServer(String zookeeperServer) { this.zookeeperServer = zookeeperServer; } public String getZookeeperServer() { return zookeeperServer; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public int getSessionTimeoutMs() { return sessionTimeoutMs; } public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } public int getConnectionTimeoutMs() { return connectionTimeoutMs; } public void setBaseSleepTimeMs(int baseSleepTimeMs) { this.baseSleepTimeMs = baseSleepTimeMs; } public int getBaseSleepTimeMs() { return baseSleepTimeMs; } public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } public int getMaxRetries() { return maxRetries; } public void init() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy) .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build(); client.start(); } public void stop() { if (client != null) CloseableUtils.closeQuietly(client); if (pathChildrenCache != null) CloseableUtils.closeQuietly(pathChildrenCache); if (nodeCache != null) CloseableUtils.closeQuietly(nodeCache); if (treeCache != null) CloseableUtils.closeQuietly(treeCache); } public CuratorFramework getClient() { return client; } /* * 設置Path Cache, 監控本節點的子節點被創建,更新或者刪除,注意是子節點, 子節點下的子節點不能遞歸監控 * 事件類型有3個, 可以根據不同的動作觸發不同的動作 * 本例子只是演示, 所以只是打印了狀態改變的信息, 並沒有在PathChildrenCacheListener中實現復雜的邏輯 * @Param path 監控的節點路徑, cacheData 是否緩存data * 可重入監聽 * */ public void setPathCacheListener(String path, boolean cacheData) { try { pathChildrenCache = new PathChildrenCache(client, path, cacheData); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: logger.info("子節點增加, path={}, data={}", data.getPath(), data.getData()); break; case CHILD_UPDATED: logger.info("子節點更新, path={}, data={}", data.getPath(), data.getData()); break; case CHILD_REMOVED: logger.info("子節點刪除, path={}, data={}", data.getPath(), data.getData()); break; default: break; } } }; pathChildrenCache.getListenable().addListener(childrenCacheListener); pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT); } catch (Exception e) { logger.error("PathCache監聽失敗, path=", path); } } /* * 設置Node Cache, 監控本節點的新增,刪除,更新 * 節點的update可以監控到, 如果刪除會自動再次創建空節點 * 本例子只是演示, 所以只是打印了狀態改變的信息, 並沒有在NodeCacheListener中實現復雜的邏輯 * @Param path 監控的節點路徑, dataIsCompressed 數據是否壓縮 * 不可重入監聽 * */ public void setNodeCacheListener(String path, boolean dataIsCompressed) { try { nodeCache = new NodeCache(client, path, dataIsCompressed); NodeCacheListener nodeCacheListener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData childData = nodeCache.getCurrentData(); logger.info("ZNode節點狀態改變, path={}", childData.getPath()); logger.info("ZNode節點狀態改變, data={}", childData.getData()); logger.info("ZNode節點狀態改變, stat={}", childData.getStat()); } }; nodeCache.getListenable().addListener(nodeCacheListener); nodeCache.start(); } catch (Exception e) { logger.error("創建NodeCache監聽失敗, path={}", path); } } /* * 設置Tree Cache, 監控本節點的新增,刪除,更新 * 節點的update可以監控到, 如果刪除不會自動再次創建 * 本例子只是演示, 所以只是打印了狀態改變的信息, 並沒有在NodeCacheListener中實現復雜的邏輯 * @Param path 監控的節點路徑, dataIsCompressed 數據是否壓縮 * 可重入監聽 * */ public void setTreeCacheListener(final String path) { try { treeCache = new TreeCache(client, path); TreeCacheListener treeCacheListener = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if(data != null){ switch (event.getType()) { case NODE_ADDED: logger.info("[TreeCache]節點增加, path={}, data={}", data.getPath(), data.getData()); break; case NODE_UPDATED: logger.info("[TreeCache]節點更新, path={}, data={}", data.getPath(), data.getData()); break; case NODE_REMOVED: logger.info("[TreeCache]節點刪除, path={}, data={}", data.getPath(), data.getData()); break; default: break; } }else{ logger.info("[TreeCache]節點數據為空, path={}", data.getPath()); } } }; treeCache.getListenable().addListener(treeCacheListener); treeCache.start(); } catch (Exception e) { logger.error("創建TreeCache監聽失敗, path={}", path); } } }
3)configuration
init方法是初始化zookeeper client的操作
stop是停止zookeeper是的清理動作
package com.dqa.prometheus.configuration; import com.xiaoju.dqa.prometheus.client.zookeeper.ZkClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ZkConfiguration { @Value("${zookeeper.server}") private String zookeeperServer; @Value(("${zookeeper.sessionTimeoutMs}")) private int sessionTimeoutMs; @Value("${zookeeper.connectionTimeoutMs}") private int connectionTimeoutMs; @Value("${zookeeper.maxRetries}") private int maxRetries; @Value("${zookeeper.baseSleepTimeMs}") private int baseSleepTimeMs; @Bean(initMethod = "init", destroyMethod = "stop") public ZkClient zkClient() { ZkClient zkClient = new ZkClient(); zkClient.setZookeeperServer(zookeeperServer); zkClient.setSessionTimeoutMs(sessionTimeoutMs); zkClient.setConnectionTimeoutMs(connectionTimeoutMs); zkClient.setMaxRetries(maxRetries); zkClient.setBaseSleepTimeMs(baseSleepTimeMs); return zkClient; } }
3)zk配置文件
其中最重要的應該是會話超時和重試機制了。
============== zookeeper =================== zookeeper.server=10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181 zookeeper.sessionTimeoutMs=6000 zookeeper.connectionTimeoutMs=6000 zookeeper.maxRetries=3 zookeeper.baseSleepTimeMs=1000