Zookeeper框架Curator使用


本文參考自https://blog.csdn.net/wo541075754/article/details/69138878?utm_source=gold_browser_extension

     https://www.cnblogs.com/java-zhao/p/7350945.html

簡介

Curator是Netflix公司開源的一套Zookeeper客戶端框架。了解過Zookeeper原生API都會清楚其復雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反復注冊Watcher和NodeExistsException等。目前已經作為Apache的頂級項目出現,是最流行的Zookeeper客戶端之一。從編碼風格上來講,它提供了基於Fluent的編程風格支持。

除此之外,Curator還提供了Zookeeper的各種應用場景:Recipe、共享鎖服務、Master選舉機制和分布式計數器等。 

具體信息可以參考Apache官網提供的關於Curator的資料信息

版本

目前Curator有2.x.x和3.x.x兩個系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,並且提供了一些諸如動態重新配置、watch刪除等新特性。

項目組件

名稱 描述
Recipes Zookeeper典型應用場景的實現,這些實現是基於Curator Framework。
Framework Zookeeper API的高層封裝,大大簡化Zookeeper客戶端編程,添加了例如Zookeeper連接管理、重試機制等。
Utilities 為Zookeeper提供的各種實用程序。
Client Zookeeper client的封裝,用於取代原生的Zookeeper客戶端(ZooKeeper類),提供一些非常有用的客戶端特性。
Errors Curator如何處理錯誤,連接問題,可恢復的例外等。

Maven依賴

Curator的jar包已經發布到Maven中心,由以下幾個artifact的組成。根據需要選擇引入具體的artifact。但大多數情況下只用引入curator-recipes即可。

 

GroupID/Org ArtifactID/Name 描述
org.apache.curator curator-recipes 所有典型應用場景。需要依賴client和framework,需設置自動獲取依賴。
org.apache.curator curator-framework 同組件中framework介紹。
org.apache.curator curator-client 同組件中client介紹。
org.apache.curator curator-test 包含TestingServer、TestingCluster和一些測試工具。
org.apache.curator curator-examples 各種使用Curator特性的案例。
org.apache.curator curator-x-discovery 在framework上構建的服務發現實現。
org.apache.curator curator-x-discoveryserver 可以喝Curator Discovery一起使用的RESTful服務器。
org.apache.curator curator-x-rpc Curator framework和recipes非java環境的橋接。

 

根據上面的描述,開發人員大多數情況下使用的都是curator-recipes的依賴,此依賴的maven配置如下:

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

由於版本兼容原因,采用了2.x.x的最高版本。

案例及功能說明

創建會話

Curator的創建會話方式與原生的API和ZkClient的創建方式區別很大。Curator創建客戶端是通過CuratorFrameworkFactory工廠類來實現的。其中,此工廠類提供了三種創建客戶端的方法。 
前兩種方法是通過newClient來實現,僅參數不同而已。

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

使用上面方法創建出一個CuratorFramework之后,需要再調用其start()方法完成會話創建。 
實例代碼:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
                5000,1000,retryPolicy);
client.start();

其中參數RetryPolicy提供重試策略的接口,可以讓用戶實現自定義的重試策略。默認提供了以下實現,分別為ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed。

進一步查看源代碼可以得知,其實這兩種方法內部實現一樣,只是對外包裝成不同的方法。它們的底層都是通過第三個方法builder來實現的。 
實例代碼:

RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
            .connectString("hadoop1:2181,hadoop2:2181,hadoop3:2181")
            .sessionTimeoutMs(3000)
            .connectionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            .build();
client.start();
 

參數:

  • connectString:zk的server地址,多個server之間使用英文逗號分隔開
  • connectionTimeoutMs:連接超時時間,如上是30s,默認是15s
  • sessionTimeoutMs:會話超時時間,如上是50s,默認是60s
  • retryPolicy:失敗重試策略
    • ExponentialBackoffRetry:構造器含有三個參數 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs:初始的sleep時間,用於計算之后的每次重試的sleep時間,
        • 計算公式:當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
      • maxRetries:最大重試次數
      • maxSleepMs:最大sleep時間,如果上述的當前sleep計算出來比這個大,那么sleep用這個時間
    • 其他,查看org.apache.curator.RetryPolicy接口的實現類
    • start()會阻塞到會話創建成功為止。

重試策略

上面的例子中使用到了ExponentialBackoffRetry重試策略實現。此策略先給定一個初始化sleep時間baseSleepTimeMs,在此基礎上結合重試次數,通過以下代碼計算當前需要的sleep時間:

long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
if ( sleepMs > maxSleepMs ){
            sleepMs = maxSleepMs;
 }

隨着重試次數的增加,計算出的sleep時間也會越來越大。如果超過maxSleepMs則使用maxSleepMs的時間。其中maxRetries限制了最大的嘗試次數。

創建節點

Curator創建節點的方法也是基於Fluent風格編碼,原生API中的參數很多都轉化為一層層的方法調用來進行設置。下面簡單介紹一下常用的幾個節點創建場景。 
(1)創建一個初始內容為空的節點

