zookeeper、ZK安裝、ZK配置、ZK使用


-----------------------------目錄-----------------------------------

第一部分:zookeeper簡介

第二部分:zookeeper環境搭建

  1、單機環境

  2、集群環境

第三部分:zookeeper基本使用

  1、java原生zk客戶端api操作

  2、zkClient客戶端操作(推薦)

  3、curator客戶端操作(推薦)

第四部分:zookeeper應用場景

第五部分:zookeeper深入進階

第六部分:zookeeper源碼分析

-----------------------------目錄-----------------------------------

 

第一部分:zookeeper簡介

1、 zookeeper基本概念

zookeeper是一個開源的分布式協調服務,其設計目標是將那些復雜並且容易出差錯的分布式一致性服務封裝起來,構成一個高效可靠的原語集,並提供給用戶一些簡單的接口,zookeeper是一個典型的分布式一致性的解決方案(CP模式),分布式應用程序可以基於它實現數據訂閱/發布、負載均衡,命名服務、集群管理、分布式鎖和分布式隊列等功能。

2、基本概念

@1、集群角色

通常在分布式系統中,構成一個集群中的每一台機器都有自己的角色,典型的是master/slave模式(主備模式),這種情況下能夠處理寫操作的機器成為master機器,把所有通過一步復制方式獲取最新數據並且提供服務的機器為slave機器。
在zookeeper中沒有是用主備模式,引入 Leader、Follower、Observer三種角色,在zk集群中所有的機器通過Leader選舉來選Leader,Leader服務器為客戶端提供讀寫服務,Follower和Observer都能提供讀服務,唯一的區別是Observer不參與Leader選舉,不參與寫操作的過半寫成功。

@2、會話session

Session指的是客戶端的會話, 一個客戶端鏈接是指客戶端和服務端之間的一個Tcp長連接,ZK對外服務端口默認為2181,客戶端啟動的時候會與服務器建立一個TCP連接,從第一次鏈接建立開始客戶端會話的聲明周期也開始了,通過鏈接,客戶端能夠檢測心跳和服務端保持有效會話,也能夠想zk服務器發送請求並接受響應,同時還能夠通過該鏈接接受來自服務器的Watch事件通知。

@3、數據節點znode

在zk中節點分兩類:第一類是指構成集群的機器,我們稱為機器節點。第二種是指數據模型中的數據單元,稱為數據節點znode。zk的數據存儲在內存中, 數據模型是樹型,有斜杠(/)進行分割路徑,就是一個znode,每個znode上都會保存自己的數據內容,同時還會保存一系列屬性信息。

@4、版本

zk在每個數據節點都會存儲數據,每個節點都會維護一個價stat的數據結構,記錄了三個數據版本: version(當前版本)、cversion(當前節點子節點的版本),aversion(當前節點的ACL版本) 

@5、ACL

Zookeeper采用ACL(Access Control Lists)策略來進行權限控制,定義一下五種權限:

  CREATE:創建子節點權限

  READ:獲取節點數據和子節點列表的權限

  WRITE:更新節點數據的權限

  DELETE:刪除子節點的權限

  ADMIN:設置節點ACL的權限

注意的是create和delete兩盒權限是針對子節點的權限控制

第二部分:zookeeper環境搭建

 zookeeper安裝有一下三種方式

1、單機模式:zk只運行在一台服務器上,適用測試環境

2、集群模式:zk運行在一個集群環境中,適用於線上生產環境

3、偽集群模式:一台服務器上運行多個zk實例監聽不同的端口

這里的環境搭建適用了兩個模式,單機模式和偽集群模式

一、單機模式(Linux)

1、下載穩定版本https://zookeeper.apache.org/releases.html

下面是下載命令(注意版本號),也可以登錄上面地址進行本地下載,在上傳服務器

wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

2、解壓

tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz

3、進入apache-zookeeper-3.6.1目錄創建data文件,用於持久話數據的文件夾

cd apache-zookeeper-3.6.1

mkdir data

4、修改配置文件名稱

zk默認讀取zoo.cfg文件,提供參考文件zoo_sample.cfg文件。

cd conf
cp zoo_sample.cfg zoo.cfg

5、修改zoo.cfg文件中的dataDir屬性,定位到剛創建的data文件

dataDir=/user/apache-zookeeper-3.6.1/data

6、啟動服務

cd ../bin

./zkServer.sh start

上面start 命令為啟動,stop為停止,status為查看zk啟動狀態以及身份

看到下圖提示啟動成功:

 二、偽集群模式

    點擊--->>> ZK安裝、ZK配置、ZK集群部署

第三部分:zookeeper基本使用

java代碼調用zookeeper原生Api進行增刪改查節點以及數據的代碼

一、java調用原生zk客戶端

引入pom文件

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.9</version>
        </dependency>

 1、創建zk的客戶端連接

 主要是獲取zk對象,並進行操作監聽。等同於zk開啟客戶端命令./zkCli.sh。

 需要注意的是監聽類實現Watcher接口,重寫process 方法

package city.albert.api;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/7/29  6:39 PM
 */
public class OpenSession implements Watcher {

    /**
     * 應為zk的監聽為獨立線程,需要保持主線程main方法不會直接結束使用CountDownLatch,設置線程數為1
     */
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 建立會話
     * 客戶端創建一個連接鏈接zk
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException {

        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new OpenSession());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
    }

    /**
     * 處理來自服務器端的watcher
     *
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println(watchedEvent.toString());
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process 執行了。。。");
            countDownLatch.countDown();
        }
    }
}

2、創建zk的數據節點

由於代碼篇幅問題,下面先介紹重點代碼然后附帶測試的源代碼~~注意哦!

類似在客戶端中執行: create  /my_persist2  123

重點代碼:

 private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
        /**
         * path 創建節點路徑
         * data[]  節點值的字節數組
         *  acl  節點權限4中類型
         *       ANYONE_ID_UNSAFE  標識任何人
         *       AUTH_IDS    僅僅這個id才可以設置ACL,他將被客戶端驗證ID替換
         *       OPEN_ACL_UNSAFE  開放的ACL(常用)
         *       CREATOR_ALL_ACL  此ACL授予創建者身份驗證ID的所有權限
         *  createMode  創建節點的4中類型
         *        PERSISTENT            創建持久節點
         *        PERSISTENT_SEQUENTIAL 創建持久順序節點
         *        EPHEMERAL             創建臨時幾點
         *        EPHEMERAL_SEQUENTIAL  創建臨時順序節點
         *
         */
        String persistent = zooKeeper.create("/my_persistent", "創建持久節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String persistentSequential = zooKeeper.create("/my_persistent_sequential", "創建持久順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String ephemeral = zooKeeper.create("/my_ephemeral", "創建臨時節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "創建臨時順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("持久節點 "+persistent);
        System.out.println("持久順序節點 "+persistentSequential);
        System.out.println("臨時節點 "+ephemeral);
        System.out.println("臨時順序節點 "+ephemeralSequential);
    }

完整的test代碼

package city.albert.api;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/7/30  3:30 PM
 */
public class CreateZNodeBySession implements Watcher {

   private static   ZooKeeper zooKeeper;

    /**
     * 建立會話
     * 客戶端創建一個連接鏈接zk
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        //獲取zk鏈接會話對象    ip+端口,超時時間   watcher回調函數
         zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new CreateZNodeBySession());

        System.out.println(zooKeeper.getState());

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 創建節點
     *
     * @param zooKeeper
     */
    private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
        /**
         * path 創建節點路徑
         * data[]  節點值的字節數組
         *  acl  節點權限4中類型
         *       ANYONE_ID_UNSAFE  標識任何人
         *       AUTH_IDS    僅僅這個id才可以設置ACL,他將被客戶端驗證ID替換
         *       OPEN_ACL_UNSAFE  開放的ACL(常用)
         *       CREATOR_ALL_ACL  此ACL授予創建者身份驗證ID的所有權限
         *  createMode  創建節點的4中類型
         *        PERSISTENT            創建持久節點
         *        PERSISTENT_SEQUENTIAL 創建持久順序節點
         *        EPHEMERAL             創建臨時幾點
         *        EPHEMERAL_SEQUENTIAL  創建臨時順序節點
         *
         */
        String persistent = zooKeeper.create("/my_persistent", "創建持久節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String persistentSequential = zooKeeper.create("/my_persistent_sequential", "創建持久順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String ephemeral = zooKeeper.create("/my_ephemeral", "創建臨時節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "創建臨時順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("持久節點 "+persistent);
        System.out.println("持久順序節點 "+persistentSequential);
        System.out.println("臨時節點 "+ephemeral);
        System.out.println("臨時順序節點 "+ephemeralSequential);
    }


    /**
     * 處理來自服務器端的watcher
     *
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println(watchedEvent.toString());
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process 執行了。。。");

            //創建節點
            try {
                createZNode(zooKeeper);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}
View Code

3、獲取zk的數據節點以及子節點 

由於代碼篇幅問題,下面先介紹重點代碼,然后附帶測試的源代碼~~注意哦!

類似命令:

   ls /    獲取當前節點多有節點

   ls  /my_persist2  獲取當前節點所有子節點(下面getChildren方法)

   getAllChildrenNumber  獲取當前節點下子節點個數

   get  /my_persist2   獲取節點數據值(下面getData方法)

重點代碼:

   public void getData() throws KeeperException, InterruptedException {
        /**
         * 獲取某個節點數據api
         *  path,  獲取數據的節點路徑
         *  watcher,  是否開啟監聽,
         *   stat    節點狀態   null為最新版本數據
         */

        byte[] data = zooKeeper.getData("/my_persistent", false, null);
        System.out.println(new String(data));
        /**
         * 獲取某個節點的子節點方法
         *  path,  獲取數據的節點路徑
         *  watcher,  是否開啟監聽,
         *          當開啟監聽在節點變更的時候Watcher.process會返回: watchedEvent.getType()==Event.EventType.NodeChildrenChanged 類型,
         *          監聽通知成功監聽失效(因為通知是1次性有效,需要反復注冊)
         */
        List<String> children = zooKeeper.getChildren("/", true, null);
        System.out.println(children);

    }

下面為測試源碼獲取節點數據的

package city.albert.api;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.List;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/7/30  3:30 PM
 */
public class GetZNodeBySession implements Watcher {

    private static ZooKeeper zooKeeper;

    /**
     * 建立會話
     * 客戶端創建一個連接鏈接zk
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        //獲取zk鏈接會話對象    ip+端口,超時時間   watcher回調函數
        zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new GetZNodeBySession());

        System.out.println(zooKeeper.getState());

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 處理來自服務器端的watcher
     *
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {

        if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            try {
                getData();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(watchedEvent.toString());
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process 執行了。。。");

            //獲取zk節點

            try {
                getData();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public void getData() throws KeeperException, InterruptedException {
        /**
         * 獲取某個節點數據api
         *  path,  獲取數據的節點路徑
         *  watcher,  是否開啟監聽,
         *   stat    節點狀態   null為最新版本數據
         */

        byte[] data = zooKeeper.getData("/my_persistent", false, null);
        System.out.println(new String(data));
        /**
         * 獲取某個節點的子節點方法
         *  path,  獲取數據的節點路徑
         *  watcher,  是否開啟監聽,
         *          當開啟監聽在節點變更的時候Watcher.process會返回: watchedEvent.getType()==Event.EventType.NodeChildrenChanged 類型,
         *          監聽通知成功監聽失效(因為通知是1次性有效,需要反復注冊)
         */
        List<String> children = zooKeeper.getChildren("/", true, null);
        System.out.println(children);

    }
}
View Code

4、更新zk的數據節點

由於代碼篇幅問題,下面先介紹重點代碼,然后附帶測試的源代碼~~注意哦!

類似操作:set  /my_persist2  678

重點代碼:

private void updateData() throws KeeperException, InterruptedException {

        /**
         * 更新節點內容
         *  path, 節點路徑
         *  data, 更新節點數據內容
         *  version  更新版本 -1默認最新版本
         */
        zooKeeper.setData("/my_persist2","687".getBytes(),-1);
    }

下面為獲取節點數據的測試源碼

package city.albert.api;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.List;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/7/30  3:30 PM
 */
public class UpdateZNodeBySession implements Watcher {

    private static ZooKeeper zooKeeper;

    /**
     * 建立會話
     * 客戶端創建一個連接鏈接zk
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        //獲取zk鏈接會話對象    ip+端口,超時時間   watcher回調函數
        zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new UpdateZNodeBySession());

        System.out.println(zooKeeper.getState());

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 處理來自服務器端的watcher
     *
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println(watchedEvent.toString());
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process 執行了。。。");

            //獲取zk節點
            try {
                updateData();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void updateData() throws KeeperException, InterruptedException {

        /**
         * 更新節點內容
         *  path, 節點路徑
         *  data, 更新節點數據內容
         *  version  更新版本 -1默認最新版本
         */
        zooKeeper.setData("/my_persist2","687".getBytes(),-1);
    }


}
View Code

5、刪除zk的數據節點

由於代碼篇幅問題,下面先介紹重點代碼,然后附帶測試的源代碼~~注意哦!

類似操作:delete  /my_persist2 

private void deleteData() throws KeeperException, InterruptedException {

        /**
         * 刪除節點內容
         *  path, 節點路徑
         *  version  更新版本 -1默認最新版本
         */
        zooKeeper.delete("/my_persist2", -1);
    }

下面為獲取節點數據的測試源碼

package city.albert.api;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/7/30  3:30 PM
 */
public class DeleteZNodeBySession implements Watcher {

    private static ZooKeeper zooKeeper;

    /**
     * 建立會話
     * 客戶端創建一個連接鏈接zk
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        //獲取zk鏈接會話對象    ip+端口,超時時間   watcher回調函數
        zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new DeleteZNodeBySession());

        System.out.println(zooKeeper.getState());

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 處理來自服務器端的watcher
     *
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println(watchedEvent.toString());
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process 執行了。。。");

            //獲取zk節點
            try {
                deleteData();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void deleteData() throws KeeperException, InterruptedException {

        /**
         * 刪除節點內容
         *  path, 節點路徑
         *  version  更新版本 -1默認最新版本
         */
        zooKeeper.delete("/my_persist2", -1);
    }


}
View Code

二、使用zkClient客戶端連接

1、引入pom文件

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>

2、創建zkClient對象以及操作

操作包括:

  創建節點、順序節點、臨時節點、臨時順序節點、遞歸創建節點。

  獲取子節點列表,以及注冊監聽。

  修改節點值

  判斷節點是否存在

  讀取節點值

  刪除節點、遞歸刪除節點

package city.albert.api;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/8/5  5:17 PM
 */
public class ZKSession {
    public static void main(String[] args) throws InterruptedException {
        //借助zkClient創建會話 ,ZkClient借助zk原生spi的異步建立連接進行了實現為同步
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");

        //創建節點可以實現原生的方法遞歸調用創建createPersistent(path,true)方法非遞歸,先創建父節點在創建子節點
        //創建節點
        zkClient.create("/temp1", "11111", CreateMode.PERSISTENT);
        zkClient.create("/temp1/z1", "11111", CreateMode.PERSISTENT);
        //(遞歸)創建持久節點
        zkClient.createPersistent("/zkClient/zk1", true);
        //創建持久順序節點
        zkClient.createPersistentSequential("/zkClient/zk2", "z2");
        //創建臨時節點
        zkClient.createEphemeral("/zkClient/zk3", "z3");
        //創建臨時順序節點
        zkClient.createEphemeralSequential("/zkClient/zk4", "z4");

        //獲取子節點,同時注冊監聽
        List<String> temp1 = zkClient.getChildren("/temp1");
        System.out.println("temp1 =" + temp1);
        List<String> zc = zkClient.getChildren("/zkClient");
        System.out.println("zkClient =" + zc);

        //為節點注冊監聽
        zkClient.subscribeChildChanges("/zkClient", new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                System.out.println("監聽節點:" + s);
                System.out.println("監聽剩余節點:" + list);
            }
        });

        //修改節點
        zkClient.writeData("/zkClient/zk3", "zzzz");
        //修改節點為異步事件
        TimeUnit.SECONDS.sleep(2);

        //判斷節點是否存在
        boolean exists = zkClient.exists("/zkClient/zk3");
        System.out.println("是否存在:" + exists);


        //讀取節點值
        Object o = zkClient.readData("/zkClient/zk3");
        System.out.println("讀取值:" + o);

        //刪除節點
        zkClient.delete("/temp1/z1");
        zkClient.delete("/temp1");
        //(遞歸) 刪除節點 ,父節點為/zkClient
        zkClient.deleteRecursive("/zkClient");
    }
}

3、創curator對象以及操作

curator客戶端使用fluent風格編程,是netflix公司開源

 1、引入pom文件

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

 2

package city.albert.api;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/8/5  6:24 PM
 */
public class CuratorSession {

    public static void main(String[] args) throws Exception {

        //重試策略 ExponentialBackoffRetry基於ackoff重試,RetryNTimes重連n次策略,RetryForever永遠重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //創建客戶端對象
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                //鏈接超時時間
                .connectionTimeoutMs(30000)
                //創建session鏈接時間
                .sessionTimeoutMs(50000)
                //重試策略
                .retryPolicy(retryPolicy)
                //獨立的命名空間,父節點為/base
                .namespace("base").build();

        //創建鏈接
        client.start();


        //創建節點 ,默認創建持久節點
        //1、創建初始空節點
        client.create().forPath("/temp");
        //2、創建有內容節點
        client.create().forPath("/temp/tm", "內容".getBytes());
        client.create().forPath("/temp/tm/t1", "內容".getBytes());
        //3、遞歸創建父節點並且指定創建類型,EPHEMERAL是臨時節點,還有持久節點,持久、臨時順序節點。
        client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/temp2/tm2", "內容2".getBytes());

        //獲取節點值與狀態信息
        byte[] bytes = client.getData().forPath("/temp/tm/t1");
        System.out.println("節點值:"+new String(bytes));
        //獲取狀態信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/temp/tm/t1");
        System.out.println("狀態信息:" + stat);


        //更新節點數據
        //1、普通更新
        Stat stat1 = client.setData().forPath("/temp/tm/t1", "更新666".getBytes());
        System.out.println("更新后狀態信息:"+stat1);
        System.out.println("更新后的值:"+new String(client.getData().forPath("/temp/tm/t1")));
        //2、指定版本更新,-1為最新版本
        client.setData().withVersion(-1).forPath("/temp/tm/t1","更新8888".getBytes());
        System.out.println("版本更新后的值:"+new String(client.getData().forPath("/temp/tm/t1")));


        //刪除節點
        //1、刪除節點
        client.delete().forPath("/temp/tm/t1");
        //2、刪除並遞歸刪除子節點
        client.delete().deletingChildrenIfNeeded().forPath("/temp2");
        //3、指定版本刪除,版本不一致會拋出異常
        client.delete().withVersion(-1).forPath("/temp/tm");
        //4、保證強制刪除
        client.delete().guaranteed().forPath("/temp");

    }
}

第四部分:zookeeper應用場景

 一、數據發布/訂閱

  基於節點和注冊監聽機制實現 

二、命名服務

  給應用,主機命名,也可以使用持久順序節點(臨時順序節點)生成全局id,格式是job_000000001

三、日志收集

  日志源機器和日志收集器機器的管理

四、分布式鎖

  共享鎖(讀鎖):在固定節點下創建臨時順序節點(主機名-讀/寫(R/w)-0000000001),

 

  排它鎖: 在固定節點下創建臨時節點,創建鎖(zk保證多線程創建唯一節點),使用監聽機制來確定鎖被釋放

五、master選舉

  場景:一個廣告推送服務,需要大量計算才可以獲取到某個類型的推廣id,使用master-slave方式

  應用技術點:選舉master方式,通過創建臨時節點選擇(節點不可以重復創建),利用監聽節點檢測master狀態(或者添加state節點,使用心跳機制)

  結果:master負責計算,計算結束寫入某個節點,salve和master進行業務讀取即可。避免了大量的重復計算

六、分布式隊列

  先進先出隊列(FIFO):創建queue_fifo

  Barrier分布式屏障,類似多線程計算,計算結束進行結果匯總

第五部分:zookeeper深入進階

 

 


第六部分:zookeeper源碼分析

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM