背景
前段時間學習了zookeeper后,在新的項目中剛好派上了用場,我在項目中主要負責分布式任務調度模塊的開發,對我自己來說是個不小的挑戰。
分布式的任務調度,技術上我們選擇了zookeeper,具體的整個分布式任務調度的架構選擇會另起一篇文章進行介紹。
本文主要是介紹自己在項目中zookeeper的一些擴展使用,希望可以對大家有所幫助。
項目中使用的zookeeper版本3.3.3,對應的文檔地址: http://zookeeper.apache.org/doc/trunk/
擴展一:優先集群
先來點背景知識:
1.zookeeper中的server機器之間會組成leader/follower集群,1:n的關系。采用了paxos一致性算法保證了數據的一致性,就是leader/follower會采用通訊的方式進行投票來實現paxns。
2.zookeeper還支持一種observer模式,提供只讀服務不參與投票,提升系統,對應文檔: http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html
我們項目特性的決定了我們需要進行跨機房操作,比如杭州,美國,香港,青島等多個機房之間進行數據交互。
跨機房之間對應的網絡延遲都比較大,比如中美機房走海底光纜有ping操作200ms的延遲,杭州和青島機房有70ms的延遲。
為了提升系統的網絡性能,我們在部署zookeeper網絡時會在每個機房部署節點,多個機房之間再組成一個大的網絡保證數據一致性。(zookeeper千萬別再搞多個集群)
最后的部署結構就會是:
- 杭州機房 >=3台 (構建leader/follower的zk集群)
- 青島機房 >=1台 (構建observer的zk集群)
- 美國機房 >=1台 (構建observer的zk集群)
- 香港機房 >=1台 (構建observer的zk集群)

- 先使用美國機房的集群ip初始化一次zk client
- 通過反射方式,強制在初始化后的zk client中的server列表中又加入杭州機房的機器列表
1.ZooKeeper zk = null;
2. try {
3. zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() {
4.
5. public void asyncProcess(WatchedEvent event) {
6. //do nothing
7. }
8.
9. });
10. if (serveraddrs.size() > 1) {
11. // 強制的聲明accessible
12. ReflectionUtils.makeAccessible(clientCnxnField);
13. ReflectionUtils.makeAccessible(serverAddrsField);
14. // 添加第二組集群列表
15. for (int i = 1; i < serveraddrs.size(); i++) {
16. String cluster = serveraddrs.get(i);
17. // 強制獲取zk中的地址信息
18. ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
19. List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils
20. .getField(serverAddrsField, cnxn);
21. // 添加第二組集群列表
22. serverAddrs.addAll(buildServerAddrs(cluster));
23. }
24. }
25. }
擴展二:異步Watcher處理
最早在看zookeeper的代碼時,一直對它的watcher處理比較滿意,使用watcher推送數據可以很方便的實現分布式鎖的功能。
zookeeper的watcher實現原理也挺簡單的,就是在zookeeper client和zookeeper server上都保存一份對應的watcher對象。每個zookeeper機器都會有一份完整的node tree數據和watcher數據,每次leader通知follower/observer數據發生變更后,每個zookeeper server會根據自己節點中的watcher事件推送給響應的zookeeper client,每個zk client收到后再根據內存中的watcher引用,進行回調。
這里會有個問題,就是zk client在處理watcher時,回凋的過程是一個串行的執行過程,所以單個watcher的處理慢會影響整個列表的響應。
可以看一下ClientCnxn類中的EventThread處理,該線程會定時消費一個queue的數據,挨個調用processEvent(Object event) 進行回調處理。
擴展代碼:
1.public abstract class AsyncWatcher implements Watcher {
2.
3. private static final int DEFAULT_POOL_SIZE = 30;
4. private static final int DEFAULT_ACCEPT_COUNT = 60;
5.
6. private static ExecutorService executor = new ThreadPoolExecutor(
7. 1,
8. DEFAULT_POOL_SIZE,
9. 0L,
10. TimeUnit.MILLISECONDS,
11. new ArrayBlockingQueue(
12. DEFAULT_ACCEPT_COUNT),
13. new NamedThreadFactory(
14. "Arbitrate-Async-Watcher"),
15. new ThreadPoolExecutor.CallerRunsPolicy());
16.
17. public void process(final WatchedEvent event) {
18. executor.execute(new Runnable() {//提交異步處理
19.
20. @Override
21. public void run() {
22. asyncProcess(event);
23. }
24. });
25.
26. }
27.
28. public abstract void asyncProcess(WatchedEvent event);
29.
30.}
- zookeeper針對watcher的調用是以單線程串行的方式進行處理,容易造成堵塞影響,monitor的數據同步及時性
- AsyncWatcher為采取的一種策略為當不超過acceptCount=60的任務時,會采用異步線程的方式處理。如果超過60任務,會變為原先的單線程串行的模式
擴展三:重試處理
這個也不多說啥,看一下相關文檔就清楚了
- http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
- http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A3
1.public interface ZooKeeperOperation<T> {
2.
3. public T execute() throws KeeperException, InterruptedException;
4.}
5.
6.
7./** 8. * 包裝重試策略 9. */
10. public <T> T retryOperation(ZooKeeperOperation<T> operation) throws KeeperException,
11. InterruptedException {
12. KeeperException exception = null;
13. for (int i = 0; i < maxRetry; i++) {
14. try {
15. return (T) operation.execute();
16. } catch (KeeperException.SessionExpiredException e) {
17. logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e);
18. throw e;
19. } catch (KeeperException.ConnectionLossException e) { //特殊處理Connection Loss
20. if (exception == null) {
21. exception = e;
22. }
23. logger.warn("Attempt " + i + " failed with connection loss so "
24. + "attempting to reconnect: " + e, e);
25.
26. retryDelay(i);
27. }
28. }
29.
30. throw exception;
31. }
注意點:Watcher原子性
在使用zookeeper的過程中,需要特別注意一點就是注冊對應watcher事件時,如果當前的節點已經滿足了條件,比如exist的watcher,它不會觸發你的watcher,而會等待下一次watcher條件的滿足。
它的watcher是一個一次性的監聽,而不是一個永久的訂閱過程。所以在watcher響應和再次注冊watcher過程並不是一個原子操作,編寫多線程代碼和鎖時需要特別注意
總結
zookeepr是一個挺不錯的產品,源代碼寫的也非常不錯,大量使用了queue和異步Thread的處理模式,真是一個偉大的產品。
雲棲社區站內文章,如需轉載,請保留作者和出處(雲棲社區),並郵件通知雲棲社區(yqeditor@list.alibaba-inc.com)。