Zookeeper是Apache下的項目之一,傾向於對大型應用的協同維護管理工作。IBM則給出了IBM對ZooKeeper的認知: Zookeeper 分布式服務框架是 Apache Hadoop 的一個子項目,它主要是用來解決分布式應用中經常遇到的一些數據管理問題,如:統一命名服務、狀態同步服務、集群管理、分布式應用配置項的管理等。總之,可以用“協調”這個核心的詞來形容它的作用。關於它能干嗎,你可以看看 “Zookeeper能干什么?”。
特征
-
我們可以把Zookeeper理解為一個精簡的文件系統(和Linux文件系統結構非常相似),其每一個節點稱為znode,znode下可以存放子節點,也可以直接對節點進行賦值存值。
-
Zookeeper被應用與一些集群上,提高集群的高可用。它可以幫助你避免單點故障,使你的系統更加可靠。
-
Zookeeper的集群我們可以通俗的理解為,一個有Leader的團隊,團隊中各個成員的數據都是一致的。團隊中的Leader采用選舉算法推舉,所以可以保證在Leader出現問題的時候,又會選舉出新的Leader。(fast paxos 選舉算法大家可以深入了解下)
-
Zookeeper使用路徑來描述節點,節點可以被看做是一個目錄,也可以被看做是一個文件,它同時具有兩者的特點。
-
Zookeeper的Watch機制也是它的最大被應用的原因。當我們有很多客戶端連接到Zookeeper時,當被設置了Watch的數據發生了改變的時候,則服務器將這個改變發送給設置了Watch的客戶端,通知它們。所以我們經常用它來做業務系統的統一配置管理。使用zk的Watch要特別注意一點就是它的“一次性觸發器”(最后的Java例子中有模擬這點)。
集群部署
1. 下載
官網:http://zookeeper.apache.org/releases.html
下載:zookeeper-3.4.8.tar.gz
2. 安裝
因為資源有限,所以我在同一個服務器上面創建3個目錄 server1、server2、server3 來模擬3台服務器集群。
cd server1
tar -zxvf zookeeper-3.4.8.tar.gz
mkdir data
mkdir dataLog
data 為數據目錄,dataLog 為日志目錄。
3. 配置
cd zookeeper-3.4.8/conf
創建文件 zoo.cfg,內容如下:
tickTime=2000 initLimit=5 syncLimit=2 dataDir=/opt/zookeeper/server1/data dataLogDir=/opt/zookeeper/server1/dataLog clientPort=2181 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
tickTime:zookeeper中使用的基本時間單位, 毫秒值。
initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個 tickTime 時間間隔數。這里設置為5表名最長容忍時間為 5 * 2000 = 10 秒。
syncLimit:這個配置標識 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 2 * 2000 = 4 秒。
dataDir 和 dataLogDir 看配置就知道干嗎的了,不用解釋。
clientPort:監聽client連接的端口號,這里說的client就是連接到Zookeeper的代碼程序。
server.{myid}={ip}:{leader服務器交換信息的端口}:{當leader服務器掛了后, 選舉leader的端口}
maxClientCnxns:對於一個客戶端的連接數限制,默認是60,這在大部分時候是足夠了。但是在我們實際使用中發現,在測試環境經常超過這個數,經過調查發現有的團隊將幾十個應用全部部署到一台機器上,以方便測試,於是這個數字就超過了。
修改zoo.cfg非常簡單,然后還需要創建myid文件:
cd server1
echo 1 > myid
然后拷貝server1為server2和server3,並修改其中的zoo.cfg配置,當然也要修改myid的內容為2和3。
下面給出3個server的zoo.cfg 內容:
# server1 tickTime=2000 initLimit=5 syncLimit=2 dataDir=/opt/zookeeper/server1/data dataLogDir=/opt/zookeeper/server1/dataLog clientPort=2181 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
# server2 tickTime=2000 initLimit=5 syncLimit=2 dataDir=/opt/zookeeper/server2/data dataLogDir=/opt/zookeeper/server2/dataLog clientPort=2182 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
# server3 tickTime=2000 initLimit=5 syncLimit=2 dataDir=/opt/zookeeper/server3/data dataLogDir=/opt/zookeeper/server3/dataLog clientPort=2183 server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2889:3889 server.3=127.0.0.1:2890:3890
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
這里做下說明:因為我們是在同一個機器上模擬的集群,所以要注意server端口號和clientPort不要重復了,不然會出現端口沖突。所以,如果我們是3個不同的機器上做的3個server,那么我們的zoo.cfg配置都是一樣的(注意server.{myid}=后面的IP地址使用具體的IP地址,如192.168.0.88)。還有就是,每一個server的myid內容都不能一樣,這也可以理解為不同server的標識。
4. 啟動
進入 zookeeper-3.4.8/bin 目錄,使用 ./zkServer.sh start 啟動zk服務。(你也可以使用 ./zkServer.sh start myzoo.cfg 指定配置文件啟動,這在自動化運維的時候很有用)
使用 tail -f zookeeper.out 查看日志。
要說的是:在啟動第一個的時候,日志中會出現一堆錯誤,仔細一看就能明白,是因為另外2個server還沒有啟動它連接不上的錯誤。然后當我們啟動第二個server的時候,日志中的錯誤將會減少。最后我們把所有server都啟動起來后,日志中便沒有錯誤了。
5. 測試
隨便進入一個zk目錄,連接一個server測試。
cd zookeeper-3.4.8/bin
zkCli.sh -server 127.0.0.1:2181
如果你要連接別的服務器,請指定具體的IP地址。
幾個基本命令說明:
ls 查看指定節點中包含的子節點(如:ls / 或 ls /app1/server1)
create 創建節點並賦值
get 讀取節點內容
set 改變節點內容
delete 刪除節點
注意zk中所有節點都基於路徑確定,如你要刪除 /app1/server1/nodeA 的命令為:
delete /app1/server1/nodeA
下面是基本操作截圖:
Java程序Demo
創建一個Maven工程
打開pom.xml文件添加zookeeper依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
- 1
- 2
- 3
- 4
- 5
- 1
- 2
- 3
- 4
- 5
創建Demo.java,代碼如下:
package com.shanhy.demo.zookeeper; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; /** * Zookeeper測試 * * @create 2016年3月10日 */ public class Test { // 會話超時時間,設置為與系統默認時間一致 private static final int SESSION_TIMEOUT = 30 * 1000; // 創建 ZooKeeper 實例 private ZooKeeper zk; // 創建 Watcher 實例 private Watcher wh = new Watcher() { /** * Watched事件 */ public void process(WatchedEvent event) { System.out.println("WatchedEvent >>> " + event.toString()); } }; // 初始化 ZooKeeper 實例 private void createZKInstance() throws IOException { // 連接到ZK服務,多個可以用逗號分割寫 zk = new ZooKeeper("192.168.19.130:2181,192.168.19.130:2182,192.168.19.130:2183", Test.SESSION_TIMEOUT, this.wh); } private void ZKOperations() throws IOException, InterruptedException, KeeperException { System.out.println("\n1. 創建 ZooKeeper 節點 (znode : zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent"); zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("\n2. 查看是否創建成功: "); System.out.println(new String(zk.getData("/zoo2", this.wh, null)));// 添加Watch // 前面一行我們添加了對/zoo2節點的監視,所以這里對/zoo2進行修改的時候,會觸發Watch事件。 System.out.println("\n3. 修改節點數據 "); zk.setData("/zoo2", "shanhy20160310".getBytes(), -1); // 這里再次進行修改,則不會觸發Watch事件,這就是我們驗證ZK的一個特性“一次性觸發”,也就是說設置一次監視,只會對下次操作起一次作用。 System.out.println("\n3-1. 再次修改節點數據 "); zk.setData("/zoo2", "shanhy20160310-ABCD".getBytes(), -1); System.out.println("\n4. 查看是否修改成功: "); System.out.println(new String(zk.getData("/zoo2", false, null))); System.out.println("\n5. 刪除節點 "); zk.delete("/zoo2", -1); System.out.println("\n6. 查看節點是否被刪除: "); System.out.println(" 節點狀態: [" + zk.exists("/zoo2", false) + "]"); } private void ZKClose() throws InterruptedException { zk.close(); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { Test dm = new Test(); dm.createZKInstance(); dm.ZKOperations(); dm.ZKClose(); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
我想代碼不用解釋了,該注釋的里面都注釋了。
下面有一種特殊的情況的處理思路:
有server1、server2、server3這三個服務,在client去連接zk的時候,指向server1初始化的過程中是沒有問題的,然而剛剛初始化完成,准備去連接server1的時候,server1因為網絡等原因掛掉了。
然而對client來說,它會拿server1的配置去請求連接,這時肯定會報連接被拒絕的異常以致啟動退出。
所以優雅的解決這個問題的方法思路就是“在連接的時候判斷連接狀態,如果未連接成功,程序自動使用其他連接去請求連接”,這樣來避開這種罕見的異常問題。
代碼如下:
// 初始化 ZooKeeper 實例 private void createZKInstance() throws IOException { // 連接到ZK服務,多個可以用逗號分割寫 zk = new ZooKeeper("192.168.19.130:2181,192.168.19.130:2182,192.168.19.130:2183", Test.SESSION_TIMEOUT, this.wh); if(!zk.getState().equals(States.CONNECTED)){ while(true){ if(zk.getState().equals(States.CONNECTED)){ break; } try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
上面的代碼是基於zk提供的庫的API來你使用的,為了更易於使用,有人寫了開源的zkclient,我們可以直接使用它來操作zk。
zkclient 開源地址:https://github.com/sgroschupf/zkclient
maven 依賴配置:
<!--zkclient --> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.7</version> </dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
zkClient 針對 zk 的一次性watcher,做了重新封裝,然后定義了 stateChanged、znodeChanged、dataChanged 三種監聽器。
- 監聽children變化
- 監聽節點數據變化
- 監聽連接狀態變化
代碼如下:
package com.shanhy.demo.zookeeper; import java.util.List; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.DataUpdater; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.Watcher.Event.KeeperState; /** * ZkClient的使用測試 * * @author 單紅宇(365384722) * @myblog http://blog.csdn.net/catoop/ * @create 2016年3月11日 */ public class ZkClientTest { public static void main(String[] args) { ZkClient zkClient = new ZkClient("192.168.19.130:2181,192.168.19.130:2182,192.168.19.130:2183"); String node = "/myapp"; // 訂閱監聽事件 childChangesListener(zkClient, node); dataChangesListener(zkClient, node); stateChangesListener(zkClient); if (!zkClient.exists(node)) { zkClient.createPersistent(node, "hello zookeeper"); } System.out.println(zkClient.readData(node)); zkClient.updateDataSerialized(node, new DataUpdater<String>() { public String update(String currentData) { return currentData + "-123"; } }); System.out.println(zkClient.readData(node)); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 訂閱children變化 * * @param zkClient * @param path * @author SHANHY * @create 2016年3月11日 */ public static void childChangesListener(ZkClient zkClient, final String path) { zkClient.subscribeChildChanges(path, new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println("clildren of path " + parentPath + ":" + currentChilds); } }); } /** * 訂閱節點數據變化 * * @param zkClient * @param path * @author SHANHY * @create 2016年3月11日 */ public static void dataChangesListener(ZkClient zkClient, final String path){ zkClient.subscribeDataChanges(path, new IZkDataListener(){ public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("Data of " + dataPath + " has changed."); } public void handleDataDeleted(String dataPath) throws Exception { System.out.println("Data of " + dataPath + " has changed."); } }); } /** * 訂閱狀態變化 * * @param zkClient * @author SHANHY * @create 2016年3月11日 */ public static void stateChangesListener(ZkClient zkClient){ zkClient.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { System.out.println("handleStateChanged"); } public void handleSessionEstablishmentError(Throwable error) throws Exception { System.out.println("handleSessionEstablishmentError"); } public void handleNewSession() throws Exception { System.out.println("handleNewSession"); } }); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
ZkClient 做了便捷的包裝,對Watch做了增強處理。
subscribeChildChanges實際上是通過exists和getChildren關注了兩個事件。這樣當create(“/path”)時,對應path上通過getChildren注冊的listener也會被調用。另外subscribeDataChanges實際上只是通過exists注冊了事件。因為從上表可以看到,對於一個更新,通過exists和getData注冊的watcher要么都會觸發,要么都不會觸發。
關於session超時的問題,ZkClient 貌似還是有對 Session Expired 處理的,在ZkClient.processStateChanged方法中。雖然能重新連接,但是連接上是一個新的 session,原有創建的ephemeral znode和watch會被刪除,程序上你可能需要處理這個問題。
最后說幾點關於ZkClient的注意事項:
1. 創建節點的時候一定要先判斷節點是否存在,如果直接使用zkclient創建一個已經存在的節點,則會拋出異常。
2. 使用zkclient創建節點的時候,path描述的路徑,預新增的最終節點之前的所有父節點都必須要存在,否則會拋出異常。所以根據自己需要做好處理。