ZooKeeper API的基本使用


針對ZooKeeper的會話創建,節點創建、刪除,數據讀取、更新,權限控制等API進行簡單的驗證。

1、新建maven工程,添加依賴

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
            <scope>test</scope>
        </dependency>

2、新建單元測試進行驗證

/**
 * 軟件版權:流沙~~
 * 修改日期   修改人員     修改說明
 * =========  ===========  =====================
 * 2020/1/14    liusha   新增
 * =========  ===========  =====================
 */
package com.sand.zookeeper;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 功能說明:ZooKeeper Api 測試類
 * 開發人員:@author liusha
 * 開發日期:2020/1/14 9:58
 * 功能描述:會話創建,節點創建、刪除,數據讀取、更新,權限控制等
 */
public class ZooKeeperApi implements Watcher {
  private static Stat stat = new Stat();
  private static ZooKeeper zooKeeper = null;
  private static final String host = "127.0.0.1:2181";
  private static CountDownLatch countDownLatch = new CountDownLatch(1);
  private static final String hosts = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

  /**
   * ZooKeeper CreateMode節點類型說明:
   * 1.PERSISTENT:持久型
   * 2.PERSISTENT_SEQUENTIAL:持久順序型
   * 3.EPHEMERAL:臨時型
   * 4.EPHEMERAL_SEQUENTIAL:臨時順序型
   * <p>
   * 1、2種類型客戶端斷開后不會消失
   * 3、4種類型客戶端斷開后超時時間內沒有新的連接節點將會消失
   */

  /**
   * ZooKeeper ZooDefs.Ids權限類型說明:
   * OPEN_ACL_UNSAFE:完全開放的ACL,任何連接的客戶端都可以操作該屬性znode
   * CREATOR_ALL_ACL:只有創建者才有ACL權限
   * READ_ACL_UNSAFE:只能讀取ACL
   */

  /**
   * ZooKeeper EventType事件類型說明:
   * NodeCreated:節點創建
   * NodeDataChanged:節點的數據變更
   * NodeChildrenChanged:子節點的數據變更
   * NodeDeleted:子節點刪除
   */

  /**
   * ZooKeeper KeeperState狀態類型說明:
   * Disconnected:連接失敗
   * SyncConnected:連接成功
   * AuthFailed:認證失敗
   * Expired:會話過期
   * None:初始狀態
   */

  /**
   * 接收事件通知
   *
   * @param event 事件通知
   */
  @Override
  public void process(WatchedEvent event) {
    System.out.println("Receive WatchedEvent:" + event);
    try {
      if (Event.KeeperState.SyncConnected == event.getState()) {
        System.out.println("通知:會話連接成功");
        if (Event.EventType.None == event.getType() && null == event.getPath()) {
          System.out.println("進入會話初始狀態");
          // 釋放所有等待的線程
          countDownLatch.countDown();
        } else if (event.getType() == Event.EventType.NodeCreated) {
          System.out.println("節點創建通知:" + event.getPath());
          zooKeeper.exists(event.getPath(), true);
        } else if (event.getType() == Event.EventType.NodeDataChanged) {
          System.out.println("節點的數據變更通知:" + new String(zooKeeper.getData(event.getPath(), true, stat)));
          System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion());
          zooKeeper.exists(event.getPath(), true);
        } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
          System.out.println("子節點的數據變更通知:" + zooKeeper.getChildren(event.getPath(), true));
          zooKeeper.exists(event.getPath(), true);
        } else if (event.getType() == Event.EventType.NodeDeleted) {
          System.out.println("節點刪除通知:" + event.getPath());
          zooKeeper.exists(event.getPath(), true);
        } else {
          System.out.println("未知事件通知類型:" + event.getType());
          zooKeeper.exists(event.getPath(), true);
        }
      } else if (Event.KeeperState.Disconnected == event.getState()) {
        System.out.println("通知:會話連接失敗");
      } else if (Event.KeeperState.AuthFailed == event.getState()) {
        System.out.println("通知:會話認證失敗");
      } else if (Event.KeeperState.Expired == event.getState()) {
        System.out.println("通知:會話過期");
      } else {
        System.out.println("未知的通知狀態:" + event.getState());
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * 創建會話(最基礎的實例)
   *
   * @throws Exception Exception
   */
  @Test
  public void constructor_usage_simple() throws Exception {
    zooKeeper = new ZooKeeper(hosts, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper.state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    System.out.println("ZooKeeper session會話創建完成。");
  }

  /**
   * 創建會話(可復用sessionId的實例)
   *
   * @throws Exception Exception
   */
  @Test
  public void constructor_usage_SID_PWD() throws Exception {
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper.state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPasswd = zooKeeper.getSessionPasswd();
    System.out.println(String.format("首次獲取sessionId:%s,sessionPasswd:%s", sessionId, sessionPasswd));
    // 使用不正確的sessionId
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi(), 1L, "123".getBytes());
    System.out.println("ZooKeeper.state err session:" + zooKeeper.getState());
    // 使用正確的sessionId
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi(), sessionId, sessionPasswd);
    System.out.println("ZooKeeper.state session:" + zooKeeper.getState());
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 創建節點(同步)
   *
   * @throws Exception Exception
   */
  @Test
  public void create_API_sync() throws Exception {
    String path = "/zk-create-znode-test-";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper.state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();

    String path1 = zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    System.out.println("節點創建成功:" + path1);
    String path2 = zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("節點創建成功:" + path2);
  }

  /**
   * 創建節點(異步)
   * 同步接口創建節點時需要考慮接口拋出異常的情況,
   * 異步接口的異常體現在回調函數的ResultCode響應碼中,比同步接口更健壯。
   *
   * @throws Exception Exception
   */
  @Test
  public void create_API_async() throws Exception {
    String path = "/zk-create-znode-test-";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper.state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();

    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
        new CreateCallBack(), "ZooKeeper async create znode.");
    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
        new CreateCallBack(), "ZooKeeper async create znode.");
    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
        new CreateCallBack(), "ZooKeeper async create znode.");
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 創建節點異步回調
   */
  class CreateCallBack implements AsyncCallback.StringCallback {
    /**
     * @param rc   服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期
     * @param path 調用接口時傳入的節點路徑(原樣輸出)
     * @param ctx  調用接口時傳入的ctx值(原樣輸出)
     * @param name 實際在服務端創建的節點名
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      System.out.println("創建結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",name=" + name);
      switch (rc) {
        case 0:
          System.out.println("節點創建成功:" + name);
          break;
        case -4:
          System.out.println("客戶端與服務端連接已斷開");
          break;
        case -110:
          System.out.println("指定節點已存在");
          break;
        case -112:
          System.out.println("會話已過期");
          break;
        default:
          System.out.println("服務端響應碼" + rc + "未知");
          break;
      }
    }
  }

  /**
   * 刪除節點(同步)
   * 注:只允許刪除葉子節點,不能直接刪除根節點
   *
   * @throws Exception Exception
   */
  @Test
  public void delete_API_sync() throws Exception {
    String path = "/zk-delete-znode-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.delete(path, -1);
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 刪除節點(異步)
   * 注:只允許刪除葉子節點,不能直接刪除根節點
   *
   * @throws Exception Exception
   */
  @Test
  public void delete_API_async() throws Exception {
    String path = "/zk-delete-znode-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.delete(path, -1, new DeleteCallBack(), "ZooKeeper async delete znode");
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 刪除節點異步回調
   */
  class DeleteCallBack implements AsyncCallback.VoidCallback {
    /**
     * @param rc   服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期
     * @param path 調用接口時傳入的節點路徑(原樣輸出)
     * @param ctx  調用接口時傳入的ctx值(原樣輸出)
     */
    @Override
    public void processResult(int rc, String path, Object ctx) {
      System.out.println("刪除結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx);
      switch (rc) {
        case 0:
          System.out.println("節點刪除成功");
          break;
        case -4:
          System.out.println("客戶端與服務端連接已斷開");
          break;
        case -112:
          System.out.println("會話已過期");
          break;
        default:
          System.out.println("服務端響應碼" + rc + "未知");
          break;
      }
    }
  }

  /**
   * 獲取子節點(同步)
   *
   * @throws Exception Exception
   */
  @Test
  public void getChildren_API_sync() throws Exception {
    String path = "/zk-getChildren-sync-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    zooKeeper.create(path + "/children1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    List<String> childrenList = zooKeeper.getChildren(path, true);
    System.out.println("獲取子節點:" + childrenList);
    zooKeeper.create(path + "/children2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 獲取子節點(異步)
   *
   * @throws Exception Exception
   */
  @Test
  public void getChildren_API_async() throws Exception {
    String path = "/zk-getChildren-async-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    zooKeeper.create(path + "/children1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.getChildren(path, true, new ChildrenCallBack(), "異步獲取子節點");
    zooKeeper.create(path + "/children2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 獲取子節點異步回調
   */
  class ChildrenCallBack implements AsyncCallback.Children2Callback {
    /**
     * @param rc           服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期
     * @param path         調用接口時傳入的節點路徑(原樣輸出)
     * @param ctx          調用接口時傳入的ctx值(原樣輸出)
     * @param childrenList 子節點列表
     * @param stat         節點狀態,由服務器端響應的新stat替換
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> childrenList, Stat stat) {
      System.out.println("獲取結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",childrenList=" + childrenList + ",stat=" + stat);
      switch (rc) {
        case 0:
          System.out.println("子節點獲取成功:" + childrenList);
          break;
        case -4:
          System.out.println("客戶端與服務端連接已斷開");
          break;
        case -112:
          System.out.println("會話已過期");
          break;
        default:
          System.out.println("服務端響應碼" + rc + "未知");
          break;
      }
    }
  }

  /**
   * 獲取節點數據(同步)
   * 更新節點數據(同步)
   *
   * @throws Exception Exception
   */
  @Test
  public void getData_API_sync() throws Exception {
    String path = "/zk-getData-sync-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    zooKeeper.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    System.out.println("節點數據:" + new String(zooKeeper.getData(path, true, stat)));
    System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion());
    zooKeeper.setData(path, "test".getBytes(), -1);
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 獲取節點數據(異步)
   * 更新節點數據(同步)
   *
   * @throws Exception Exception
   */
  @Test
  public void getData_API_async() throws Exception {
    String path = "/zk-getData-async-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    zooKeeper.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.getData(path, true, new DataCallBack(), "異步獲取節點數據");
    System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion());
    Stat stat1 = zooKeeper.setData(path, "test".getBytes(), -1);
    System.out.println("czxid=" + stat1.getCzxid() + ",mzxid=" + stat1.getMzxid() + ",version=" + stat1.getVersion());
    Stat stat2 = zooKeeper.setData(path, "test123".getBytes(), stat1.getVersion());
    System.out.println("czxid=" + stat2.getCzxid() + ",mzxid=" + stat2.getMzxid() + ",version=" + stat2.getVersion());
    try {
      zooKeeper.setData(path, "test123456".getBytes(), stat1.getVersion());
    } catch (KeeperException e) {
      System.out.println("Error Code:" + e.code() + "," + e.getMessage());
    }
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 獲取節點數據異步回調
   */
  class DataCallBack implements AsyncCallback.DataCallback {
    /**
     * @param rc   服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期
     * @param path 調用接口時傳入的節點路徑(原樣輸出)
     * @param ctx  調用接口時傳入的ctx值(原樣輸出)
     * @param data 節點數據
     * @param stat 節點狀態,由服務器端響應的新stat替換
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      System.out.println("獲取結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + new String(data) + ",stat=" + stat);
      System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion());
      switch (rc) {
        case 0:
          System.out.println("節點數據獲取成功:" + new String(data));
          break;
        case -4:
          System.out.println("客戶端與服務端連接已斷開");
          break;
        case -112:
          System.out.println("會話已過期");
          break;
        default:
          System.out.println("服務端響應碼" + rc + "未知");
          break;
      }
    }
  }

  /**
   * 更新節點數據(異步)
   *
   * @throws Exception Exception
   */
  @Test
  public void setData_API_async() throws Exception {
    String path = "/zk-setData-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    System.out.println("ZooKeeper state:" + zooKeeper.getState());
    // 所有線程執行完畢
    countDownLatch.await();
    zooKeeper.exists(path, true);
    zooKeeper.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.setData(path, "test123456".getBytes(), -1, new StatCallBack(), "異步更新節點數據");
    Thread.sleep(Integer.MAX_VALUE);
  }

  /**
   * 更新節點數據異步回調
   */
  class StatCallBack implements AsyncCallback.StatCallback {
    /**
     * @param rc   服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期
     * @param path 調用接口時傳入的節點路徑(原樣輸出)
     * @param ctx  調用接口時傳入的ctx值(原樣輸出)
     * @param stat 節點狀態,由服務器端響應的新stat替換
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
      System.out.println("更新結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat);
      System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion());
      switch (rc) {
        case 0:
          System.out.println("節點數據設置成功");
          break;
        case -4:
          System.out.println("客戶端與服務端連接已斷開");
          break;
        case -112:
          System.out.println("會話已過期");
          break;
        default:
          System.out.println("服務端響應碼" + rc + "未知");
          break;
      }
    }
  }

  /**
   * 權限控制
   *
   * @throws Exception Exception
   */
  @Test
  public void auth_control_API() throws Exception {
    String path = "/zk-setData-test";
    zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi());
    zooKeeper.addAuthInfo("digest", "zoo:true".getBytes());
    zooKeeper.create(path, "init".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
    // 1)無權限信息訪問
//    ZooKeeper zooKeeper1 = new ZooKeeper(host, 5000, new ZooKeeperApi());
//    System.out.println("訪問結果:" + new String(zooKeeper1.getData(path, true, stat)));
    // 2)錯誤權限信息訪問
//    ZooKeeper zooKeeper2 = new ZooKeeper(host, 5000, new ZooKeeperApi());
//    zooKeeper2.addAuthInfo("digest", "zoo:false".getBytes());
//    System.out.println("訪問結果:" + new String(zooKeeper2.getData(path, true, stat)));
    // 3)正確權限信息訪問
    ZooKeeper zooKeeper3 = new ZooKeeper(host, 5000, new ZooKeeperApi());
    zooKeeper3.addAuthInfo("digest", "zoo:true".getBytes());
    System.out.println("訪問結果:" + new String(zooKeeper3.getData(path, true, stat)));
    Thread.sleep(Integer.MAX_VALUE);
  }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM