Spring Boot 整合 ZooKeeper Java 客戶端之 Apache Curator 實戰


一、添加項目所需依賴:

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <!-- Test-only libraries: keep them off the runtime classpath. -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>

       <!--
           Apache Curator is split into several artifacts:
             curator-client:    client-side utilities, e.g. retry policies
             curator-framework: a wrapper around the low-level ZooKeeper API
             curator-recipes:   higher-level features such as cache event listeners,
                                leader election, distributed locks, distributed counters
                                and distributed barriers
       -->
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
           <version>2.13.0</version>
       </dependency>

       <!-- Generates metadata for the custom @ConfigurationProperties keys. -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-configuration-processor</artifactId>
           <optional>true</optional>
       </dependency>
   </dependencies>

 

二、連接 ZooKeeper 服務,使用 Client API:

  1、application.properties 自定義配置:使用 @ConfigurationProperties、@EnableConfigurationProperties 注解將配置項映射到屬性類

# NOTE: a .properties file has no trailing-comment syntax. Anything after the value,
# including "# ...", becomes part of the value, so every comment sits on its own line.

# Connection string; these addresses form a cluster running on a single machine
apache.zookeeper.connect-url=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# Session timeout in milliseconds (default 60000)
apache.zookeeper.session-timeout=60000
# Connection creation timeout in milliseconds (default 15000)
apache.zookeeper.connection-timeout=15000
# Access-control authentication scheme
apache.zookeeper.scheme=digest
# Authorization id
apache.zookeeper.auth-id=username:password
# Retry policy: initial sleep interval between retries
apache.retrypolicy.base-sleep-time=1000
# Retry policy: number of reconnect attempts
apache.retrypolicy.max-retries=3
# Retry policy: maximum reconnect sleep time
apache.retrypolicy.max-sleep=2147483647

  2、配置類:

/**
 * ZooKeeper connection settings bound from the configuration keys prefixed with
 * {@code apache.zookeeper}.
 *
 * <p>The two timeouts default to the values documented next to the keys in
 * {@code application.properties} (60000 ms and 15000 ms), so leaving a key out no longer
 * results in a timeout of {@code 0}.
 *
 * <p>NOTE(review): this class carries {@code @Configuration} and is also listed in
 * {@code @EnableConfigurationProperties} on {@code ApacheCuratorConfig}; that combination may
 * register the bean twice - confirm and keep only one registration mechanism.
 */
@ConfigurationProperties(prefix = "apache.zookeeper")
@Configuration
public class ApacheZooKeeperProperties {

    /** Comma-separated {@code host:port} list of the ZooKeeper servers. */
    private String connectUrl;

    /** Session timeout in milliseconds. */
    private int sessionTimeout = 60000;

    /** Connection creation timeout in milliseconds. */
    private int connectionTimeout = 15000;

    /** Authentication scheme used for access control, e.g. {@code digest}. */
    private String scheme;

    /** Authorization id; the sample configuration uses the form {@code username:password}. */
    private String authId;


    /** @return the ZooKeeper connection string */
    public String getConnectUrl() {
        return connectUrl;
    }

    /** @param connectUrl the ZooKeeper connection string */
    public void setConnectUrl(String connectUrl) {
        this.connectUrl = connectUrl;
    }

    /** @return the session timeout in milliseconds */
    public int getSessionTimeout() {
        return sessionTimeout;
    }

    /** @param sessionTimeout the session timeout in milliseconds */
    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    /** @return the connection creation timeout in milliseconds */
    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    /** @param connectionTimeout the connection creation timeout in milliseconds */
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /** @return the authentication scheme */
    public String getScheme() {
        return scheme;
    }

    /** @param scheme the authentication scheme */
    public void setScheme(String scheme) {
        this.scheme = scheme;
    }

    /** @return the authorization id */
    public String getAuthId() {
        return authId;
    }

    /** @param authId the authorization id */
    public void setAuthId(String authId) {
        this.authId = authId;
    }
}
/**
 * Retry settings bound from the configuration keys prefixed with {@code apache.retrypolicy}.
 * Every field starts with a built-in default that applies when its key is not configured.
 */
@ConfigurationProperties(prefix = "apache.retrypolicy")
@Configuration
public class ApacheRetryPolicy {

    /** Default initial sleep interval between retries. */
    private static final int DEFAULT_BASE_SLEEP_TIME = 1000;

    /** Default number of retries. */
    private static final int DEFAULT_MAX_RETRIES = 29;

    /** Default upper bound for the sleep time. */
    private static final int DEFAULT_MAX_SLEEP = Integer.MAX_VALUE;