client.create().forPath(path);

Curator默認創建的是持久節點,內容為空。 
(2)創建一個包含內容的節點

client.create().forPath(path,"我是內容".getBytes());

Curator和ZkClient不同的是依舊采用Zookeeper原生API的風格,內容使用byte[]作為方法參數。 
(3)創建臨時節點,並遞歸創建父節點

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);

此處Curator和ZkClient一樣封裝了遞歸創建父節點的方法。在遞歸創建父節點時,父節點為持久節點。

刪除節點

刪除節點的方法也是基於Fluent方式來進行操作,不同類型的操作調用新增不同的方法調用即可。 
(1)刪除一個子節點

client.delete().forPath(path);

(2)刪除節點並遞歸刪除其子節點

client.delete().deletingChildrenIfNeeded().forPath(path);

(3)指定版本進行刪除

client.delete().withVersion(1).forPath(path);

如果此版本已經不存在,則刪除異常,異常信息如下。

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for

(4)強制保證刪除一個節點

client.delete().guaranteed().forPath(path);

只要客戶端會話有效,那么Curator會在后台持續進行刪除操作,直到節點刪除成功。比如遇到一些網絡異常的情況,此guaranteed的強制刪除就會很有效果。

讀取數據

讀取節點數據內容API相當簡單,Curator提供了傳入一個Stat,使用節點當前的Stat替換到傳入的Stat的方法,查詢方法執行完成之后,Stat引用已經執行當前最新的節點Stat。

// 普通查詢
client.getData().forPath(path);
// 包含狀態查詢
Stat stat = new Stat();
client.getData().storingStatIn(stat()).forPath(path);

更新數據

更新數據,如果未傳入version參數,那么更新當前最新版本,如果傳入version則更新指定version,如果version已經變更,則拋出異常。

// 普通更新
client.setData().forPath(path,"新內容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);

版本不一直異常信息:

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for

異步接口

在使用以上針對節點的操作API時,我們會發現每個接口都有一個inBackground()方法可供調用。此接口就是Curator提供的異步調用入口。對應的異步處理接口為BackgroundCallback。此接口指提供了一個processResult的方法,用來處理回調結果。其中processResult的參數event中的getType()包含了各種事件類型,getResultCode()包含了各種響應碼。

重點說一下inBackground的以下接口:

public T inBackground(BackgroundCallback callback, Executor executor);

此接口就允許傳入一個Executor實例,用一個專門線程池來處理返回結果之后的業務邏輯。

完整代碼

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.Executors;

public class CuratorTest {

    private static RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000,3);
    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("hadoop1:2181,hadoop2:2181,hadoop3:2181")
            .sessionTimeoutMs(3000)
            .connectionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            .build();

    public static void main(String[] args) throws Exception{
        /**
         * 創建會話
         * */
        client.start();

        /**
         * 同步創建節點
         * 注意:
         *      1.除非指明創建節點的類型,默認是持久節點
         *      2.ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,
         *          只有最后的數據節點才是指定類型的節點,其父節點是持久節點
         * */

        //創建一個初始內容為空的節點
        client.create().forPath("/China");
        //創建一個初始內容不為空的節點
        client.create().forPath("/Korea","jinzhengen".getBytes());
        //創建一個初始內容為空的臨時節點
        client.create().withMode(CreateMode.EPHEMERAL).forPath("America");
        //創建一個初始內容不為空的臨時節點,可以實現遞歸創建
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .forPath("Japan","xiaoriben".getBytes());


        /**
         *  異步創建節點
         *
         * 注意:如果自己指定了線程池,那么相應的操作就會在線程池中執行,如果沒有指定,
         *   那么就會使用Zookeeper的EventThread線程對事件進行串行處理
         * */
        client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:"
                + event.getResultCode() + ",type:" + event.getType());
            }
        },Executors.newFixedThreadPool(10)).forPath("/async-China");


        client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:"
                        + event.getResultCode() + ",type:" + event.getType());
            }
        }).forPath("/async-America");

        /**
         * 獲取節點內容
         * */
        byte[] data = client.getData().forPath("/Korea");
        System.out.println(new String(data));
        //傳入一個舊的stat變量,來存儲服務端返回的最新的節點狀態信息
        byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/Korea");
        System.out.println(new String(data2));

        /**
         * 更新數據
         * */
        Stat stat = client.setData().forPath("/Korea");
        client.setData().withVersion(4).forPath("/Korea", "jinsanpangzi".getBytes());

        /**
         * 刪除節點
         * */
        //只能刪除葉子節點
        client.delete().forPath("/China");
        //刪除一個節點,並遞歸刪除其所有子節點
        client.delete().deletingChildrenIfNeeded().forPath("/aa");
        //強制指定版本進行刪除
        client.delete().withVersion(4).forPath("/Korea");
        //注意:由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),
        // 如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止
        client.delete().guaranteed().forPath("/America");
        
        
        Thread.sleep(Integer.MAX_VALUE);


    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM