03.Curator深入使用



1.Apache Curator簡介

    Curator提供了一套Java類庫,可以更容易的使用ZooKeeper。ZooKeeper本身提供了Java Client的訪問類,但是API太底層,不宜使用,易出錯。Curator提供了三個組件。Curator client用來替代ZOoKeeper提供的類,它封裝了底層的管理並提供了一些有用的工具。Curator framework提供了高級的API來簡化ZooKeeper的使用。它增加了很多基於ZooKeeper的特性,幫助管理ZooKeeper的連接以及重試操作。Curator Recipes提供了使用ZooKeeper的一些通用的技巧(方法)。除此之外,Curator Test提供了基於ZooKeeper的單元測試工具。
    所謂技巧(Recipes),也可以稱之為解決方案,或者叫實現方案,是指ZooKeeper的使用方法,比如分布式的配置管理,Leader選舉等。
     Curator最初由Netflix的Jordan Zimmerman開發。20117月在github上基於Apache 2.0開源協議開源。之后發布了多個版本,並被廣泛的應用。 Curator作為Apache ZooKeeper天生配套的組件。ZooKeeper的Java開發者自然而然的會選擇它在項目中使用。
1.Curator組件概覽
  • Recipes:通用ZooKeeper技巧("recipes")的實現. 建立在Curator Framework之上
  • Framework:簡化zookeeper使用的高級. 增加了很多建立在zooper之上的特性. 管理復雜連接處理和重試操作
  • Utilities:各種工具類
  • Client:ZooKeeper本身提供的類的替代者。負責底層的開銷以及一些工具
  • Errors:Curator怎樣來處理錯誤和異常
  • Extensions:curator-recipes包實現了通用的技巧,這些技巧在ZooKeeper文檔中有介紹。為了避免是這個包(package)變得巨大,recipes/applications將會放入一個獨立的extension包下。並使用命名規則curator-x-name
2.Maven/Artifacts
    Curator編譯好的類庫被發布到Maven Center中。Curator包含幾個artifact. 你可以根據你的需要在你的項目中加入相應的依賴。對於大多數開發者來說,引入curator-recipes這一個就足夠了。
     
     
     
             
  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-client</artifactId>
  4. <version>2.9.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.curator</groupId>
  8. <artifactId>curator-framework</artifactId>
  9. <version>2.9.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.curator</groupId>
  13. <artifactId>curator-recipes</artifactId>
  14. <version>2.9.0</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.curator</groupId>
  18. <artifactId>curator-x-discovery</artifactId>
  19. <version>2.9.0</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.curator</groupId>
  23. <artifactId>curator-examples</artifactId>
  24. <version>2.9.0</version>
  25. </dependency>

2.Apache Curator Recipes

    Curator實現了 ZooKeeper recipes文檔中列出的所有技巧(除了兩段提交two phase commit)。下面介紹R ecipes 的使用,參考官網: http://curator.apache.org/curator-recipes/index.html
1.Elections(選舉)
  • Leader Latch - 在分布式計算中,leader選舉是在幾台節點中指派單一的進程作為任務組織者的過程。在任務開始前, 所有的網絡節點都不知道哪一個節點會作為任務的leader或coordinator. 一旦leader選舉算法被執行, 網絡中的每個節點都將知道一個特別的唯一的節點作為任務leader
  • Leader Election - 初始的leader選舉實現
2.Locks()
  • Shared Reentrant Lock - 全功能的分布式鎖。任何一刻不會有兩個client同時擁有鎖
  • Shared Lock - 與Shared Reentrant Lock類似但是不是重入的
  • Shared Reentrant Read Write Lock - 類似Java的讀寫鎖,但是是分布式的
  • Shared Semaphore - 跨JVM的計數信號量
  • Multi Shared Lock - 將多個鎖看成整體,要不全部acquire成功,要不acquire全部失敗。release也是釋放全部鎖
3.Barriers(障礙)
  • Barrier - 分布式的barriers。會阻塞全部的節點的處理,直到條件滿足,所有的節點會繼續執行
  • Double Barrier - 雙barrier允許客戶端在一個計算開始點和結束點保持同步。當足夠的進程加入barrier,進程開始它們的計算,當所有的進程完成計算才離開
4.Counters(計數器)
  • Shared Counter - 管理一個共享的整數integer.所有監控同一path的客戶端都會得到最新的值(ZK的一致性保證)
  • Distributed Atomic Long - 嘗試原子增加的計數器首先它嘗試樂觀鎖.如果失敗,可選的InterProcessMutex會被采用.不管是optimistic 還是 mutex,重試機制都被用來嘗試增加值
5.Caches(緩存)
  • Path Cache - Path Cache是用來監控子節點的。每當一個子節點“曾加、更新或刪除”,將會觸發事件,事件就是注冊了的PathChildrenCacheListener實例執行。Path Cache的功能主要由PathChildrenCache類提供
  • Node Cache - 用於監控節點,當節點數據被修改或節點被刪除,就會觸發事件,事件就是注冊了的NodeCacheListener實例執行。Node Cache的功能主要由NodeCache類提供
6.Nodes(節點)
  • Persistent Ephemeral Node - 臨時節點,可以通過連接或會話中斷一個臨時節點。
7.Queues(隊列)
  • Distributed Queue - 分布式的ZK隊列
  • Distributed Id Queue - 分布式的ZK隊列,它支持分配ID來添加到隊列中的項目的替換版本
  • Distributed Priority Queue - 分布式優先級的ZK隊列
  • Distributed Delay Queue - 分布式的延遲ZK隊列
  • Simple Distributed Queue - 一個簡單的替代自帶的ZK分布式隊列(Distributed Queue)
其具體的實現示例,可參考 官網: http://curator.apache.org/curator-recipes/index.html

3.Apache Curator Framework

    Curator framework提供了高級API, 極大的簡化了ZooKeeper的使用。 它在ZooKeeper基礎上增加了很多特性,可以管理與ZOoKeeper的連接和重試機制。這些特性包括:
  • 自動連接管理:有些潛在的錯誤情況需要讓ZooKeeper client重建連接和重試。Curator可以自動地和透明地處理這些情況
  • Cleaner API:簡化原始的ZooKeeper方法,事件等 提供現代的流式接口
1.產生Curator framework實例
    使用CuratorFrameworkFactory產生framework實例。 CuratorFrameworkFactory 既提供了factory方法也提供了builder來創建實例。 CuratorFrameworkFactory是線程安全的。你應該在應用中為單一的ZooKeeper集群共享唯一的CuratorFramework實例。
     工廠方法(newClient())提供了一個簡單的方式創建實例。Builder可以使用更多的參數控制生成的實例。一旦生成framework實例, 必須調用start方法啟動它。應用結束時應該調用close方法關閉它。
2.CuratorFramework API
    CuratorFramework 使用流程風格的接口。 代碼勝於說教:
      
      
      
              
  1. client.create().forPath("/head", new byte[0]);
  2. client.delete().inBackground().forPath("/head");
  3. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
  4. client.getData().watched().inBackground().forPath("/test");
CuratorFramework類重要方法說明
  • create()        開始創建操作, 可以調用額外的方法(比如方式mode 或者后台執行background) 並在最后調用forPath()指定要操作的ZNode
  • delete()        開始刪除操作. 可以調用額外的方法(版本或者后台處理version or background)並在最后調用forPath()指定要操作的ZNode
  • checkExists()   開始檢查ZNode是否存在的操作. 可以調用額外的方法(監控或者后台處理)並在最后調用forPath()指定要操作的ZNode
  • getData()       開始獲得ZNode節點數據的操作. 可以調用額外的方法(監控、后台處理或者獲取狀態watch, background or get stat) 並在最后調用forPath()指定要操作的ZNode
  • setData()       開始設置ZNode節點數據的操作. 可以調用額外的方法(版本或者后台處理) 並在最后調用forPath()指定要操作的ZNode
  • getChildren()   開始獲得ZNode的子節點列表。 以調用額外的方法(監控、后台處理或者獲取狀態watch, background or get stat) 並在最后調用forPath()指定要操作的ZNode
  • inTransaction() 開始是原子ZooKeeper事務. 可以復合create, setData, check, and/or delete 等操作然后調用commit()作為一個原子操作提交
3.通知 Notifications
    服務於后台操作和監控(watch)的通知通過ClientListener接口發布。你通過CuratorFramework實例的addListener方法可以注冊監聽器。
其事件觸發時間說明:
  • CuratorListenable:當使用后台線程操作時,后台線程執行完成就會觸發,例如:client.getData().inBackground().forPath("/test");后台獲取節點數據,獲取完成之后觸發。
  • ConnectionStateListenable:當連接狀態變化時觸發。
  • UnhandledErrorListenable:當后台操作發生異常時觸發。
CuratorListenable事件觸發返回的數據如下:
     
     
     
             
事件類型 事件返回數據
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GET_DATA getResultCode(), getPath(), getStat() and getData()
SET_DATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
SYNC getResultCode(), getStat()
GET_ACL getResultCode(), getACLList()
SET_ACL getResultCode()
TRANSACTION getResultCode(), getOpResults()
WATCHED getWatchedEvent()
GET_CONFIG getResultCode(), getData()
RECONFIG getResultCode(), getData()
4.命名空間
    你可以使用命名空間Namespace避免多個應用的節點的名稱沖突。 CuratorFramework提供了命名空間的概念,這樣CuratorFramework會為它的API調用的path加上命名空間:
      
      
      
              
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
5.臨時客戶端
     Curator還提供了臨時的CuratorFramework:CuratorTempFramework,一定時間不活動后連接會被關閉。創建builder時不是調用build()而是調用buildTemp()。3分鍾不活動連接就被關閉,你也可以指定不活動的時間。
      
      
      
              
CuratorTempFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")// 連接串
.retryPolicy(new RetryNTimes(10, 5000))// 重試策略
.connectionTimeoutMs(100) // 連接超時
.sessionTimeoutMs(100) // 會話超時
.buildTemp(100, TimeUnit.MINUTES); // 臨時客戶端並設置連接時間
它只提供了下面幾個方法:
      
      
      
              
public void close();
public CuratorTransaction inTransaction() throws Exception;
public TempGetDataBuilder getData() throws Exception;
6.Retry策略
    retry策略可以改變retry的行為。 它抽象出RetryPolicy接口, 包含一個方法public boolean allowRetry(int retryCount, long elapsedTimeMs);。 在retry被嘗試執行前, allowRetry()被調用,並且將當前的重試次數和操作已用時間作為參數. 如果返回true, retry被執行。否則異常被拋出。
Curator本身提供了幾個策略:
  • ExponentialBackoffRetry:重試一定次數,每次重試sleep更多的時間
  • RetryNTimes:重試N次
  • RetryOneTime:重試一次
  • RetryUntilElapsed:重試一定的時間

4.Apache Curator Utilities

    Curator提供了一組工具類和方法用來測試基於Curator的應用。 並且提供了操作ZNode輔助類以及其它一些數據結構
1.Test Server
    curator-test提供了TestingServer類。 這個類創建了一個本地的, 同進程的ZooKeeper服務器用來測試。
     
     
     
             
  1. public static void main(String[] args) throws Exception
  2. {
  3. TestingServer server = new TestingServer();
  4. CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
  5. client.getConnectionStateListenable().addListener(new ConnectionStateListener()
  6. {
  7. @Override
  8. public void stateChanged(CuratorFramework client, ConnectionState newState)
  9. {
  10. System.out.println("連接狀態:" + newState.name());
  11. }
  12. });
  13. client.start();
  14. System.out.println(client.getChildren().forPath("/"));
  15. client.create().forPath("/test");
  16. System.out.println(client.getChildren().forPath("/"));
  17. CloseableUtils.closeQuietly(client);
  18. CloseableUtils.closeQuietly(server);
  19. System.out.println("OK!");
  20. }
2.Test Cluster
     curator-test提供了TestingCluster類。 這個類創建了一個內部的ZooKeeper集群用來測試。
     
     
     
             
  1. public static void main(String[] args) throws Exception
  2. {
  3. TestingCluster cluster = new TestingCluster(3);
  4. cluster.start();
  5. for (TestingZooKeeperServer server : cluster.getServers())
  6. {
  7. System.out.println(server.getInstanceSpec());
  8. }
  9. cluster.stop();
  10. CloseableUtils.closeQuietly(cluster);
  11. System.out.println("OK!");
  12. }
3.ZKPaths
提供了各種靜態方法來操作ZNode:
  • getNodeFromPath: 從一個全路徑中得到節點名, 比如 "/one/two/three" 返回 "three"
  • mkdirs: 確保所有的節點都已被創建
  • getSortedChildren: 得到一個給定路徑的子節點, 按照sequence number排序
  • makePath: 給定父路徑和子節點,創建一個全路徑
4.EnsurePath
    確保一個特定的路徑被創建。當它第一次使用時,一個同步ZKPaths.mkdirs(ZooKeeper, String)調用被觸發來確保完整的路徑都已經被創建。后續的調用將不是同步操作. 用法:
    
    
    
            
  1. EnsurePath ensurePath = new EnsurePath(aFullPathToEnsure);
  2. ...
  3. String nodePath = aFullPathToEnsure + "/foo";
  4. ensurePath.ensure(zk); // first time syncs and creates if needed
  5. zk.create(nodePath, ...);
  6. ...
  7. ensurePath.ensure(zk); // subsequent times are NOPs
  8. zk.create(nodePath, ...);
注意: 此方法namespace會參與路徑名字的創建。
5.Blocking Queue Consumer( 阻塞隊列消費者)
     請參看Distributed Queue 和 Distributed Priority Queue。 提供JDK BlockingQueue類似的行為。
6.Queue Sharder
     由於zookeeper傳輸層的限制,單一的隊列如果超過10K的元素會被分割(break)。這個類為多個分布式隊列提供了一個facade。它監控隊列,如果一個隊列超過這個閾值,一個新的隊列就被創建。在這些隊列中Put是分布式的。
7.Reaper and ChildReaper
Reaper
    可以用來刪除鎖的父路徑。定時檢查路徑被加入到reaper中。當檢查時,如果path沒有子節點/路徑,此路徑將被刪除。每個應用中CLient應該只創建一個reaper實例。必須將lock path加到這個readper中。reaper會定時的檢查刪除它們。
ChildReaper
    用來清除父節點下所有的空節點。定時的調用getChildren()並將空節點加入到內部管理的reaper中。
注意:應該考慮使用LeaderSelector來運行Reapers,因為它們不需要在每個client運行

5.Apache Curator Client

    Curator client使用底層的API, 強烈推薦你是用Curator Framework代替使用CuratorZookeeperClient
1.背景
CuratorZookeeperClient 是ZooKeeper client的包裝類。但是提供了更簡單方式, 而且可以減少錯誤的發生。它提供了下列的特性:
  • 持續的連接管理 - ZooKeeper有很多的關於連接管理的警告(你可以到ZooKeeper FAQ查看細節)。CuratorZookeeperClient 可以自動的管理這些事情。
  • retry - 提供一個方式處理retry(重試)。
  • Test ZooKeeper server - 提供一個進程內的ZooKeeper測試服務器用來測試和實驗。
2.方法
  • Constructor - 創建一個給定ZooKeeper集群的連接。 你可以傳入一個可選的watcher. 必須提供Retry策略
  • getZooKeeper() - 返回管理的ZooKeeper實例. 重要提示: a) 它會花費些許時間等待連接來完成, 在使用其它方法之前你應該校驗連接是否完成. b) 管理的ZooKeeper實例可以根據特定的事件而改變。 不要持有實例太長時間. 總是調用getZooKeeper()得到一個新的實例
  • isConnected() - 返回ZooKeeper client當前連接狀態
  • blockUntilConnectedOrTimedOut() - block知道連接成功或者超時
  • close() - 關閉連接
  • setRetryPolicy() - 改變retry(重試)策略
  • newRetryLoop() - 分配一個新的Retry Loop(重試循環)
3.Retry Loop(重試循環)
    由於各種各樣的原因,在zookeeper集群上的操作難免遇到失敗的情況。最佳實踐表明應該提供重試機制。Retry Loop為此而生。 每個操作都被包裝在一個Retry Loop中。下面是一個典型的處理流程:
    
    
    
            
  1. RetryLoop retryLoop = client.newRetryLoop();
  2. while ( retryLoop.shouldContinue() )
  3. {
  4. try
  5. {
  6. // perform your work
  7. ...
  8. // it's important to re\-get the ZK instance as there may have been an error and the instance was re\-created
  9. ZooKeeper zk = client.getZookeeper();
  10. retryLoop.markComplete();
  11. }
  12. catch ( Exception e )
  13. {
  14. retryLoop.takeException(e);
  15. }
  16. }
    Retry Loop維護一定數量的retry, 它還決定一個錯誤是否可以要執行retry。 假如一個錯誤需要retry,Retry策略被調用來決定retry是要要執行,執行多少次才放棄。
    很方便地,RetryLoop 提供了一個靜態方法使用Callable來執行一個完整retry loop。
    
    
    
            
  1. RetryLoop.callWithRetry(client, new Callable<Void>()
  2. {
  3. @Override
  4. public Void call() throws Exception
  5. {
  6. // do your work here - it will get retried if needed
  7. return null;
  8. }
  9. });
4.Retry策略(重試策略)
    retry策略可以改變retry的行為。它抽象出RetryPolicy接口,包含一個方法 public boolean allowRetry ( int retryCount , long elapsedTimeMs ) 。在retry被嘗試執行前,allowRetry()被調用,並且將當前的重試次數和操作已用時間作為參數.如果返回true, retry被執行。否則異常被拋出。
Curator本身提供了幾個策略(在 com.netflix.curator.retry 包下):
  • ExponentialBackoffRetry:重試一定次數,每次重試sleep更多的時間
  • RetryNTimes:重試N次
  • RetryOneTime:重試一次
  • RetryUntilElapsed:重試一定的時間
-------------------------------------------------------------------------------------------------------------------------------




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM