使用Apache Curator監控Zookeeper的Node和Path的狀態


  1.Zookeeper經常被我們用來做配置管理,配置的管理在分布式應用環境中很常見,例如同一個應用系統需要多台 PC Server 運行,但是它們運行的應用系統的某些配置項是相同的,如果要修改這些相同的配置項,那么就必須同時修改每台運行這個應用系統的 PC Server,這樣非常麻煩而且容易出錯。像這樣的配置信息完全可以交給 Zookeeper 來管理,將配置信息保存在 Zookeeper 的某個目錄節點中,然后將所有需要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每台應用機器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應用到系統中。

 

                                                                                       

 

  我們通過Curator是如何實現的呢? 那就是NodeCache,關於如何實現,后面代碼給出說明。

 

  假如我們有多個服務保存在Zookeeper的/services下,例如/services/service1,/services/service2......在service1,servce2下,保存的有服務的ip,端口等信息。如果我們需要增加服務,或者某個服務不可用了,從Zookeeper中刪除了,或者我們修改了某個Service下的ip和端口值,我們有必要第一時間內收到通知,已進行相應的處理,這時候可以怎么辦呢? Curator提供了一個pathChildrenCache來滿足我們的需求。下面我們給出代碼來說明兩個的用法.

 

 

package com.hupengcool.cache;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.EnsurePath;

import java.util.List;

/**
 * Created by hupeng on 2014/9/19.
 */
public class Cache {

    public static PathChildrenCache pathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
        final PathChildrenCache cached = new PathChildrenCache(client, path, cacheData);
        cached.getListenable().addListener(new PathChildrenCacheListener() { 
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        cached.rebuild();
                        break;
                    case CONNECTION_SUSPENDED:
                    case CONNECTION_LOST:
                        System.out.println("Connection error,waiting...");
                        break;
                    default:
                        System.out.println("PathChildrenCache changed : {path:" + event.getData().getPath() + " data:" +
                                new String(event.getData().getData()) + "}");
                }
            }
        });
        return cached;
    }


    public static NodeCache nodeCache(CuratorFramework client, String path) {
        final NodeCache cache = new NodeCache(client, path);
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("NodeCache changed, data is: " + new String(cache.getCurrentData().getData()));
            }
        });

        return cache;
    }


    public static void main(String[] args) throws Exception {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1", retryPolicy);
        client.start();

        EnsurePath ensurePath = client.newNamespaceAwareEnsurePath("/create/test");
        ensurePath.ensure(client.getZookeeperClient());

        /**
         * pathChildrenCache
         */
        PathChildrenCache cache = pathChildrenCache(client, "/create", true);
        cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        List<ChildData> datas = cache.getCurrentData();

        for (ChildData data : datas) {
            System.out.println("pathcache:{" + data.getPath() + ":" + new String(data.getData())+"}");
        }


        /**
         *    NodeCache
         */
        NodeCache nodeCache = nodeCache(client, "/create/test");
        nodeCache.start(true);

        client.setData().forPath("/create/test", "1111".getBytes());

        System.out.println(new String(nodeCache.getCurrentData().getData()));

        Thread.sleep(10000);
        CloseableUtils.closeQuietly(cache);
        CloseableUtils.closeQuietly(client);
    }
}

 

 

不足之處,請指正。(*^__^*) ,Thanks

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM