原生Zookeeper API 和 第三方框架工具包Zkclient


序言
直接使用zookeeper的api實現業務功能比較繁瑣。因為要處理session loss,session expire等異常,在發生這些異常后進行重連。

又因為ZK的watcher是一次性的,如果要基於wather實現發布/訂閱模式,還要自己包裝一下,將一次性訂閱包裝成持久訂閱。

另外如果要使用抽象級別更高的功能,比如分布式鎖,leader選舉等,還要自己額外做很多事情。

這里介紹下ZK的第三方客戶端包裝小工具,可以分別解決上述小問題。

在使用ZooKeeper過程中發現,用原生方法實現某些業務功能很麻煩。例如:

1) 如何實現持久的Watcher注冊

ZooKeeper的Watcher是一次性的,用過了需要再注冊;

2) 如何解決session的超時問題

生產環境中如果網絡出現不穩定情況,那么這種情況出現的更加明顯;

3) 如何實現領導選舉

集群情況下可能需要實現stand by,一個服務掛了,另一個需要接替的效果;

4) 如何實現節點數據的封裝

項目中一般都會使用對象,而ZooKeeper只能存放文本類的數據。

ZKClient卻能把這些問題解決了:

1) 持久的Watcher注冊問題:

ZKClient框架將事件重新定義分為了stateChanged、znodeChanged、dataChanged三種情況,用戶可以注冊這三種情況下的監聽器(znodeChanged和dataChanged和路徑有關),而不是注冊Watcher。

ZKClient框架中ZooKeeper只注冊了類ZkClient(實現了Watcher),由ZkClient統一處理WatchedEvent,根據WatchedEvent分發到三種情況的處理方法,處理方法在尋找到監聽器后會將他send到ZkEventThread的BlockingQueue中,由

ZkEventThread以線程的方式執行(個人覺得這個部分寫的還是挺精巧的)。

zkClient將一次性watcher包裝為持久watcher。后者的具體做法是簡單的在watcher回調中,重新讀取數據的同時再注冊相同的watcher實例。

zkClient簡單的使用樣例如下:

public static void testzkClient(final String serverList) {
        ZkClient zkClient4subChild = new ZkClient(serverList);
        zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List currentChilds) throws Exception {
                System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
            }
        });

上面是訂閱children變化,下面是訂閱數據變化

ZkClient zkClient4subData = new ZkClient(serverList);
        zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(prefix() + "Data of " + dataPath + " has changed");
            }
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(prefix() + dataPath + " has deleted");
            }
        });

訂閱連接狀態的變化:

ZkClient zkClient4subStat = new ZkClient(serverList);
        zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleNewSession() throws Exception {
                System.out.println(prefix() + "handleNewSession()");
            }
            @Override
            public void handleStateChanged(KeeperState stat) throws Exception {
                System.out.println(prefix() + "handleStateChanged,stat:" + stat);
            }
        });

zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。

比如subscribeChildChanges實際上是通過exists和getChildren關注了兩個事件。這樣當create(“/path”)時,對應path上通過getChildren注冊的listener也會被調用。

另外subscribeDataChanges實際上只是通過exists注冊了事件。因為從上表可以看到,對於一個更新,通過exists和getData注冊的watcher要么都會觸發,要么都不會觸發。

2) session的超時問題:

ZKClient框架里會經常看見一些while語句,是由這些while語句完成的,比如ZkClient.retryUntilConnected方法

(感謝紫川的反饋,此條可能存在描述性問題。經校對:ZkClient貌似還是有對Session Expired 處理的,在ZkClient.processStateChanged方法中。雖然能重新連接,但是連接上是一個新的 session,原有創建的ephemeral znode和watch會被

刪除,程序上你可能需要處理這個問題。歡迎大家提出意見,萬分感謝)

3) 領導選舉實現:

選舉實現邏輯這里講的很清楚

ZKClient框架提供了DistributedQueue可以對offer方法做適當修改來實現則個功能


4) 節點數據的封裝:

ZKClient框架提供了ZkSerializer來進行序列化和反序列化,貌似挺有用的。DataUpdater可以用來更新節點數據,進行znode數據轉換。

總的來說ZKClient是一個不錯的ZooKeeper調用工具,減少不少的開發量,設計精巧,收貨不小。

ZKClient下載地址:
GitHub工程(zkClient官方地址)

 

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM