zookeeper項目使用幾點小結


背景

  前段時間學習了zookeeper后,在新的項目中剛好派上了用場,我在項目中主要負責分布式任務調度模塊的開發,對我自己來說是個不小的挑戰。

  分布式的任務調度,技術上我們選擇了zookeeper,具體的整個分布式任務調度的架構選擇會另起一篇文章進行介紹。

 

  本文主要是介紹自己在項目中zookeeper的一些擴展使用,希望可以對大家有所幫助。

  項目中使用的zookeeper版本3.3.3,對應的文檔地址: http://zookeeper.apache.org/doc/trunk/

 

擴展一:優先集群

先來點背景知識:

1.zookeeper中的server機器之間會組成leader/follower集群,1:n的關系。采用了paxos一致性算法保證了數據的一致性,就是leader/follower會采用通訊的方式進行投票來實現paxns。

2.zookeeper還支持一種observer模式,提供只讀服務不參與投票,提升系統,對應文檔: http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html

 

我們項目特性的決定了我們需要進行跨機房操作,比如杭州,美國,香港,青島等多個機房之間進行數據交互。

跨機房之間對應的網絡延遲都比較大,比如中美機房走海底光纜有ping操作200ms的延遲,杭州和青島機房有70ms的延遲。 

 

為了提升系統的網絡性能,我們在部署zookeeper網絡時會在每個機房部署節點,多個機房之間再組成一個大的網絡保證數據一致性。(zookeeper千萬別再搞多個集群)

 

最后的部署結構就會是:

  • 杭州機房  >=3台 (構建leader/follower的zk集群)
  • 青島機房  >=1台 (構建observer的zk集群)
  • 美國機房  >=1台 (構建observer的zk集群)
  • 香港機房  >=1台 (構建observer的zk集群)


 
一句話概括就是: 在單個機房內組成一個投票集群,外圍的機房都會是一個observer集群和投票集群進行數據交互。 這樣部署的一些好處,大家可以細細體會一下
 
針對這樣的部署結構,我們會引入一個優先集群問題: 比如在美國機房的機器需要優先去訪問本機房的zk集群,訪問不到后才去訪問杭州機房。 
默認在zookeeper3.3.3的實現中,認為所有的節點都是對等的。並沒有對應的優先集群的概念,單個機器也沒有對應的優先級的概念。
 
擴展代碼:(比較暴力,采用反射的方式改變了zk client的集群列表)
  • 先使用美國機房的集群ip初始化一次zk client
  • 通過反射方式,強制在初始化后的zk client中的server列表中又加入杭州機房的機器列表

 

1.ZooKeeper zk = null;  
2.        try {  
3.            zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() {  
4.  
5.                public void asyncProcess(WatchedEvent event) {  
6.                    //do nothing 
7.                }  
8.  
9.            });  
10.            if (serveraddrs.size() > 1) {  
11.                // 強制的聲明accessible 
12.                ReflectionUtils.makeAccessible(clientCnxnField);  
13.                ReflectionUtils.makeAccessible(serverAddrsField);  
14.                // 添加第二組集群列表 
15.                for (int i = 1; i < serveraddrs.size(); i++) {  
16.                    String cluster = serveraddrs.get(i);  
17.                    // 強制獲取zk中的地址信息 
18.                    ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);  
19.                    List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils  
20.                            .getField(serverAddrsField, cnxn);  
21.                    // 添加第二組集群列表 
22.                    serverAddrs.addAll(buildServerAddrs(cluster));  
23.                }  
24.            }  
25.        }  

 

擴展二:異步Watcher處理

  最早在看zookeeper的代碼時,一直對它的watcher處理比較滿意,使用watcher推送數據可以很方便的實現分布式鎖的功能。

zookeeper的watcher實現原理也挺簡單的,就是在zookeeper client和zookeeper server上都保存一份對應的watcher對象。每個zookeeper機器都會有一份完整的node tree數據和watcher數據,每次leader通知follower/observer數據發生變更后,每個zookeeper server會根據自己節點中的watcher事件推送給響應的zookeeper client,每個zk client收到后再根據內存中的watcher引用,進行回調。

 

這里會有個問題,就是zk client在處理watcher時,回凋的過程是一個串行的執行過程,所以單個watcher的處理慢會影響整個列表的響應。 

可以看一下ClientCnxn類中的EventThread處理,該線程會定時消費一個queue的數據,挨個調用processEvent(Object event) 進行回調處理。

 

擴展代碼:

 

1.public abstract class AsyncWatcher implements Watcher {  
2.  
3.    private static final int       DEFAULT_POOL_SIZE    = 30;  
4.    private static final int       DEFAULT_ACCEPT_COUNT = 60;  
5.  
6.    private static ExecutorService executor             = new ThreadPoolExecutor(  
7.                                                                1,  
8.                                                                DEFAULT_POOL_SIZE,  
9.                                                                0L,  
10.                                                                TimeUnit.MILLISECONDS,  
11.                                                                new ArrayBlockingQueue(  
12.                                                                        DEFAULT_ACCEPT_COUNT),  
13.                                                                new NamedThreadFactory(  
14.                                                                        "Arbitrate-Async-Watcher"),  
15.                                                                new ThreadPoolExecutor.CallerRunsPolicy());  
16.  
17.    public void process(final WatchedEvent event) {  
18.        executor.execute(new Runnable() {//提交異步處理 
19.  
20.                    @Override  
21.                    public void run() {  
22.                        asyncProcess(event);  
23.                    }  
24.                });  
25.  
26.    }  
27.  
28.    public abstract void asyncProcess(WatchedEvent event);  
29.  
30.}  

 

說明:
  • zookeeper針對watcher的調用是以單線程串行的方式進行處理,容易造成堵塞影響,monitor的數據同步及時性
  • AsyncWatcher為采取的一種策略為當不超過acceptCount=60的任務時,會采用異步線程的方式處理。如果超過60任務,會變為原先的單線程串行的模式

擴展三:重試處理

這個也不多說啥,看一下相關文檔就清楚了

 

需要特殊處理下ConnectionLoss的異常,一種可恢復的異常。
 
重試處理:

1.public interface ZooKeeperOperation<T> {  
2.  
3.    public T execute() throws KeeperException, InterruptedException;  
4.}  
5.  
6.  
7./** 8. * 包裝重試策略 9. */  
10.    public <T> T retryOperation(ZooKeeperOperation<T> operation) throws KeeperException,  
11.            InterruptedException {  
12.        KeeperException exception = null;  
13.        for (int i = 0; i < maxRetry; i++) {  
14.            try {  
15.                return (T) operation.execute();  
16.            } catch (KeeperException.SessionExpiredException e) {  
17.                logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e);  
18.                throw e;  
19.            } catch (KeeperException.ConnectionLossException e) { //特殊處理Connection Loss 
20.                if (exception == null) {  
21.                    exception = e;  
22.                }  
23.                logger.warn("Attempt " + i + " failed with connection loss so "  
24.                        + "attempting to reconnect: " + e, e);  
25.  
26.                retryDelay(i);  
27.            }  
28.        }  
29.  
30.        throw exception;  
31.    }  

注意點:Watcher原子性

在使用zookeeper的過程中,需要特別注意一點就是注冊對應watcher事件時,如果當前的節點已經滿足了條件,比如exist的watcher,它不會觸發你的watcher,而會等待下一次watcher條件的滿足。

它的watcher是一個一次性的監聽,而不是一個永久的訂閱過程。所以在watcher響應和再次注冊watcher過程並不是一個原子操作,編寫多線程代碼和鎖時需要特別注意

總結

  zookeepr是一個挺不錯的產品,源代碼寫的也非常不錯,大量使用了queue和異步Thread的處理模式,真是一個偉大的產品。

 

 

雲棲社區站內文章,如需轉載,請保留作者和出處(雲棲社區),並郵件通知雲棲社區(yqeditor@list.alibaba-inc.com)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM