Zookeeper學習筆記(上)


Zookeeper學習筆記

本篇主要是一些基本的介紹和API的使用介紹, 有些只是記錄了知識點,而沒有完全在筆記中詳細解釋, 需要自行查找資料補充相關概念
主要參考了課程中的內容: Zookeeper分布式系統開發實戰


Zookeeper 概述

簡介:

  • 定義 : 分布式系統, 保證系統高可用, 並且高性能
  • 高性能, 分布式的, 分布應用協調服務
  • 使用文件系統目錄樹作為數據模型

ZK典型用用場景:

  • 注冊訂閱
  • 負載均衡
  • 命名服務
  • 分布式協調/通知
  • 集群管理

ZK基本概念:

  • 角色:在集群中的角色
    • leader
    • follower
    • observer:無投票權決定leader,只能獲取數據
  • 會話 session
  • 數據節點: 樹形數據結構
  • 數據節點分類
    • 持久節點
    • 臨時節點
    • 順序節點
  • 數據節點版本:
    • version : znode 數據版本
    • cversion : znode 的子節點版本
    • aversion
  • Watcher
  • ACL: access control lists

數據模型:

樹形結構, ACL(訪問控制)

Zookeeper 單機模式

安裝: 解壓縮

啟動

  • 默認端口 2181
  • windows命令: bin/zkServer.cmd
  • unix命令: bin/zkServer.sh

Zookeeper 的命令

ls path [watch] //查看節點, 使用了 watch 之后有數據變化會收到通知

create

-s 為順序節點
-e 為臨時節點

get 獲取數據內容和一些屬性

set 更新節點數據內容

set path data [version] version //為指定要被更新的數據版本,如已被更新, 則操作失敗

detele path [version] 為指定要被刪除的數據版本,如已被更新, 則操作失敗

Zookeeper 中的 Watch:

Zookeeper 中的 ACL:

每個節點的 ACL 不能繼承, 需要單獨設置

schema:各種訪問控制策略

  • world
  • auth
  • digest
  • ip
  • ...

auth 的使用:

預先通過 addauth digest user:password 進行添加用戶
setAcl /node2 auth:node2u:111111:crdwa
使用時, 需要通過 addauth digest user:password 進行設置之后, 才能訪問相應的節點

ZK 集群介紹:

分布式系統的理解

  1. 基本概念: 多台計算機;彼此進行交互,通過網絡進行通訊;共同目標

  2. 傳統IT系統滿足業務需求:

    • 性能
    • 可用性
    • 數據安全
      均是通過機器增加, 代碼優化進行處理
  3. 分布式系統滿足業務需求:

    通過增加機器(橫向擴展)

    缺點:

    • 數據一致性的難度
    • 系統復雜性(軟件結構,運維,開發調試)
    • 不可靠因素增加
    • 安全性

    保證數據一致性:

     數據復制
     集中存儲(NAS , 分布式緩存等)
    

    Sharding:

     業務拆分
     數據拆分
    

ZK集群的設置:

所有集群的 zoo.cfg 中的集群配置一致

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:12888:13888
server.3=127.0.0.1:22888:23888

在數據文件目錄中有一個文件 myid , 內容為 server.X 對應的數字, 如果是 server.1 對應機器上, myid 內容即為1

Zookeeper API:

Zookeeper原生API

  1. Zookeeper 的構造函數進行服務端的連接(異步過程)

  2. create() 創建節點

    同步調用時: 如果已經存在則直接拋出異常

    異步回調時: 如果已經存在, 則返回 name 屬性為 null , 正常創建則會返回創建的節點名稱 , 回調需要實現 StringCallback 接口

    創建節點時需要設置 ACL 策略 : 可以使用預先定義的或者自定義的

  3. delete() 刪除節點

  4. getChildren 獲取子節點

  5. getData : 獲取節點數據

  6. setData : 修改數據

  7. exists : 節點是否存在

以上API需要注意的是, 可以有個同步調用或者異步調用的方式, 另外可以設置 watcher

Curator:

特色: Fluent 風格API :方法級聯和方法鏈的實現

  1. 連接

    新增連接超時(原有的超時為超過多少時間沒有心跳消息而超時)

    實現重試策略

    實現指數倍增sleep時間

    ExponentialBackoffRetry

     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
     client = CuratorFrameworkFactory.builder()
     		.connectString("192.168.1.195:2181")
     		.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
     		.namespace("base").build();
     client.start();
    
  2. 創建節點:

     client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", "liubx11:111111".getBytes());
     client.create().creatingParentsIfNeeded()
     		.withMode(CreateMode.PERSISTENT).withACL(Ids.CREATOR_ALL_ACL)
     		.forPath(path, data);
    
  3. 刪除節點

      client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);
    
  4. 獲取數據 : getData

     Stat stat = new Stat();
     byte[] data = client.getData().forPath(path);
     System.out.println(stat.toString());
    
  5. 更新數據 : setData

     client.setData().withVersion(version).forPath(path, data);
    
  6. 讀取子節點 : getChildren

     List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator");
    
  7. Watcher 使用:

    • NodeCache:

    監聽數據節點內容更新

    通過 NodeCacheListener 進行

    變動數據通過 NodeCache 獲取

     public void addNodeDataWatcher(String path) throws Exception {
     	final NodeCache nodeC = new NodeCache(client, path);
     	nodeC.start(true);
     	nodeC.getListenable().addListener(new NodeCacheListener() {
     		public void nodeChanged() throws Exception {
     			String data = new String(nodeC.getCurrentData().getData());
     			System.out.println("path=" + nodeC.getCurrentData().getPath()
     					+ ":data=" + data);
     		}
     	});
     }
    
    • PathChildernCache

    監聽指定節點的子節點變化情況:新增,子節點數據更新, 刪除

    StartMode 的區別: NORMAL , POST_ INITIALIZED_EVENT

     final PathChildrenCache cache = new PathChildrenCache(this.client,
     		path, true);
     cache.start(StartMode.POST_INITIALIZED_EVENT);
     System.out.println(cache.getCurrentData().size());
     cache.getListenable().addListener(new PathChildrenCacheListener() {
     	public void childEvent(CuratorFramework client,
     			PathChildrenCacheEvent event) throws Exception {
     		if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
     			System.out.println("客戶端子節點cache初始化數據完成");
     			System.out.println("size="+cache.getCurrentData().size());
     		}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
     			System.out.println("添加子節點:"+event.getData().getPath());
     			System.out.println("修改子節點數據:"+new String(event.getData().getData()));
     		}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
     			System.out.println("刪除子節點:"+event.getData().getPath());
     		}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
     			System.out.println("修改子節點數據:"+event.getData().getPath());
     			System.out.println("修改子節點數據:"+new String(event.getData().getData()));
     		}
     	}
     });
    
  8. 異步執行:

    通過調用inBackground() 方法實現, 需要傳入回調函數, 或者執行線程池等..

    回調函數需要實現 BackgroundCallback 接口

    會有一個 CuratorEvent 的事件, 可以看到事件狀態和返回碼

     	//刪除的異步執行
     	client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version)
     		.inBackground(new DeleteCallBack()).forPath(path);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM