通過Python客戶端對Zookeeper的基本操作
#!/usr/bin/env python # -*- coding: utf-8 -*- import sys from kazoo.client import KazooClient def main(): try: nodePath = "/zktest" host = "172.16.48.171" port = "2181" timeout = 100 zkc = KazooClient(hosts=host + ':' + port, timeout=timeout) zkc.start() # 判斷節點是否存在 if zkc.exists(nodePath+"/test111"): print nodePath + "/test111", "存在" else: # 建立節點,成功后返回新節點路徑 childrenPath = zkc.create(nodePath+"/test111", "test111") print "創建節點:", childrenPath, "成功。" # 創建臨時節點,連接斷開則節點自動刪除 zkc.create(nodePath+"/test999", "test999", ephemeral=True) # 獲取節點數據和節點數據,返回2個值,一個是節點數據,一個是節點stat,這是個ZnodeStat對象,它其實是節點屬性,一共有12個屬性 dataAndStat = zkc.get(nodePath) data = dataAndStat[0] print "數據為:", data stat = dataAndStat[1] print "數據版本號為:", stat.version print "數據長度為:", stat.data_length print "子節點數量:", stat.numChildren # 更新節點數據,數據最大為1MB超過則報錯,成功后返回 ZnodeStat 對象 stat = zkc.set(nodePath, value="test222") print "數據版本號為:", stat.version # 刪除節點,后面的參數用於控制是否遞歸刪除,默認是False,但是這樣就會有一個問題,如果該節點下有子節點則本次刪除失敗,你需要先刪除 # 它下面的所有子節點才行 if zkc.exists(nodePath+"/test111"): result = zkc.delete(nodePath+"/test111", recursive=False) if result: print "刪除節點成功。" print nodePath + " 的子節點為:", zkc.get_children(nodePath) zkc.close() zkc.stop() except Exception as err: print err.message if __name__ == "__main__": try: main() finally: sys.exit()
Watcher事件操作
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import time
from kazoo.client import KazooClient
from kazoo.client import ChildrenWatch
from kazoo.client import DataWatch
"""
Watcher可以通過兩種方式設置,一種是在調用ZK客戶端方法的時候傳遞進去,比如 zk.get_children("/node", watch=FUN),但是這種方法是一次性的
也就是觸發一次就沒了,如果你還想繼續監聽一個事件就需要再次注冊。
另外一種方法是通過高級API實現,監控數據或者節點變化,它只需要我們注冊一次。一次性事件關注是zookeeper默認的即便在JAVA客戶端里也是,這種高級別
API在JAVA里是zkclient,而在Python里面就是kazoo。高級API其實是對低級API的封裝,對用戶來講更加好用。
"""
__metaclass__ = type
class zkWatcherTest:
def __init__(self, host, port, timeout=10):
self._nodename = ''
self._host = host
self._port = port
self._timeout = timeout
self._zk = KazooClient(hosts=self._host + ':' + self._port, timeout=self._timeout)
self._zk.start()
self._lastNodeList = []
def start(self, zkPath):
self._lastNodeList = self._zk.get_children(zkPath)
try:
ChildrenWatch(client=self._zk, path=zkPath, func=self._NodeChange)
DataWatch(client=self._zk, path=zkPath, func=self._DataChange)
# 這里的死循環就是為了不讓程序退出,你可以把時間設置長一點觀察,其實即便沒有到60秒的睡眠時間,如果
# 子節點或者節點數量發生變化也會收到通知。這里的wathch底層就是在節點上設置監聽器,然后捕捉事件,如果有
# 事件觸發就調用你傳遞的方法來處理。
while True:
time.sleep(60)
print "OK"
except Exception as err:
print err.message
def _NodeChange(self, children):
"""
處理子節點變化
:param children: 這個參數並不需要你傳遞進來,因為把這個方法傳遞進ChiledrenWatcher,會返回一個當前子節點列表
:return:
"""
# print children
# 如果新節點列表長度大於上次獲取的節點列表長度,說明有增加
if len(children) > len(self._lastNodeList):
for node in children:
if node not in self._lastNodeList:
print "新增加的節點為:", str(node)
self._lastNodeList = children
else:
for node in self._lastNodeList:
if node not in children:
print "刪除的節點為:", str(node)
self._lastNodeList = children
def _DataChange(self, data, stat):
"""
處理節點的數據變化
:param data:
:param stat:
:return:
"""
print "數據發生變化"
print "數據為:", data
print "數據長度:", stat.dataLength
print "數據版本號version:", stat.version
print "cversion:", stat.cversion
print "子節點數量:", stat.numChildren
def main():
try:
zkwt =zkWatcherTest(host="172.16.48.171", port="2181")
zkwt.start("/zktest")
except Exception as err:
print err.message
if __name__ == "__main__":
try:
main()
finally:
sys.exit()
關於Watcher,網上很多帖子都是通過裝飾器的方式實現的,其實我上面的方式和裝飾器是一樣的,只是形式不同罷了。功能都能實現,只是用裝飾器有時候會不方便。
