Zookeeper學習筆記
本篇主要是一些基本的介紹和API的使用介紹, 有些只是記錄了知識點,而沒有完全在筆記中詳細解釋, 需要自行查找資料補充相關概念
主要參考了課程中的內容: Zookeeper分布式系統開發實戰
Zookeeper 概述
簡介:
- 定義 : 分布式系統, 保證系統高可用, 並且高性能
- 高性能, 分布式的, 分布應用協調服務
- 使用文件系統目錄樹作為數據模型
ZK典型用用場景:
- 注冊訂閱
- 負載均衡
- 命名服務
- 分布式協調/通知
- 集群管理
ZK基本概念:
- 角色:在集群中的角色
- leader
- follower
- observer:無投票權決定leader,只能獲取數據
- 會話 session
- 數據節點: 樹形數據結構
- 數據節點分類
- 持久節點
- 臨時節點
- 順序節點
- 數據節點版本:
- version : znode 數據版本
- cversion : znode 的子節點版本
- aversion
- Watcher
- ACL: access control lists
數據模型:
樹形結構, ACL(訪問控制)
Zookeeper 單機模式
安裝: 解壓縮
啟動
- 默認端口 2181
- windows命令: bin/zkServer.cmd
- unix命令: bin/zkServer.sh
Zookeeper 的命令
ls path [watch] //查看節點, 使用了 watch 之后有數據變化會收到通知
create
-s 為順序節點
-e 為臨時節點
get 獲取數據內容和一些屬性
set 更新節點數據內容
set path data [version] version //為指定要被更新的數據版本,如已被更新, 則操作失敗
detele path [version] 為指定要被刪除的數據版本,如已被更新, 則操作失敗
Zookeeper 中的 Watch:
Zookeeper 中的 ACL:
每個節點的 ACL 不能繼承, 需要單獨設置
schema:各種訪問控制策略
- world
- auth
- digest
- ip
- ...
auth 的使用:
預先通過 addauth digest user:password 進行添加用戶
setAcl /node2 auth:node2u:111111:crdwa
使用時, 需要通過 addauth digest user:password 進行設置之后, 才能訪問相應的節點
ZK 集群介紹:
分布式系統的理解
-
基本概念: 多台計算機;彼此進行交互,通過網絡進行通訊;共同目標
-
傳統IT系統滿足業務需求:
- 性能
- 可用性
- 數據安全
均是通過機器增加, 代碼優化進行處理
-
分布式系統滿足業務需求:
通過增加機器(橫向擴展)
缺點:
- 數據一致性的難度
- 系統復雜性(軟件結構,運維,開發調試)
- 不可靠因素增加
- 安全性
保證數據一致性:
數據復制 集中存儲(NAS , 分布式緩存等)Sharding:
業務拆分 數據拆分
ZK集群的設置:
所有集群的 zoo.cfg 中的集群配置一致
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:12888:13888
server.3=127.0.0.1:22888:23888
在數據文件目錄中有一個文件 myid , 內容為 server.X 對應的數字, 如果是 server.1 對應機器上, myid 內容即為1
Zookeeper API:
Zookeeper原生API
-
Zookeeper 的構造函數進行服務端的連接(異步過程)
-
create() 創建節點
同步調用時: 如果已經存在則直接拋出異常
異步回調時: 如果已經存在, 則返回 name 屬性為 null , 正常創建則會返回創建的節點名稱 , 回調需要實現 StringCallback 接口
創建節點時需要設置 ACL 策略 : 可以使用預先定義的或者自定義的
-
delete() 刪除節點
-
getChildren 獲取子節點
-
getData : 獲取節點數據
-
setData : 修改數據
-
exists : 節點是否存在
以上API需要注意的是, 可以有個同步調用或者異步調用的方式, 另外可以設置 watcher
Curator:
特色: Fluent 風格API :方法級聯和方法鏈的實現
-
連接
新增連接超時(原有的超時為超過多少時間沒有心跳消息而超時)
實現重試策略
實現指數倍增sleep時間
ExponentialBackoffRetry
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString("192.168.1.195:2181") .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("base").build(); client.start(); -
創建節點:
client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", "liubx11:111111".getBytes()); client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).withACL(Ids.CREATOR_ALL_ACL) .forPath(path, data); -
刪除節點
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path); -
獲取數據 : getData
Stat stat = new Stat(); byte[] data = client.getData().forPath(path); System.out.println(stat.toString()); -
更新數據 : setData
client.setData().withVersion(version).forPath(path, data); -
讀取子節點 : getChildren
List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator"); -
Watcher 使用:
- NodeCache:
監聽數據節點內容更新
通過 NodeCacheListener 進行
變動數據通過 NodeCache 獲取
public void addNodeDataWatcher(String path) throws Exception { final NodeCache nodeC = new NodeCache(client, path); nodeC.start(true); nodeC.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { String data = new String(nodeC.getCurrentData().getData()); System.out.println("path=" + nodeC.getCurrentData().getPath() + ":data=" + data); } }); }- PathChildernCache
監聽指定節點的子節點變化情況:新增,子節點數據更新, 刪除
StartMode 的區別: NORMAL , POST_ INITIALIZED_EVENT
final PathChildrenCache cache = new PathChildrenCache(this.client, path, true); cache.start(StartMode.POST_INITIALIZED_EVENT); System.out.println(cache.getCurrentData().size()); cache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){ System.out.println("客戶端子節點cache初始化數據完成"); System.out.println("size="+cache.getCurrentData().size()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ System.out.println("添加子節點:"+event.getData().getPath()); System.out.println("修改子節點數據:"+new String(event.getData().getData())); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ System.out.println("刪除子節點:"+event.getData().getPath()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("修改子節點數據:"+event.getData().getPath()); System.out.println("修改子節點數據:"+new String(event.getData().getData())); } } }); -
異步執行:
通過調用inBackground() 方法實現, 需要傳入回調函數, 或者執行線程池等..
回調函數需要實現 BackgroundCallback 接口
會有一個 CuratorEvent 的事件, 可以看到事件狀態和返回碼
//刪除的異步執行 client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version) .inBackground(new DeleteCallBack()).forPath(path);
