關於zookeeper的原理解析,可以參見zookeeper核心原理詳解,本文所述大多數實踐基於對zookeeper原理的首先理解。
Curator是Netflix公司開源的一個Zookeeper客戶端,目前是apache頂級項目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量,相當於netty之於socket編程。提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架。官網為http://curator.apache.org/
除此之外,Curator中還提供了Zookeeper各種應用場景(Recipe,如共享鎖服務、Master選舉機制和分布式計算器等)的抽象封裝。所以說啊,不管是做底層庫還是應用,用戶體驗真的很重要。
關於zookeeper的java客戶端
Zookeeper的官方客戶端提供了基本的操作,比如,創建會話、創建節點、讀取節點、更新數據、刪除節點和檢查節點是否存在等。但對於開發人員來說,Zookeeper提供的基本操縱還是有一些不足之處。典型的缺點為:
(1)Zookeeper的Watcher是一次性的,每次觸發之后都需要重新進行注冊;
(2)Session超時之后沒有實現重連機制;
(3)異常處理繁瑣,Zookeeper提供了很多異常,對於開發人員來說可能根本不知道該如何處理這些異常信息;
(4)只提供了簡單的byte[]數組的接口,沒有提供針對對象級別的序列化;
(5)創建節點時如果節點存在拋出異常,需要自行檢查節點是否存在;
(6)刪除節點無法實現級聯刪除;
因此,產生了兩款主流的三方zk客戶端,ZkClient和Curator。第一個主流的三方zk客戶端是ZkClient,由Datameer的工程師開發,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反復注冊等功能。像dubbo等框架對其也進行了集成使用。
雖然ZkClient對原生API進行了封裝,但也有它自身的不足之處:
- 幾乎沒有參考文檔;
- 異常處理簡化(拋出RuntimeException);
- 重試機制比較難用;
- 沒有提供各種使用場景的實現;
注:除此之外,很多依賴zookeeper的中間件或大數據組件都配備了與之相適應的zookeeper客戶端,例如hbase、hadoop、fabric8等。
因此,除了早期集成外,目前新的框架和系統很少使用ZkClient,因此本文詳細解析curator。如果讀者對zkclient感興趣,可以參考https://www.jianshu.com/p/d6de2d21d744去,其官網為https://github.com/sgroschupf/zkclient,已經基本不活躍了、更新極少且star不過千。
curator依賴添加
Curator的Maven依賴如下:一般直接使用curator-recipes就行了,如果需要自己封裝一些底層些的功能的話,例如增加連接管理重試機制等,則可以引入curator-framework包。client是低級api。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId>
<!--All of the recipes listed on the ZooKeeper recipes doc (except two phase commit).--> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId>
<!-- High-level API that greatly simplifies using ZooKeeper. --> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId>
<!-- Low-level API --> <version>${apache-curator.version}</version> </dependency>
最新版本可以從https://mvnrepository.com/artifact/org.apache.curator/curator-client查閱,不過需要注意的是,curator和zookeeper本身的依賴(尤其是zookeeper 3.4和3.5不兼容,導致的客戶端也是不一樣)對應關系。目前絕大多數使用2.x的版本。
典型的zk場景
Client操作
利用Curator提供的客戶端API,可以完全實現在zkCli.sh原生客戶端的各種功能。值得注意的是,Curator采用流式風格API。准確的說是類似JPA化。由於針對zk/redis等的操作都相當簡單,因此這種模式在這種場景下是比較合適的。如下:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; /** * Curator framework's client test. * Output: * $ create /zktest hello * $ ls / * [zktest, zookeeper] * $ get /zktest * hello * $ set /zktest world * $ get /zktest * world * $ delete /zktest * $ ls / * [zookeeper] */ public class CuratorClientTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "10.20.30.17:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Client API test // 2.1 Create node String data1 = "hello"; print("create", ZK_PATH, data1); client.create(). creatingParentsIfNeeded(). forPath(ZK_PATH, data1.getBytes()); // 2.2 Get node and data print("ls", "/"); print(client.getChildren().forPath("/")); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.3 Modify data String data2 = "world"; print("set", ZK_PATH, data2); client.setData().forPath(ZK_PATH, data2.getBytes()); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.4 Remove node print("delete", ZK_PATH); client.delete().forPath(ZK_PATH); print("ls", "/"); print(client.getChildren().forPath("/")); } private static void print(String... cmds) { StringBuilder text = new StringBuilder("$ "); for (String cmd : cmds) { text.append(cmd).append(" "); } System.out.println(text.toString()); } private static void print(Object result) { System.out.println( result instanceof byte[] ? new String((byte[]) result) : result); } }
詳細的CuratorFramework功能及使用說明可參見https://curator.apache.org/curator-framework/index.html。
監聽器
Curator提供了三種Watcher(Cache)來監聽結點的變化:
- Path Cache:監視一個路徑下1)孩子結點的創建、2)刪除,3)以及結點數據的更新。產生的事件會傳遞給注冊的PathChildrenCacheListener。
- Node Cache:監視一個結點的創建、更新、刪除,並將結點的數據緩存在本地。
- Tree Cache:Path Cache和Node Cache的“合體”,監視路徑下的創建、更新、刪除事件,並緩存路徑下所有孩子結點的數據。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.RetryNTimes; /** * Curator framework watch test. */ public class CuratorWatcherTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Register watcher PathChildrenCache watcher = new PathChildrenCache( client, ZK_PATH, true // if cache data ); watcher.getListenable().addListener((client1, event) -> { ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); } }); watcher.start(StartMode.BUILD_INITIAL_CACHE); System.out.println("Register zk watcher successfully!"); Thread.sleep(Integer.MAX_VALUE); } }
輸出如下:
Java: zk client start successfully! Java: Register zk watcher successfully! zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121] zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121] zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]
下列兩個系列稱為Recipe(專題,關於這個recipe應該如何翻譯,LZ做了研究,直譯是菜譜,肯定不對,也有叫做攻略的,貌似也不正確,所以叫專題可能確實更合適),完整的curator recipe實現可參見https://curator.apache.org/curator-recipes/index.html。
分布式協調
一般我們稱分布式鎖的時候,指的是短時的分布式鎖,因此一般采用redis實現,而zk下的稱之為分布式協調更合理,因為它通常時間更長。比如分布式編程時,比如最容易碰到的情況就是應用程序在線上多機部署,於是當多個應用同時訪問某一資源時,就需要某種機制去協調它們。例如,現在一台應用正在rebuild緩存內容,要臨時鎖住某個區域暫時不讓訪問;又比如調度程序每次只想一個任務被一台應用執行等等。大多數的分布式協調采用臨時節點+watch機制實現。除了直接采用原始的監聽器自己實現外,curator實現了分布式的IPM(進程間鎖)。Curator的機制為:使用我們提供的lock路徑的結點作為全局鎖,這個結點的數據類似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次獲得鎖時會生成這種串,釋放鎖時清空數據。由於內部采用zookeeper的臨時順序節點特性,一旦客戶端失去連接后,則就會自動清除該節點,redis則只能等待超時。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; /** * Curator framework's distributed lock test. */ public class CuratorDistrLockTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
當然實際中會更加復雜,比如只是某些接口需要全局單點,但是服務的粒度又沒有拆分到獨立的微服務。另外,客戶端宕機后鎖是否自動釋放也是要考慮的,否則其他節點就無法接管。InterProcessMutex的實現分析可以參考:https://www.jianshu.com/p/5fa6a1464076
Leader選舉
在分布式系統中,不少系統也采用和zk本身一樣的leader/follower架構,因此存在leader選舉的問題,例如es/kafka(注:在一般分布式系統中,並不會使用到該特性)。curator就包含了對應的解決方法。Curator提供了LeaderSelector監聽器實現Leader選舉功能。同一時刻,只有一個Listener會進入takeLeadership()方法,說明它是當前的Leader。注意:當Listener從takeLeadership()退出時就說明它放棄了“Leader身份”,這時Curator會利用Zookeeper再從剩余的Listener中選出一個新的Leader。autoRequeue()方法使放棄Leadership的Listener有機會重新獲得Leadership,如果不設置的話放棄了的Listener是不會再變成Leader的。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; /** * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */ public class CuratorLeaderTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }