使用Curator操作ZooKeeper


Curator是Netflix公司開源的一個ZooKeeper client library,用於簡化ZooKeeper客戶端編程。它包含如下模塊:

Framework:Framework是ZooKeeper API的High-Level的封裝,它讓訪問ZooKeeper更加簡單。它基於ZooKeeper添加了一些新的特性,同時屏蔽了訪問ZooKeeper集群在管理連接和重試操作方面的復雜度。

Recipes:在Framework的基礎上,實現了一些通用的功能,稱之為“菜單”。

Utilities:訪問ZooKeeper時候的一些公用方法。

Client:一個Low-Level的ZooKeeper客戶端,並有一些公用方法。

Errors:Curator的異常處理,包括連接問題,異常恢復等等。

Extensions:

連接ZooKeeper

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
_client.start();
 

Curator通過CuratorFrameworkFactory來創建客戶端。new出來的客戶端可以保存並且重用。在使用之前需要start一下,絕大部分Curator的操作都必須先start。

在new函數中需要傳入RetryPolicy接口,重連的策略。當和ZooKeeper發生連接異常或者操作異常的時候,就會使用重連策略。ExponentialBackoffRetry是其中一種重連策略。Curator支持很多種重連策略:RetryNTimes(重連N次策略)、RetryForever(永遠重試策略)、ExponentialBackoffRetry(基於backoff的重連策略)、BoundedExponentialBackoffRetry(有邊界的基於backoff的重連策略,即,設定最大sleep時間)等等。

  

下面是官方例子中,ExponentialBackoffRetry的代碼片段。

 
long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
 if ( sleepMs > maxSleepMs )
{
log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
sleepMs = maxSleepMs;
}
return sleepMs;

可以看出ExponentialBackoffRetry 重連的時間間隔一般是隨着重試的次數遞增的,如果時間間隔計算出來大於默認的最大sleep時間的話,則去最大sleep時間。ExponentialBackoffRetry 除了時間的限制以外,還有最大重連次數的限制。而BoundedExponentialBackoffRetry策略只是讓用戶設置最大sleep時間而已。默認的最大時間是Integer.MAX_VALUE毫秒。

ZooKeeper節點操作

ZooKeeper 節點優點像文件系統的文件夾,每個節點都可以包含數據。但是ZooKeeper的節點是有生命周期的,這取決於節點的類型。在 ZooKeeper 中,節點類型可以分為持久節點(PERSISTENT )、臨時節點(EPHEMERAL),以及時序節點(SEQUENTIAL ),具體在節點創建過程中,一般是組合使用,可以生成以下 4 種節點類型。不同的組合可以應用到不同的業務場景中。

 

1. 持久化節點

持久化節點創建后,就一直存在,除非有刪除操作主動來刪除這個節點,持久化節點不會因為創建該節點的客戶端會話失效而消失。如果重復創建,客戶端會拋出NodeExistsException異常。

byte[] data = { 1, 2, 3 };
_client.create().withMode(CreateMode.PERSISTENT).forPath("/zktest/p1", data);

 

2. 臨時節點

創建臨時節點后,如果客戶端會話失效,那么這個節點會自動被ZooKeeper刪除。這里是客戶端失效,並不是客戶端斷開連接。因為ZooKeeper服務端和客戶端是用心跳維持狀態,會話留一點時間,這個時間是在創建連接的時候可以設置sessionTimeoutMs參數:

CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);

創建臨時節點的代碼如下:

_client.create().withMode(CreateMode.EPHEMERAL).forPath("/zktest/e1", data);

 

3. 持久化時序節點

_client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/zktest/ps1", data);

上述代碼執行兩次,你會發現客戶端不會報NodeExistsException異常,ZooKeeper會為你創建2個節點,ZooKeeper在每個父節點會為他的第一級子節點維護一份時序,會記錄每個子節點創建的先后順序。在創建子節點的時候,可以設置這個屬性,那么在創建節點過程中,ZooKeeper會自動為給定節點名加上一個數字后綴,作為新的節點名。

image

 

4. 臨時時序節點

持久化時序節點不同的就是節點會在會話失效的時候回消失。

_client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/zktest/es1", data);

 

5. 設置和獲取節點數據

//設置節點數據
_client.setData().forPath("/zktest/ps1", data);
//獲取節點數據
byte[] data2 = _client.getData().forPath("/zktest/ps1");

 

分布式鎖

使用數據庫、Redis、文件系統都可以實現分布式鎖,同樣ZooKeeper也可以用來實現分布式鎖。Curator提供了InterProcessMutex類來幫助我們實現分布式鎖,其內部就是使用的EPHEMERAL_SEQUENTIAL類型節點。

 

public void test() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);

    _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
    _client.start();

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

    for (int i = 0; i < 5; i++) {
        fixedThreadPool.submit(new Runnable() {

            @Override
            public void run() {

                while (true) {
                    try {
                        dowork();
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}

private void dowork() throws Exception {

    InterProcessMutex ipm = new InterProcessMutex(_client, "/zktest/distributed_lock");

    try {
        ipm.acquire();

        _logger.info("Thread ID:" + Thread.currentThread().getId() + " acquire the lock");

        Thread.sleep(1000);

        _logger.info("Thread ID:" + Thread.currentThread().getId() + " release the lock");
    } catch (Exception e) {

    } finally {
        ipm.release();
    }
}

執行結果如下圖:

image

 

acquire()方法,會在給定的路徑下面創建臨時時序節點的時序節點。然后它會和父節點下面的其他節點比較時序。如果客戶端創建的臨時時序節點的數字后綴最小的話,則獲得該鎖,函數成功返回。如果沒有獲得到,即,創建的臨時節點數字后綴不是最小的,則啟動一個watch監聽上一個(排在前面一個的節點)。主線程使用object.wait()進行等待,等待watch觸發的線程notifyAll(),一旦上一個節點有事件產生馬上再次出發時序最小節點的判斷。

release()方法就是釋放鎖,內部實現就是刪除創建的EPHEMERAL_SEQUENTIAL節點。

Leader選舉

選舉可以用來實現Master-Slave模式,也可以用來實現主備切換等功能。Curator提供兩種方式實現選舉:LeaderSelector 和 LeaderLatch。兩種方法都可以使用,LeaderLatch語法較為簡單一點,LeaderSelector控制度更高一些。

使用LeaderSelector:

public void test() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);

    _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
    _client.start();

    dowork();

}

private void dowork() {

    LeaderSelectorListener listener = new

    LeaderSelectorListenerAdapter() {
        public void takeLeadership(CuratorFramework client) throws Exception {
            logger.info("Take the lead.");

            Thread.sleep(10000);

            logger.info("Relinquish the lead.");
        }

    };

    LeaderSelector selector = new LeaderSelector(_client, "/zktest/leader", listener);
    selector.autoRequeue();
    selector.start();
}

 

LeaderSelector的內部使用分布式鎖InterProcessMutex實現, 並且在LeaderSelector中添加一個Listener,當獲取到鎖的時候執行回調函數takeLeadership。函數執行完成之后就調用InterProcessMutex.release()釋放鎖,也就是放棄Leader的角色。

 

使用LeaderLatch:

public void test() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);

    _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
    _client.start();

    dowork();

}

private void dowork() {
    LeaderLatch leader = new LeaderLatch(_client, "/zktest/leader");
    leader.addListener(new LeaderLatchListener() {

        @Override
        public void isLeader() {
            // TODO Auto-generated method stub
            logger.info("Take the lead.");

            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            logger.info("Relinquish the lead.");
        }

        @Override
        public void notLeader() {
            // TODO Auto-generated method stub
            logger.info("I am not Leader");
        }
    });

    try {
        leader.start();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

 

同樣是實現Leader選舉的LeaderLatch並沒有通過InterProcessMutex實現,它使用了原生的創建EPHEMERAL_SEQUENTIAL節點的功能再次實現了一遍。同樣的在isLeader方法中需要實現Leader的業務需求,但是一旦isLeader方法返回,就相當於Leader角色放棄了,重新進入選舉過程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM