下面記錄一下Zookeeper使用相關的入門知識,包括基本命令、四種數據節點、watcher監聽、訪問控制列表ACL和相關API操作。
介紹
Zookeeper是基於Google關於Chubby Lock的論文來開源實際的,主要用於分布式架構進行協調和管理,在大數據框架中作為一個“潤滑劑”而存在,協調分布式框架的使用變得更加順暢。
主要作用如下。
- 節點主備切換、上下線感知:高可用hadoop集群中,active狀態的namenode宕機后,standby狀態的namenode需要在zkfc進程下,進行主備切換,實現故障轉移。
- 配置管理:zookeeper也是一個文件系統,可以放配置文件,如hadoop中的一些配置信息,如節點名可以是配置文件全路徑名,節點信息就是配置的具體信息。
基本命令
Zookeeper集群啟動、停止和查看狀態的基本命令。
# 啟動ZooKeeper集群;在ZooKeeper集群中的每個節點執行以下命令
${ZK_HOME}/bin/zkServer.sh start
# 停止ZooKeeper集群
${ZK_HOME}/bin/zkServer.sh stop
# 查看集群狀態#
${ZK_HOME}/bin/zkServer.sh status
客戶端連接zookeeper server。
[hadoop@node03 /root]$ zkCli.sh -server node01:2181,node02:2181,node03:2181
連接后,可以查看客戶端命令,整體命令不多,注意有[watch]選項的,都是可以設置watch監聽的命令。
# help查看命令提示
[zk: node01:2181,node02:2181,node03:2181(CLOSED) 55] help
ZooKeeper -server host:port cmd args
stat path [watch] # 有[watch]選項的,都是可以設置watch監聽的命令
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
常用命令說明。
命令 | 說明 |
---|---|
stat path [watch] | 查看path節點的狀態 |
set path data | 修改path的數據 |
ls path [watch] | 查看節點下有哪些子節點 |
ls2 path [watch] | 相當於ls+stat |
history | 查看當前session輸入過的命令 |
delete path | 刪除節點 |
rmr path | 刪除節點 |
get path [watch] | 查看節點數據 |
create [-s] [-e] path data acl | 創建節點,需指定數據 |
quit | 退出命令行 |
close | 關閉session |
connect host:port | 連接某個服務器,可以設置多個連接地址 |
簡單的使用命令,實現節點創建、修改、查看和刪除。
# 查看節點信息
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 57] ls /
[clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
# 創建一個節點/node
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 58] create /node node-data
Created /node
# 查看是否創建新節點
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 61] ls /
[clyang, yangchaolin0000000003, news, node, ycl, zookeeper, spark, yang, hbase, zk_test]
查看節點數據。
# 查看節點數據
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 59] get /node
node-data
cZxid = 0x1600000002
ctime = Sat May 16 13:22:35 CST 2020
mZxid = 0x1600000002
mtime = Sat May 16 13:22:35 CST 2020
pZxid = 0x1600000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
修改節點數據。
# 修改節點數據
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 62] set /node node-newdata
cZxid = 0x1600000002
ctime = Sat May 16 13:22:35 CST 2020
mZxid = 0x1600000003
mtime = Sat May 16 13:23:21 CST 2020
pZxid = 0x1600000002
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 0
# 修改成功
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 63] get /node
node-newdata
cZxid = 0x1600000002
ctime = Sat May 16 13:22:35 CST 2020
mZxid = 0x1600000003
mtime = Sat May 16 13:23:21 CST 2020
pZxid = 0x1600000002
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 0
刪除節點。
# 刪除節點
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 64] delete /node
# 已刪除
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 65] ls /
[clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
znode
(1)基本信息
zookeeper中的節點樹,類似linux文件系統的文件目錄樹,只是zookeeper中節點和子節點都是和數據綁定在一起的,看上去一個節點有文件(存儲數據)又有目錄(包含子節點)的屬性。節點可以存儲少量的數據,byte~kb級別,主要存儲狀態信息、配置信息、位置信息等,為了區分和其他文件系統中的區別,用znode來表示。
znode中的數據是固定結構的數據,包含數據版本、ACL和時間戳等信息,每次znode中數據發生變化,對應的版本號會更新,如czxid、mzxid和pzxid。從上面get命令獲取數據可以看出,會將對應的數據版本信息也一並獲取。
znode中數據的讀寫是具有原子性的,讀寫都是整個數據的讀和更新替換,當然每個節點都有ACL(訪問控制列表),這個列表限制了哪些用戶只能做哪些操作,主要是create、read、write、delete和admin的操作。
下面說明一下使用get命令得到的信息代表什么。
[zk: node01:2181(CONNECTED) 6] get /ycl
yangchaolin
cZxid = 0x100000000a
ctime = Thu May 14 20:49:01 CST 2020
mZxid = 0x1000000017
mtime = Thu May 14 21:11:34 CST 2020
pZxid = 0x1000000016
cversion = 2
dataVersion = 4
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
說明如下。
屬性 | 值 | 說明 |
---|---|---|
cZxid | 0x100000000a | 創建節點的事務id |
ctime | Thu May 14 20:49:01 CST 2020 | 創建節點的時間 |
mZxid | 0x1000000017 | 修改節點數據的事務id |
mtime | Thu May 14 21:11:34 CST 2020 | 修改節點數據的時間 |
pZxid | 0x1000000016 | 子節點個數變化的事務id |
cversion | 2 | 子節點個數變化次數 |
dataVersion | 4 | 數據版本,修改了4次 |
aclVersion | 0 | 節點權限的變化次數 |
ephemeralOwner | 0x0 | 0代表持久,如果帶上session id就是臨時 |
dataLength | 11 | 數據的字節數,11位 |
numChildren | 0 | 子節點的個數 |
事務通常是一個64位的數字。由epoch+counter組成,各32位。以cZxid為例,0x100000000a是十六進制,后8位"0000000a"是counter,代表是第10次創建節點。剩下的"10"那就代表epoch,換成10進制是16,這個跟version-2目錄下的currentEpoch一致,類似改朝換代的紀元,說明改朝換代到16代君主了。
[hadoop@node01 ~]$ cat currentEpoch
16
(2)持久節點、臨時節點、有序節點
Zookeeper中默認創建節點是持久節點,也有臨時節點,只要創建臨時節點的session是active的狀態,這個臨時節點就不會自動刪除。還有一個是有序節點,zookeeper會在創建有序節點時自動在節點后面追加一個整形數字。
- 持久節點
create命令默認就是創建有序節點,需要顯示的刪除。
# 創建持久節點p-node
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 68] create /p-node ''
Created /p-node
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 69] ls /
[clyang, yangchaolin0000000003, news, ycl, p-node, zookeeper, spark, yang, hbase, zk_test]
# 顯示刪除持久節點
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 70] delete /p-node
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 71] ls /
[clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
- 臨時節點
臨時節點的生命周期跟客戶端session綁定,一旦session失效,臨時節點被刪除。
客戶端1創建一個臨時節點。
# 創建一個臨時節點,使用-e參數
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 72] create -e /e-node ''
Created /e-node
客戶端2查看這個臨時節點。
# 能查看到
[zk: node01:2181(CONNECTED) 1] ls /
[clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, e-node, hbase, zk_test]
客戶端1斷開和集群的連接。
# 斷開
[zk: node01:2181,node02:2181,node03:2181(CONNECTED) 73] close
2020-05-16 14:51:04,451 [myid:] - INFO [main:ZooKeeper@684] - Session: 0x1721c11c5610000 closed
[zk: node01:2181,node02:2181,node03:2181(CLOSED) 74] 2020-05-16 14:51:04,451 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
客戶端2查看這個臨時節點已經自動刪除。
# 斷開前
[zk: node01:2181(CONNECTED) 1] ls /
[clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, e-node, hbase, zk_test]
# 斷開后
[zk: node01:2181(CONNECTED) 2] ls /
[clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
- 有序節點
有序節點,可以在創建時不必考慮是否已經存在,系統會自動在后面增添整形數字,保證znode名字的唯一,配合臨時節點,可以用作分布式鎖。
# 創建有序節點
[zk: node01:2181(CONNECTED) 1] create -s /s-node ''
Created /s-node0000000024
# 再次創建,序號自增1
[zk: node01:2181(CONNECTED) 3] create -s /s-node ''
Created /s-node0000000025
Watcher
watcher是客戶端在服務器端注冊的事件監聽器,如果被監聽的節點有更新(創建、修改、刪除節點)操作,watcher會觸發,通知客戶端更新的細節,觸發后watcher會移除。
注意,使用命令的方式創建的watcher是一個單次觸發的操作,但在3.6.0版本,即使watcher觸發后,也不會移除。如果是3.6.0之前的版本,要有多次觸發的效果,需要使用后面的API來實現。
設置watcher的命令,上面命令可以看出,使用stat、ls、ls2和get可以設置。
- 節點更新的監聽
client1監聽某一個節點。
[zk: node01:2181(CONNECTED) 1] ls /ycl watch
client2在這個節點下創建一個子節點。
[zk: node02:2181(CONNECTED) 13] create /ycl/child-node ''
Created /ycl/child-node
client1上查看結果。
[zk: node01:2181(CONNECTED) 2]
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/ycl
- 節點上下線的監聽
client1下創建一個臨時節點。
[zk: node01:2181(CONNECTED) 2] create -e /e-node ''
Created /e-node
client2在這個節點上創建一個監聽。
[zk: node02:2181(CONNECTED) 15] ls /e-node watch
client1模擬節點下線。
[zk: node01:2181(CONNECTED) 3] close
2020-05-16 22:58:58,650 [myid:] - INFO [main:ZooKeeper@684] - Session: 0x2721c9db94c0000 closed
[zk: node01:2181(CLOSED) 4] 2020-05-16 22:58:58,651 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
client2查看結果。
[zk: node02:2181(CONNECTED) 16]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDeleted path:/e-node
ACL
默認情況下,節點的讀寫權限是沒有限制的,但是如果想某些關鍵的節點數據受到保護,這就需要用到ACL了。
ACL(Access Control List)可以設置客戶端對zookeeper服務器上節點的權限范圍,有如下5種權限類型,所有權限都擁有簡稱為crwda。
類型 | 說明 |
---|---|
CREATE(c) | 創建子節點的權限 |
READ(r) | 獲取節點及子節點列表的權限 |
WRITE(w) | 更新節點數據的權限 |
DELETE(d) | 刪除子節點的權限 |
ADMIN(a) | 設置節點ACL權限 |
如何使用呢?zookeeper提供了對應的操作方法。
命令 | 說明 |
---|---|
setAcl path acl | 設置訪問控制列表 |
getAcl path | 查看path的ACL設置 |
addauth scheme auth | 添加認證用戶 |
里面有scheme,這個是權限模式,是鑒權的策略,有如下幾種可以選擇,常用digest。
權限模式方案 | 說明 |
---|---|
world | 默認方式,相當於全世界都能訪問,只有一個用戶:anynone |
auth | 使用已添加認證的用戶認證 |
digest | 使用“用戶名:密碼”這種方式認證 |
ip | 使用ip地址認證 |
下面實操一下,為測試節點添加權限並測試,以下操作都是基於文末博文。
- world權限模式
設置方式
setAcl <path> world:anyone:<acl>
客戶端實操
# 創建節點
[zk: node01:2181(CONNECTED) 19] create /world-node ''
Created /world-node
# 默認是world策略,並且擁有5種權限
[zk: node01:2181(CONNECTED) 20] getAcl /world-node
'world,'anyone
: cdrwa
# 可以通過命令設置world策略
[zk: node01:2181(CONNECTED) 22] setAcl /world-node world:anyone:cdrwa
cZxid = 0x180000000d
ctime = Sun May 17 11:00:58 CST 2020
mZxid = 0x180000000d
mtime = Sun May 17 11:00:58 CST 2020
pZxid = 0x180000000d
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
- auth權限模式
設置方式
# 添加認證用戶
addauth digest <user>:<password>
# 設置方式
setAcl <path> auth:<user>:<acl>
客戶端實操
# 創建節點
[zk: node01:2181(CONNECTED) 23] create /auth-node ''
Created /auth-node
# 默認是world策略
[zk: node01:2181(CONNECTED) 24] getAcl /auth-node
'world,'anyone
: cdrwa
# 添加認證用戶
[zk: node01:2181(CONNECTED) 25] addauth digest clyang:123
# 設置策略auth,權限為rw
[zk: node01:2181(CONNECTED) 26] setAcl /auth-node auth:clyang:rw
cZxid = 0x1800000010
ctime = Sun May 17 11:07:38 CST 2020
mZxid = 0x1800000010
mtime = Sun May 17 11:07:38 CST 2020
pZxid = 0x1800000010
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
# 查看策略
[zk: node01:2181(CONNECTED) 27] getAcl /auth-node
'digest,'clyang:9SAlEXaEiGIBs1CVSgXIYgCAyO0=
: rw
# 斷開連接
[zk: node01:2181(CONNECTED) 28] close
2020-05-17 11:09:54,886 [myid:] - INFO [main:ZooKeeper@684] - Session: 0x1721ef0e8600000 closed
[zk: node01:2181(CLOSED) 29] 2020-05-17 11:09:54,886 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
# 重新連接
[zk: node01:2181(CLOSED) 29] connect
2020-05-17 11:09:59,573 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=node01:2181 sessionTimeout=30000
# 查看節點,發現無權限
[zk: node01:2181(CONNECTED) 30] get /auth-node
Authentication is not valid : /auth-node
# 需要重新添加認證用戶
[zk: node01:2181(CONNECTED) 31] addauth digest clyang:123
# 再次查看節點,發現可以查看
[zk: node01:2181(CONNECTED) 32] get /auth-node
''
cZxid = 0x1800000010
ctime = Sun May 17 11:07:38 CST 2020
mZxid = 0x1800000010
mtime = Sun May 17 11:07:38 CST 2020
pZxid = 0x1800000010
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
# 創建子節點,失敗,因為只有rw權限,沒有c的權限
[zk: node01:2181(CONNECTED) 33] create /auth-node/childnode ''
Authentication is not valid : /auth-node/childnode
- digest權限模式
設置方式
setAcl <path> digest:<user>:<password>:<acl>
這里的密碼,需要通過SHA1和BASE64處理,需要先通過如下命令先查看明文密碼對應的密文密碼。
echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
得到密文
[hadoop@node01 /kkb/install/zookeeper-3.4.5-cdh5.14.2/zkdatas/version-2]$ echo -n clyang:123 |openssl dgst -binary -sha1 | openssl base64
# 生成的密文
9SAlEXaEiGIBs1CVSgXIYgCAyO0=
客戶端實操
# 創建節點
[zk: node01:2181(CONNECTED) 34] create /digest-node ''
Created /digest-node
# 默認是world策略
[zk: node01:2181(CONNECTED) 35] getAcl /digest-node
'world,'anyone
: cdrwa
# 設置策略digest,權限全有,使用上面生成的密文
[zk: node01:2181(CONNECTED) 36] setAcl /digest-node digest:clyang:9SAlEXaEiGIBs1CVSgXIYgCAyO0=:cdrwa
cZxid = 0x1800000015
ctime = Sun May 17 12:02:00 CST 2020
mZxid = 0x1800000015
mtime = Sun May 17 12:02:00 CST 2020
pZxid = 0x1800000015
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
# 查看策略
[zk: node01:2181(CONNECTED) 37] getAcl /digest-node
'digest,'clyang:9SAlEXaEiGIBs1CVSgXIYgCAyO0=
: cdrwa
# 查看數據,ok,因為剛開始添加的認證用戶還有效
[zk: node01:2181(CONNECTED) 38] get /digest-node
''
cZxid = 0x1800000015
ctime = Sun May 17 12:02:00 CST 2020
mZxid = 0x1800000015
mtime = Sun May 17 12:02:00 CST 2020
pZxid = 0x1800000015
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
# 斷開連接
[zk: node01:2181(CONNECTED) 39] close
2020-05-17 12:04:38,033 [myid:] - INFO [main:ZooKeeper@684] - Session: 0x1721ef0e8600001 closed
[zk: node01:2181(CLOSED) 40] 2020-05-17 12:04:38,033 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
# 重新連接
[zk: node01:2181(CLOSED) 40] connect
2020-05-17 12:04:43,545 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=node01:2181 sessionTimeout=30000
# 查看無權限
[zk: node01:2181(CONNECTED) 41] get /digest-node
Authentication is not valid : /digest-node
# 添加認證用戶
[zk: node01:2181(CONNECTED) 42] addauth digest clyang:123
# 可以查看節點
[zk: node01:2181(CONNECTED) 43] get /digest-node
''
cZxid = 0x1800000015
ctime = Sun May 17 12:02:00 CST 2020
mZxid = 0x1800000015
mtime = Sun May 17 12:02:00 CST 2020
pZxid = 0x1800000015
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
- ip權限模式
設置方式
setAcl <path> ip:<ip>:<acl>
客戶端實操
# 創建節點
[zk: node01:2181(CONNECTED) 44] create /ip-node ''
Created /ip-node
# 默認是world策略
[zk: node01:2181(CONNECTED) 45] getAcl /ip-node
'world,'anyone
: cdrwa
# 設置策略為ip
[zk: node01:2181(CONNECTED) 46] setAcl /ip-node ip:192.168.200.100:cdrwa
cZxid = 0x1800000019
ctime = Sun May 17 12:10:49 CST 2020
mZxid = 0x1800000019
mtime = Sun May 17 12:10:49 CST 2020
pZxid = 0x1800000019
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
# 設置成功
[zk: node01:2181(CONNECTED) 47] getAcl /ip-node
'ip,'192.168.200.100
: cdrwa
# 本機ip是192.168.200.110,提示訪問不到
[zk: node01:2181(CONNECTED) 48] get /ip-node
Authentication is not valid : /ip-node
換成ip為192.168.200.100的client來訪問,沒有問題。
[zk: node01:2181(CONNECTED) 0] get /ip-node
''
cZxid = 0x1800000019
ctime = Sun May 17 12:10:49 CST 2020
mZxid = 0x1800000019
mtime = Sun May 17 12:10:49 CST 2020
pZxid = 0x1800000019
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
API
包含兩種風格的編程方式,一種是zookeeper原生的,一種是curator封裝后的。
(1)原生 API
使用原生api,實現類似命令操作的基本功能。
里面用到CountDownLatch線程遞減鎖,在線程計數歸零之前,線程會被阻塞。
package com.boe.zk;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZookeeperDemo {
private ZooKeeper zk=null;
//連接zookeeper
@Before
public void connect() throws IOException, InterruptedException {
/**
* connectString 連接的地址+端口
* sessionTimeout 4s~40s之間
* Watcher 監控者
* zk連接過程使用netty,底層基於NIO,是一個非阻塞連接
*/
//在監控到之前,test線程不能繼續往下走
final CountDownLatch cdl=new CountDownLatch(1);
zk=new ZooKeeper("192.168.200.100:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()== Event.KeeperState.SyncConnected){
System.out.println("連接zk成功....");
cdl.countDown();
}
}
});
//cdl計數減到0后,cdl的await才會解除阻塞,執行下面的方法
//如果不用CountDownLatch,會出現先打印"finish~~~",然后打印"連接zk成功...."
cdl.await();
System.out.println("finish~~~");
}
//創建節點 create
@Test
public void createNode() throws KeeperException, InterruptedException {
/**
* path 路徑
* data 數據
* acl 權限策略
* createMode 節點類型
*/
String s = zk.create("/yang", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(s);//返回值是節點的路徑,主要用於順序節點
System.out.println("創建節點success");
}
//刪除節點 delete
@Test
public void deleteNode() throws KeeperException, InterruptedException {
/**
* path 路徑
* version dataversion版本號
*/
zk.delete("/yang",-1);
//強制刪除 版本號可以給-1,-1代表忽略版本校驗
System.out.println("刪除節點success");
}
//修改數據 set
@Test
public void setNode() throws KeeperException, InterruptedException {
Stat stat = zk.setData("/yang", "hello zookeeper".getBytes(), 0);
System.out.println(stat.toString());
System.out.println("修改數據成功");
}
//獲取節點 get
@Test
public void getNode() throws KeeperException, InterruptedException {
Stat s=new Stat();
byte[] data = zk.getData("/yang", null, s);
System.out.println(new String(data));
}
//獲取子節點 ls
@Test
public void getChild() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/video", null);
for(String s: children){
//返回子節點的節點名
System.out.println("子節點為:"+children);
}
}
//創建某個節點是否存在
@Test
public void exist() throws KeeperException, InterruptedException {
//返回值是節點信息
Stat stat = zk.exists("/video", null);
System.out.println(stat);
System.out.println(stat==null? "節點不存在":"節點存在");
}
}
以創建一個節點為例,從控制台可以看到正常的創建了節點"/yang"。另外發現連接zookeeper集群成功后,才打印"finish~~~",這是使用了線程遞減鎖的原因。
也可以實現監聽。
package com.boe.zk;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class ZookeeperDemo2 {
//連接zk
private ZooKeeper zk=null;
@Before
public void connect() throws IOException, InterruptedException {
final CountDownLatch cdl=new CountDownLatch(1);
zk=new ZooKeeper("192.168.200.100:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected){
System.out.println("連接zk成功....");
cdl.countDown();
}
}
});
cdl.await();
System.out.println("finish");
}
//監控節點是否被改變
@Test
public void dataChange() throws KeeperException, InterruptedException {
//連接和監控是非阻塞的,無論是否有變化,都會向下執行
final CountDownLatch cdl=new CountDownLatch(1);
zk.getData("/yang", new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDataChanged){
System.out.println("數據節點數據在"+new Date()+"被改變");
cdl.countDown();
}
}
},
null);
//監控到了再結束
cdl.await();
System.out.println("監控結束");
}
//監控子節點個數變化
@Test
public void childrenChanged() throws KeeperException, InterruptedException {
final CountDownLatch cdl=new CountDownLatch(1);
zk.getChildren("/video", new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType()==Event.EventType.NodeChildrenChanged){
System.out.println("子節點個數發生"+new Date()+"變化");
cdl.countDown();
}
}
},null);
//監控到了再結束
cdl.await();
System.out.println("監控結束");
}
//監控節點的增刪變化
@Test
public void nodeChanged() throws KeeperException, InterruptedException {
final CountDownLatch cdl=new CountDownLatch(1);
zk.getChildren("/video", new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType()==Event.EventType.NodeCreated){
System.out.println("節點在"+new Date()+"創建");
cdl.countDown();
}else if(event.getType()==Event.EventType.NodeDeleted){
System.out.println("節點在"+new Date()+"刪除");
cdl.countDown();
}
}
},null);
//監控到了再結束
cdl.await();
System.out.println("監控結束");
}
}
以監控節點是否被改變為例,啟動后,當客戶端對"/yang"數據進行更新后,控制台打印出監聽信息。
(2)curator API
curator對zookeeper的api做了封裝,提供簡單易用的api,編程風格是鏈式編程。
也可以實現基本的命令操作功能,此外監聽部分可以重復監聽,如果是命令只能生效一次,這個是有區別的地方。
package com.boe.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
/**
* 測試使用java api來完成zookeeper常用操作
*/
public class CuratorClientTest {
//zookeeper集群信息
private static final String ZK_SERVER_ADDRESS="node01:2181,node02:2181,node03:2181";
//zookeeper節點名稱
private static final String ZK_NODE="/yang";
//連接zookeeper的客戶端對象
private static CuratorFramework client=null;
//初始化
public static void init(){
//測試連接重試策略
RetryNTimes retryPolicy =new RetryNTimes(10,3000);//失敗重試10次,每次間隔3秒
//新建連接對象,需要兩個參數,連接集群信息和連接策略
client= CuratorFrameworkFactory.newClient(ZK_SERVER_ADDRESS,retryPolicy);
//啟動客戶端,連接到zookeeper集群
client.start();
//打印
System.out.println("zookeeper client connecting successful");
}
//關閉和集群的連接,體驗鏈式編程
public static void quit(){
System.out.println("close session with zookeeper");
client.close();
}
//創建永久節點
public static void createPersistentZNode() throws Exception {
//節點數據
String znodedata="hello zookeeper,I am persistent node";
//創建
client.create().
creatingParentsIfNeeded().
withMode(CreateMode.PERSISTENT).//節點類型指定
forPath("/p_node",znodedata.getBytes());
}
//創建臨時節點
public static void createEphemeralZNode() throws Exception {
//節點數據
String znodedata="hello zookeeper,I am Ephemeral node";
//創建
client.create().
creatingParentsIfNeeded().
withMode(CreateMode.EPHEMERAL).
forPath("/e_node",znodedata.getBytes());
}
//修改節點
public static void modifyZnodeData() throws Exception {
//修改后節點數據
String newdata="hello zookeeper,I am new data";
//修改數據
printCmd("set",ZK_NODE,newdata);
client.setData().forPath(ZK_NODE,newdata.getBytes());
printCmd("get",ZK_NODE);
print(client.getData().forPath(ZK_NODE));
}
//刪除節點
public static void deleteZNode() throws Exception {
printCmd("delete",ZK_NODE);
client.delete().forPath(ZK_NODE);
//再查詢
printCmd("ls","/");
print(client.getChildren().forPath("/"));
}
//監聽節點,這個監聽可以反復監聽,如果是命令中添加一個watch,只能監聽一次
public static void watchZNode() throws Exception {
//使用TreeCache,需要傳入client和path,為啥使用TreeCache,參考如下如下官方文檔說明
/**
* A utility that attempts to keep all data from all children of a ZK path locally cached.
* This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc.
* You can register a listener that will get notified when changes occur
*/
TreeCache treeCache=new TreeCache(client,ZK_NODE);//就是監聽ZK_NODE的節點
//設置監聽器和處理過程
treeCache.getListenable().addListener(new TreeCacheListener() {//匿名內部類
@Override
public void childEvent(CuratorFramework cliet, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(data!=null){
switch (event.getType()){
case NODE_ADDED:
System.out.println("NODE_ADDED: "+data.getPath()+"數據為:"+new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println("NODE_UPDATED: "+data.getPath()+"數據為:"+new String(data.getData()));
break;
case NODE_REMOVED:
System.out.println("NODE_REMOVED: "+data.getPath()+"數據為:"+new String(data.getData()));
break;
default:
break;
}
} else{
System.out.println("data is null:"+event.getType());
}
}
});
//開始監聽
treeCache.start();
//持續監聽一段時間
Thread.sleep(60000);
//關閉監聽
treeCache.close();
}
//查詢節點
public static void queryZNode() throws Exception {
printCmd("ls","/");
//進入路徑
print(client.getChildren().forPath("/"));
//查詢
printCmd("get",ZK_NODE);
print(client.getData().forPath(ZK_NODE));
}
//輸出命令
public static void printCmd(String... cmds){
StringBuilder stringBuilder=new StringBuilder("$ ");
for (String cmd : cmds) {
stringBuilder.append(cmd).append(" ");//每個命令之間用空格隔開
}
System.out.println(stringBuilder.toString());
}
//如果是數組,輸出數組內容,不是就直接輸出對象
public static void print(Object result){
System.out.println(result instanceof byte[]? new String((byte[]) result):result);
}
//main方法
public static void main(String[] args) throws Exception {
init();
//createPersistentZNode();
//createEphemeralZNode();
//Thread.sleep(10000);//10s后臨時節點在會話結束后刪除
//queryZNode();
//modifyZnodeData();
//deleteZNode();
watchZNode();
quit();
}
}
以監聽為例,當在client端多次修改"/yang",控制台可以監聽到做了什么修改,並且可以監聽多次。
以上,理解不一定正確,學習就是一個不斷認識和糾錯的過程。
參考博文:
(1)https://zookeeper.apache.org/doc/current/zookeeperOver.html 官網介紹
(2)https://www.cnblogs.com/youngchaolin/p/12113065.html 安裝
(3)https://blog.csdn.net/liuxiao723846/article/details/79391650 ACL
(4)https://www.cnblogs.com/youngchaolin/p/12904014.html 事務id