    private int baseSleepTime = DEFAULT_BASE_SLEEP_TIME;
    private int maxRetries = DEFAULT_MAX_RETRIES;
    private int maxSleep = DEFAULT_MAX_SLEEP;

    /** @return the initial sleep interval between retries */
    public int getBaseSleepTime() {
        return this.baseSleepTime;
    }

    /** @param baseSleepTime the initial sleep interval between retries */
    public void setBaseSleepTime(int baseSleepTime) {
        this.baseSleepTime = baseSleepTime;
    }

    /** @return the number of retries */
    public int getMaxRetries() {
        return this.maxRetries;
    }

    /** @param maxRetries the number of retries */
    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** @return the upper bound for the sleep time */
    public int getMaxSleep() {
        return this.maxSleep;
    }

    /** @param maxSleep the upper bound for the sleep time */
    public void setMaxSleep(int maxSleep) {
        this.maxSleep = maxSleep;
    }
}

  3、演示代碼:

/**
 * 演示 Apache Curator API
 * 1、增刪查改
 * 2、ACL 訪問權限控制
 * 3、注冊 watch 事件的三個接口
 */
@RestController
public class CuratorClientApiText {

    private Logger logger = LoggerFactory.getLogger(CuratorClientApiText.class);

    @Autowired
    private CuratorFramework zkClient;

    @Autowired
    private ApacheZooKeeperProperties apacheZooKeeperProperties;

    private String userParentPath = "/user";

    private String userPersistent = "/user/persistent";

    private String userEphemeral = "/ephemeral";

    private String userPersistentSequential = "/user/persistent_sequential";

    private String userEphemeralSequential = "/ephemeral_sequential";

    private String result = "";


    /**
     * Curator API 是鏈式調用風格,遇到 forPath 接口就觸發ZooKeeper 調用
     *
     * 將演示創建 ZooKeeper 四種數據模型
     *
     * @return
     */
    @GetMapping("/create/node")
    public String createNode(){

        try {
            //添加 acl 用戶
            List<ACL> aclList  = new ArrayList<>();
            aclList.add(new ACL(ZooDefs.Perms.ALL, new Id(apacheZooKeeperProperties.getScheme(), DigestAuthenticationProvider.generateDigest(apacheZooKeeperProperties.getAuthId()))));

            /**
             * CuratorListener監聽,此監聽主要針對background通知和錯誤通知;
             * 使用 watched() 只會觀察一次,只對該節點本身 create、delete、setData 有效;
             * 使用 inBackground() 會異步監聽到返回信息,一旦使用該接口,就不會有返回值
             */
            zkClient.getCuratorListenable().addListener(new CuratorListenerImpl());

            /**
             * NodeCache可以監聽節點本身創建、刪除,以及內容的變化,但對於子節點的變化不會監聽
             */
            NodeCache nodeCache = new NodeCache(zkClient,userParentPath);
            NodeCacheListenerImpl nodeCacheListener = new NodeCacheListenerImpl();
            nodeCacheListener.setNodeCache(nodeCache);
            nodeCache.start(true); // 設置為 true 把該節點數據存儲到本地
            nodeCache.getListenable().addListener(nodeCacheListener);

            /**
             * PathChildrenCache用於監聽所有子節點的變化
             */
            PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, userParentPath, true);
            /**
             * StartMode: 初始化方式
             * POST_INITIALIZED_EVENT : 異步初始化之后觸發事件
             * NORMAL:異步初始化
             * BUILD_INITIAL_CACHE:同步初始化
             */
            pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListenerImpl());

            // 注冊 watch 事件 
            /*Stat stat = zkClient.checkExists()
                    .watched()
                    .forPath(userParentPath);
            logger.info("/userParentPath 路徑狀態..." + stat);

            stat = zkClient.checkExists()
                    .watched()
                    .forPath(userPersistent);
            logger.info("/userPersistent 路徑狀態..." + stat);

            stat = zkClient.checkExists()
                    .watched()
                    .forPath(userEphemeral);
            logger.info("/userEphemeral 路徑狀態..." + stat);*/

            //持久節點 creatingParentContainersIfNeeded() 接口自動遞歸創建所需節點的父節點
            result = zkClient.create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .withACL(aclList)
                        .forPath(userPersistent, "userPersistentData".getBytes());

            Thread.sleep(1000 * 5);
            logger.info("持久節點..." + result);

            //臨時節點 不能有子節點
            result = zkClient.create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(userEphemeral,"userEphemeralData".getBytes());

            Thread.sleep(1000 * 5);
            logger.info("臨時節點..." + result);

            //持久序列節點
            result = zkClient.create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                        .forPath(userPersistentSequential,"userPersistentSequentialData".getBytes());

            Thread.sleep(1000 * 5);
            logger.info("持久有序節點..." + result);

            //臨時序列節點
            result = zkClient.create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .forPath(userEphemeralSequential,"userEphemeralSequentialData".getBytes());

            Thread.sleep(1000 * 5);
            logger.info("臨時有序節點..." + result);
        }catch(Exception e){
            logger.info("創建節點失敗...");
            e.printStackTrace();
        }

        return "curator api 創建節點";
    }

    @GetMapping("/node/children")
    public String nodeChildren(){
        try{
            Stat stat = zkClient.checkExists().watched().forPath(userParentPath);
            Thread.sleep(1000 * 5);
            logger.info("判斷節點是否存在..." + stat);

            List<String> userPaths =  zkClient.getChildren().forPath(userParentPath);
            Thread.sleep(1000 * 5);
            logger.info("獲取所有子節點..." + userPaths);

        }catch(Exception e){
            logger.info("失敗...");
            e.printStackTrace();
        }

        return "curator api 獲取所有子節點";
    }

    @GetMapping("/data/node")
    public String dataNode(){
        try{
            //獲取一個節點的內容
            byte[] bytes = zkClient.getData().forPath(userPersistent);
            Thread.sleep(1000 * 5);
            logger.info("獲取節點數據..." + new String(bytes));

            //修改一個節點的內容
            Stat stat = zkClient.setData().forPath(userPersistent, "updateUserPersistentData".getBytes());
            Thread.sleep(1000 * 5);
            logger.info("修改節點數據..." + stat);

        }catch(Exception e){
            logger.info("失敗...");
            e.printStackTrace();
        }
        return "curator api 獲取、修改節點數據";
    }

    @GetMapping("/delete/node")
    public String deleteNode(){
        try{

            //刪除一個節點,強制指定版本進行刪除
            Stat stat = new Stat();
            zkClient.getData().storingStatIn(stat).forPath(userPersistent);
            zkClient.delete().withVersion(stat.getVersion()).forPath(userPersistent);
            Thread.sleep(1000 * 5);

            //刪除一個節點,並且遞歸刪除其所有子節點
            zkClient.delete().deletingChildrenIfNeeded().forPath(userParentPath);
            Thread.sleep(1000 * 5);

        }catch(Exception e){
            logger.info("刪除節點失敗...");
            e.printStackTrace();
        }

        return "curator api 刪除節點";
    }

}
/**
 * 連接zooKeeper server,獲得zkClient
 */
@Configuration
@EnableConfigurationProperties(value = {ApacheZooKeeperProperties.class, ApacheRetryPolicy.class})
public class ApacheCuratorConfig {

    private Logger logger = LoggerFactory.getLogger(ApacheCuratorConfig.class);

    @Autowired
    private ApacheZooKeeperProperties apacheZooKeeperProperties;

    @Autowired
    private ApacheRetryPolicy apacheRetryPolicy;

    CuratorFramework client = null;

    @Bean
    public CuratorFramework getCuratorFramework(){
        logger.info("zooKeeper client init...");

        try {
            //當zk連接時失敗的重連策略
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(apacheRetryPolicy.getBaseSleepTime(), apacheRetryPolicy.getMaxRetries());

            //獲得實例對象,拿到ZK client
            //CuratorFramework client = CuratorFrameworkFactory.newClient(apacheZooKeeperProperties.getConnectUrl(), apacheZooKeeperProperties.getSessionTimeout(), apacheZooKeeperProperties.getConnectionTimeout(), retryPolicy);
            List<AuthInfo> authInfos = new ArrayList<>();
            authInfos.add(new AuthInfo(apacheZooKeeperProperties.getScheme(), apacheZooKeeperProperties.getAuthId().getBytes()));

            client = CuratorFrameworkFactory.builder()
                    .authorization(authInfos)
                    .connectString(apacheZooKeeperProperties.getConnectUrl())
                    .sessionTimeoutMs(apacheZooKeeperProperties.getSessionTimeout())
                    .connectionTimeoutMs(apacheZooKeeperProperties.getConnectionTimeout())
                    .retryPolicy(retryPolicy)
                    .namespace("workspace")
                    .build();

            client.start();
            logger.info("zooKeeper client start...");

        }catch (Exception e){
            logger.info("zooKeeper connect error...");
            e.printStackTrace();
        }
        return client;
    }
} 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM