原理http://cailin.iteye.com/blog/2014486
其實原理簡單來說,就是要選舉leader,會生成一個zxid,然后分發給所有的server(所以這里一台server可以接受多台server給他發送要選舉leader的請求),然后各個server根據發送給自己的zxid,選擇一個值最大的,然后將這個選擇返回給發送這個zxid的server,只要這個server收到的答復大於等於2/n+1個(也就是超過半數的同意票),則表明自己當選為leader,然后會向所有server廣播自己已經成為leader。
概述
Zookeeper是Hadoop的一個子項目,它是分布式系統中的協調系統,可提供的服務主要有:配置服務、名字服務、分布式同步、組服務等。
它有如下的一些特點:
- 簡單
Zookeeper的核心是一個精簡的文件系統,它支持一些簡單的操作和一些抽象操作,例如,排序和通知。
- 豐富
Zookeeper的原語操作是很豐富的,可實現一些協調數據結構和協議。例如,分布式隊列、分布式鎖和一組同級別節點中的“領導者選舉”。
- 高可靠
Zookeeper支持集群模式,可以很容易的解決單點故障問題。
- 松耦合交互
不同進程間的交互不需要了解彼此,甚至可以不必同時存在,某進程在zookeeper中留下消息后,該進程結束后其它進程還可以讀這條消息。
- 資源庫
Zookeeper實現了一個關於通用協調模式的開源共享存儲庫,能使開發者免於編寫這類通用協議。
Getting Started
1.獨立模式安裝並啟動
create it in conf/zoo.cfg
tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181
initLimit=10
syncLimit=5
tickTime:指定了ZooKeeper的基本時間單位(以毫秒為單位)。
dataDir:存儲內存數據快照位置。
clientPort:監聽客戶端連接端口。
initLimmit:啟動zookeeper,從節點至主節點連接超時時間。(上面為10個tickTime)
syncLimit:zookeeper正常運行,若主從同步時間超過syncLimit,則丟棄該從節點。
配置完,啟動zookeeper
bin/zkServer.sh start
檢查是否運行:
echo ruok | nc localhost 2181
1.1連接到zookeeper
$ bin/zkCli.sh -server 127.0.0.1:2181
在shell命令喊中使用help查看命令列表
[zkshell: 0] help
如:
創建一個zookeeper節點:
create /zk_test my_data
獲取節點數據
get /zk_test
修改節點數據
set /zk_test junk
刪除節點
delete /zk_test
2.復制模式啟動zookeeper
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
http://blog.csdn.net/a906998248/article/details/50815031 ---集群啟動 具體步驟
注意:前一個端口用於leader和follower之間數據交換,后一個端口用於leader選舉。
ZooKeeper Programmer's Guide
1. zookeeper數據模型
類似於文件系統,所有的節點都是絕對路徑,將文件和目錄抽象成znode。和Unix中的文件系統路徑格式很想,但是只支持絕對路徑,不支持相對路徑,也不支持點號(”.”和”..”)。
1.1 znode
維護了數據和ACL改變的版本號,每一次數據改變版本號增加,當有一個client去執行update和delete時候,必須提供一個數據變更版本號,如果與數據不符合,則更新失敗。
1.2存儲小數據
一般不超過1M,大量數據會花費更多的時間和延遲來完成數據拷貝,由於網絡的消耗。
1.3 瞬時節點
隨着會話結束而刪除
When the session ends the znode is deleted. Because
of this behavior ephemeral znodes are not allowed to have children.
1.4操作的原子性
Znode的數據讀寫是原子的,要么讀或寫了完整的數據,要么就失敗,不會出現只讀或寫了部分數據。
1.5 順序znode
名稱中包含Zookeeper指定順序號的znode。若在創建znode時設置了順序標識,那么該znode被創建后,名字的后邊將會附加一串數字,該數字是由一個單調遞增的計數器來生成的。例如,創建節點時傳入的path是”/aa/bb”,創建后的則可能是”/aa/bb0002”,再次創建后是”/aa/bb0003”。
Znode的創建模式CreateMode有四種,分別是:EPHEMERAL(短暫的znode)、EPHEMERAL_SEQUENTIAL(短暫的順序znode)、PERSISTENT(持久的znode)和PERSISTENT_SEQUENTIAL(持久的順序znode)。如果您已經看過了上篇博文,那么這里的api調用應該是很好理解的,見:http://www.cnblogs.com/leocook/p/zk_0.html。
2 zookeeper中的時間
Zxid:zookeeper狀態的改變都會有事物id自增來維護。
Version numbers :znode的數據和ACL變更維護。
ticks:zookeeper集群部署時的時間單位。
3 zookeepr watches
可以在讀操作exists、getChildren和getData上設置觀察,在執行寫操作create、delete和setData將會觸發觀察事件,當然,在執行寫的操作時,也可以選擇是否觸發znode上設置的觀察器,具體可查看相關的api。
當觀察的znode被創建、刪除或其數據被更新時,設置在exists上的觀察將會被觸發;
當觀察的znode被刪除或數據被更新時,設置在getData上的觀察將會被觸發;
當觀察的znode的子節點被創建、刪除或znode自身被刪除時,設置在getChildren上的觀察將會被觸發,可通過觀察事件的類型來判斷被刪除的是znode還是它的子節點。
注意:在收到收到觸發事件到執行讀操作之間,znode的狀態可能會發生狀態,這點需要牢記。
4 ACL
ACL權限:
• CREATE: you can create a child node
• READ: you can get data from a node and list its children.
• WRITE: you can set data for a node
• DELETE: you can delete a child node
• ADMIN: you can set permissions
身份驗證模式:
• world 無驗證
• auth 只能使用sessionID
• digest username:password 驗證
• ip 客戶端IP驗證
• host 客戶端主機名驗證
添加驗證
可使用zk對象的addAuthInfo()方法來添加驗證模式,如使用digest模式進行身份驗證:zk.addAuthInfo(“digest”,”username:passwd”.getBytes());
在zookeeper對象被創建時,初始化會被添加world驗證模式。world身份驗證模式的驗證id是”anyone”。
若該連接創建了znode,那么他將會被添加auth身份驗證模式的驗證id是””,即空字符串,這里將使用sessionID進行驗證。
創建自定義驗證:創建ACL對象時,可用ACL類的構造方法ACL(int perms, Id id)
5 zookeeper 內部原理
- 選舉領導
集群中所有的zk實例會選舉出來一個“領導實例”(leader),其它實例稱之為“隨從實例”(follower)。如果leader出現故障,其余的實例會選出一台leader,並一起提供服務,若之前的leader恢復正常,便成為follower。選舉follower是一個很快的過程,性能影響不明顯。
Leader主要功能是協調所有實例實現寫操作的原子性,即:所有的寫操作都會轉發給leader,然后leader會將更新廣播給所有的follower,當半數以上的實例都完成寫操作后,leader才會提交這個寫操作,隨后客戶端會收到寫操作執行成功的響應。
- 原子廣播
上邊已經說到:所有的寫操作都會轉發給leader,然后leader會將更新廣播給所有的follower,當半數以上的實例都完成寫操作后,leader才會提交這個寫操作,隨后客戶端會收到寫操作執行成功的響應。這么來的話,就實現了客戶端的寫操作的原子性,每個寫操作要么成功要么失敗。邏輯和數據庫的兩階段提交協議很像。
Znode的每次寫操作都相當於數據庫里的一次事務提交,每個寫操作都有個全局唯一的ID,稱為:zxid(ZooKeeper Transaction)。ZooKeeper會根據寫操作的zxid大小來對操作進行排序,zxid小的操作會先執行。zk下邊的這些特性保證了它的數據一致性:
- 順序一致性
任意客戶端的寫操作都會按其發送的順序被提交。如果一個客戶端把某znode的值改為a,然后又把值改為b(后面沒有其它任何修改),那么任何客戶端在讀到值為b之后都不會再讀到a。
- 原子性
這一點再前面已經說了,寫操作只有成功和失敗兩種狀態,不存在只寫了百分之多少這么一說。
- 單一系統映像
客戶端只會連接host列表中狀態最新的那些實例。如果正在連接到的實例掛了,客戶端會嘗試重新連接到集群中的其他實例,那么此時滯后於故障實例的其它實例都不會接收該連接請求,只有和故障實例版本相同或更新的實例才接收該連接請求。
- 持久性
寫操作完成之后將會被持久化存儲,不受服務器故障影響。
- 及時性
在對某個znode進行讀操作時,應該先執行sync方法,使得讀操作的連接所連的zk實例能與leader進行同步,從而能讀到最新的類容。
注意:sync調用是異步的,無需等待調用的返回,zk服務器會保證所有后續的操作會在sync操作完成之后才執行,哪怕這些操作是在執行sync之前被提交的。
java構建zookeeper應用
http://www.cnblogs.com/ggjucheng/p/3370359.html
String create(String path, byte[] data, List<ACL> acl,CreateMode createMode) | 創建一個給定的目錄節點 path, 並給它設置數據,CreateMode 標識有四種形式的目錄節點,分別是 PERSISTENT:持久化目錄節點,這個目錄節點存儲的數據不會丟失;PERSISTENT_SEQUENTIAL:順序自動編號的目錄節點,這種目錄節點會根據當前已近存在的節點數自動加 1,然后返回給客戶端已經成功創建的目錄節點名;EPHEMERAL:臨時目錄節點,一旦創建這個節點的客戶端與服務器端口也就是 session 超時,這種節點會被自動刪除;EPHEMERAL_SEQUENTIAL:臨時自動編號節點 |
Stat exists(String path, boolean watch) | 判斷某個 path 是否存在,並設置是否監控這個目錄節點,這里的 watcher 是在創建 ZooKeeper 實例時指定的 watcher,exists方法還有一個重載方法,可以指定特定的watcher |
Stat exists(String path,Watcher watcher) | 重載方法,這里給某個目錄節點設置特定的 watcher,Watcher 在 ZooKeeper 是一個核心功能,Watcher 可以監控目錄節點的數據變化以及子目錄的變化,一旦這些狀態發生變化,服務器就會通知所有設置在這個目錄節點上的 Watcher,從而每個客戶端都很快知道它所關注的目錄節點的狀態發生變化,而做出相應的反應 |
void delete(String path, int version) | 刪除 path 對應的目錄節點,version 為 -1 可以匹配任何版本,也就刪除了這個目錄節點所有數據 |
List<String>getChildren(String path, boolean watch) | 獲取指定 path 下的所有子目錄節點,同樣 getChildren方法也有一個重載方法可以設置特定的 watcher 監控子節點的狀態 |
Stat setData(String path, byte[] data, int version) | 給 path 設置數據,可以指定這個數據的版本號,如果 version 為 -1 怎可以匹配任何版本 |
byte[] getData(String path, boolean watch, Stat stat) | 獲取這個 path 對應的目錄節點存儲的數據,數據的版本等信息可以通過 stat 來指定,同時還可以設置是否監控這個目錄節點數據的狀態 |
voidaddAuthInfo(String scheme, byte[] auth) | 客戶端將自己的授權信息提交給服務器,服務器將根據這個授權信息驗證客戶端的訪問權限。 |
Stat setACL(String path,List<ACL> acl, int version) | 給某個目錄節點重新設置訪問權限,需要注意的是 Zookeeper 中的目錄節點權限不具有傳遞性,父目錄節點的權限不能傳遞給子目錄節點。目錄節點 ACL 由兩部分組成:perms 和 id。 Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 幾種 而 id 標識了訪問目錄節點的身份列表,默認情況下有以下兩種: ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分別表示任何人都可以訪問和創建者擁有訪問權限。 |
List<ACL>getACL(String path,Stat stat) | 獲取某個目錄節點的訪問權限列表 |
關閉防火牆:Exception in thread “main” org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for
public class TestZk { public static void main(String[] args) throws IOException, KeeperException, InterruptedException { // 創建一個與服務器的連接 ZooKeeper zk = new ZooKeeper("192.168.64.10:2181", ClientBase.CONNECTION_TIMEOUT, new Watcher() { // 監控所有被觸發的事件 public void process(WatchedEvent event) { System.out.println("已經觸發了" + event.getType() + "事件!"); } }); // 創建一個目錄節點 ==>已經觸發了 None 事件! zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 創建一個子目錄節點 zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // testRootData 此處false 不觸發事件 System.out.println(new String(zk.getData("/testRootPath", false, null))); // 取出子目錄節點列表 ==>[testChildPathOne] 在節點/testRootPath的getChildren上設置觀察 System.out.println(zk.getChildren("/testRootPath", true)); // 修改子目錄節點數據 由於上面的修改數據不觸發觀察 這邊不執行事件 zk.setData("/testRootPath/testChildPathOne", "modifyChildDataOne".getBytes(), -1); // 目錄節點狀態:[5,5,1281804532336,1281804532336,0,1,0,0,12,1,6] System.out.println("目錄節點狀態:[" + zk.exists("/testRootPath", true) + "]"); // 創建另外一個子目錄節點 ==>已經觸發了 NodeChildrenChanged 事件! zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // testChildDataTwo System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo", true, null))); // 刪除子目錄節點 已經觸發了 NodeDeleted 事件! zk.delete("/testRootPath/testChildPathTwo", -1); zk.delete("/testRootPath/testChildPathOne", -1); // 刪除父目錄節點 已經觸發了 NodeDeleted 事件! zk.delete("/testRootPath", -1); // 關閉連接 zk.close(); } }
運行結果
ZooKeeper 典型的應用場景
Zookeeper 從設計模式角度來看,是一個基於觀察者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上注冊的那些觀察者做出相應的反應,從而實現集群中類似 Master/Slave 管理模式,關於 Zookeeper 的詳細架構等內部細節可以閱讀 Zookeeper 的源碼
下面詳細介紹這些典型的應用場景,也就是 Zookeeper 到底能幫我們解決那些問題?下面將給出答案。
1 統一命名服務(Name Service)
分布式應用中,通常需要有一套完整的命名規則,既能夠產生唯一的名稱又便於人識別和記住,通常情況下用樹形的名稱結構是一個理想的選擇,樹形的名稱結構是一個有層次的目錄結構,既對人友好又不會重復。說到這里你可能想到了 JNDI,沒錯 Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結構關聯到一定資源上,但是 Zookeeper 的 Name Service 更加是廣泛意義上的關聯,也許你並不需要將名稱關聯到特定資源上,你可能只需要一個不會重復名稱,就像數據庫中產生一個唯一的數字主鍵一樣。
Name Service 已經是 Zookeeper 內置的功能,你只要調用 Zookeeper 的 API 就能實現。如調用 create 接口就可以很容易創建一個目錄節點。
2 配置管理(Configuration Management)
配置的管理在分布式應用環境中很常見,例如同一個應用系統需要多台 PC Server 運行,但是它們運行的應用系統的某些配置項是相同的,如果要修改這些相同的配置項,那么就必須同時修改每台運行這個應用系統的 PC Server,這樣非常麻煩而且容易出錯。
像這樣的配置信息完全可以交給 Zookeeper 來管理,將配置信息保存在 Zookeeper 的某個目錄節點中,然后將所有需要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每台應用機器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應用到系統中.
配置管理結構圖
3 集群管理(Group Membership)
Zookeeper 能夠很容易的實現集群管理的功能,如有多台 Server 組成一個服務集群,那么必須要一個“總管”知道當前集群中每台機器的服務狀態,一旦有機器不能提供服務,集群中其它集群必須知道,從而做出調整重新分配服務策略。同樣當增加集群的服務能力時,就會增加一台或多台 Server,同樣也必須讓“總管”知道。
Zookeeper 不僅能夠幫你維護當前的集群中機器的服務狀態,而且能夠幫你選出一個“總管”,讓這個總管來管理集群,這就是 Zookeeper 的另一個功能 Leader Election。
它們的實現方式都是在 Zookeeper 上創建一個 EPHEMERAL 類型的目錄節點,然后每個 Server 在它們創建目錄節點的父目錄節點上調用 getChildren(String path, boolean watch) 方法並設置 watch 為 true,由於是 EPHEMERAL 目錄節點,當創建它的 Server 死去,這個目錄節點也隨之被刪除,所以 Children 將會變化,這時 getChildren上的 Watch 將會被調用,所以其它 Server 就知道已經有某台 Server 死去了。新增 Server 也是同樣的原理。
Zookeeper 如何實現 Leader Election,也就是選出一個 Master Server。和前面的一樣每台 Server 創建一個 EPHEMERAL 目錄節點,不同的是它還是一個 SEQUENTIAL 目錄節點,所以它是個 EPHEMERAL_SEQUENTIAL 目錄節點。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節點,是因為我們可以給每台 Server 編號,我們可以選擇當前是最小編號的 Server 為 Master,假如這個最小編號的 Server 死去,由於是 EPHEMERAL 節點,死去的 Server 對應的節點也被刪除,所以當前的節點列表中又出現一個最小編號的節點,我們就選擇這個節點為當前 Master。這樣就實現了動態選擇 Master,避免了傳統意義上單 Master 容易出現單點故障的問題。
集群管理結構圖
Leader Election 關鍵代碼
package net.xulingbo.zookeeper; import org.apache.log4j.xml.DOMConfigurator; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; /** * * @author gaojiay * */ public class TestMainClient implements Watcher { protected static ZooKeeper zk = null; protected static Integer mutex; int sessionTimeout = 10000; protected String root; public TestMainClient(String connectString) { if(zk == null){ try { String configFile = this.getClass().getResource("/").getPath()+"net/xulingbo/zookeeper/log4j/log4j.xml"; DOMConfigurator.configure(configFile); System.out.println("創建一個新的連接:"); zk = new ZooKeeper(connectString, sessionTimeout, this); mutex = new Integer(-1); } catch (IOException e) { zk = null; } } } synchronized public void process(WatchedEvent event) { synchronized (mutex) { mutex.notify(); } } }
public class LeaderElection extends TestMainClient { public static final Logger logger = Logger.getLogger(LeaderElection.class); public LeaderElection(String connectString, String root) { super(connectString); this.root = root; if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } } void findLeader() throws InterruptedException, UnknownHostException, KeeperException { byte[] leader = null; try { //獲取leader目錄數據 leader = zk.getData(root + "/leader", true, null); } catch (KeeperException e) { if (e instanceof KeeperException.NoNodeException) { logger.error(e); } else { throw e; } } if (leader != null) { //如果該集群中存在/leader目錄,說明master還沒有宕機 following(); //則該連接的主機為從節點 } else { //集群中沒有/leader目錄,則新建該臨時節點(最先執行該方法的zk客戶端將創建該leader目錄) String newLeader = null; byte[] localhost = InetAddress.getLocalHost().getAddress(); try { newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (KeeperException e) { if (e instanceof KeeperException.NodeExistsException) { logger.error(e); } else { throw e; } } if (newLeader != null) { leading(); //成為leader } else { mutex.wait(); } } } @Override public void process(WatchedEvent event) { if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) { System.out.println("得到通知"); super.process(event); following(); } } void leading() { System.out.println("成為領導者"); } void following() { System.out.println("成為組成員"); } public static void main(String[] args) { TestMainServer.start(); String connectString = "localhost:" + TestMainServer.CLIENT_PORT; LeaderElection le = new LeaderElection(connectString, "/GroupMembers"); try { le.findLeader(); } catch (Exception e) { logger.error(e); } } }
4 Zookeeper 實現 Locks
/* 加鎖: ZooKeeper 將按照如下方式實現加鎖的操作: 1 ) ZooKeeper 調用 create ()方法來創建一個路徑格式為“ _locknode_/lock- ”的節點,此節點類型為sequence (連續)和 ephemeral (臨時)。也就是說,創建的節點為臨時節點,並且所有的節點連續編號,即“ lock-i ”的格式。 2 )在創建的鎖節點上調用 getChildren ()方法,來獲取鎖目錄下的最小編號節點,並且不設置 watch 。 3 )步驟 2 中獲取的節點恰好是步驟 1 中客戶端創建的節點,那么此客戶端獲得此種類型的鎖,然后退出操作。 4 )客戶端在鎖目錄上調用 exists ()方法,並且設置 watch 來監視鎖目錄下比自己小一個的連續臨時節點的狀態。 5 )如果監視節點狀態發生變化,則跳轉到第 2 步,繼續進行后續的操作,直到退出鎖競爭。 解鎖: ZooKeeper 解鎖操作非常簡單,客戶端只需要將加鎖操作步驟 1 中創建的臨時節點刪除即可 */ public class Locks extends TestMainClient { public static final Logger logger = Logger.getLogger(Locks.class); String myZnode; public Locks(String connectString, String root) { super(connectString); this.root = root; if (zk != null) { try { //創建鎖節點,並不設置觀察 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } } void getLock() throws KeeperException, InterruptedException{ List<String> list = zk.getChildren(root, false); String[] nodes = list.toArray(new String[list.size()]); //對鎖目錄下 的所有子節點排序 Arrays.sort(nodes); //判斷該zkclient創建的臨時順序節點是否為集群中最小的節點 if(myZnode.equals(root+"/"+nodes[0])){ doAction(); } else{ waitForLock(nodes[0]); } } //創建zk客戶端的臨時瞬時節點,並嘗試獲取鎖 void check() throws InterruptedException, KeeperException { myZnode = zk.create(root + "/lock_" , new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); getLock(); } void waitForLock(String lower) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); if(stat != null){ //發現最小的目錄節點還未被移除,則等待 mutex.wait(); } else{ getLock(); } } @Override //發現有節點移除,該等待狀態的客戶端被notify public void process(WatchedEvent event) { if(event.getType() == Event.EventType.NodeDeleted){ System.out.println("得到通知"); super.process(event); doAction(); } } /** * 執行其他任務 */ private void doAction(){ System.out.println("同步隊列已經得到同步,可以開始執行后面的任務了"); } public static void main(String[] args) { TestMainServer.start(); String connectString = "localhost:"+TestMainServer.CLIENT_PORT; Locks lk = new Locks(connectString, "/locks"); try { lk.check(); } catch (InterruptedException e) { logger.error(e); } catch (KeeperException e) { logger.error(e); } } }
5 同步隊列
同步隊列用 Zookeeper 實現的實現思路如下:
創建一個父目錄 /synchronizing,每個成員都監控標志(Set Watch)位目錄 /synchronizing/start 是否存在,然后每個成員都加入這個隊列,加入隊列的方式就是創建 /synchronizing/member_i 的臨時目錄節點,然后每個成員獲取 / synchronizing 目錄的所有目錄節點,也就是 member_i。判斷 i 的值是否已經是成員的個數,如果小於成員個數等待 /synchronizing/start 的出現,如果已經相等就創建 /synchronizing/start。
/** * 當一個隊列的成員都聚齊時,這個隊列才可用,否則一直等待所有成員到達,這種是同步隊列。 */ public class Synchronizing extends TestMainClient { int size; String name; public static final Logger logger = Logger.getLogger(Synchronizing.class); /** * 構造函數 * * @param connectString * 服務器連接 * @param root * 根目錄 * @param size * 隊列大小 */ Synchronizing(String connectString, String root, int size) { super(connectString); this.root = root; this.size = size; if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } try { name = new String(InetAddress.getLocalHost().getCanonicalHostName() .toString()); } catch (UnknownHostException e) { logger.error(e); } } /** * 加入隊列 * * @return * @throws KeeperException * @throws InterruptedException */ void addQueue() throws KeeperException, InterruptedException { zk.exists(root + "/start", true); zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { List<String> list = zk.getChildren(root, false); //如果隊列中子節點數小於size,則等待,如果不小於size,則創建start目錄,其他client則觸發事件,執行doAction if (list.size() < size) { mutex.wait(); } else { zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } } @Override public void process(WatchedEvent event) { if (event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated) { System.out.println("得到通知"); super.process(event); doAction(); } } /** * 執行其他任務 */ private void doAction() { System.out.println("同步隊列已經得到同步,可以開始執行后面的任務了"); } public static void main(String args[]) { // 啟動Server TestMainServer.start(); String connectString = "localhost:" + TestMainServer.CLIENT_PORT; int size = 1; Synchronizing b = new Synchronizing(connectString, "/synchronizing", size); try { b.addQueue(); } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } }
6 FIFO隊列(生產者-消費者)
/** * 隊列按照 FIFO 方式進行入隊和出隊操作,例如實現生產者和消費者模型。 * * 實現的思路也非常簡單,就是在特定的目錄下創建 SEQUENTIAL 類型的子目錄 * /queue_i,這樣就能保證所有成員加入隊列時都是有編號的,出隊列時通過 getChildren( ) * 方法可以返回當前所有的隊列中的元素,然后消費其中最小的一個,這樣就能保證 FIFO。 */ public class FIFOQueue extends TestMainClient { public static final Logger logger = Logger.getLogger(FIFOQueue.class); /** * Constructor * * @param connectString * @param root */ FIFOQueue(String connectString, String root) { super(connectString); this.root = root; if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } } /** * 生產者 * * @param i * @return */ boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } /** * 消費者 * * @return * @throws KeeperException * @throws InterruptedException */ int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { // 對root的子節點設置監聽 List<String> list = zk.getChildren(root, true); // 如果沒有任何子節點,則wait if (list.size() == 0) { mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tempValue = new Integer(s.substring(7)); if (tempValue < min) min = tempValue; } byte[] b = zk.getData(root + "/element" + min, false, stat); // 獲取到子節點數據之后 執行delete,並觸發事件,執行所有cliet的notify zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } @Override public void process(WatchedEvent event) { super.process(event); } public static void main(String args[]) { // 啟動Server TestMainServer.start(); String connectString = "localhost:" + TestMainServer.CLIENT_PORT; FIFOQueue q = new FIFOQueue(connectString, "/app1"); int i; Integer max = new Integer(5); System.out.println("Producer"); for (i = 0; i < max; i++) try { q.produce(10 + i); } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } for (i = 0; i < max; i++) { try { int r = q.consume(); System.out.println("Item: " + r); } catch (KeeperException e) { i--; logger.error(e); } catch (InterruptedException e) { logger.error(e); } } } }
參考:http://www.cnblogs.com/leocook/p/zk_0.html