Zookeeper 掃盲


Zookeeper 掃盲 :disappointed_relieved:

配置文件詳解:

  • tickTime:基本事件單元,以毫秒為單位,這個時間作為 Zookeeper 服務器之間或客戶端之間維持心跳的時間間隔
  • dataDir:存儲內存中數據庫快照的位置,顧名思義就是 Zookeeper 保存數據的目錄,默認情況下,Zookeeper 將寫數據的日志文件也保存到這個目錄里
  • clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求
  • initLimit:這個配置項是用來配置 Zookeeper 接受客戶端初始化連接時最長能忍受多少個心跳時間間隔
    • 當已經超過 10 個心跳的時間也就是(ticktime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗,總的時間長度就是:10*2000 = 20s
  • syncLimit:這個配置項表示 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是:5*2000 = 10s
  • server.A = B:C:D
    • A:表示這是第幾號服務器
    • B:服務器的 IP 地址
    • C:服務器與集群中的 Leader 服務器交換信息的端口
    • D:一旦集群中的 Leader 服務器掛了,需要一個端口重新進行選舉,選出一個新的 Leader
    • 2181:對外提供端口
    • 2888:內部同步端口
    • 3888:節點掛了,選舉端口

ZK 設計目標

  • 簡單的數據結構(樹形名字空間)
  • 構建集群(一般奇數,超過半數以上就可以正常訪問)
  • 順序訪問
  • 高性能(12w QPS)

集群搭建

mark

Java 連接 ZK

  • 客戶端通過創建一個 ZK 實例來連接 ZK 服務器

  • Zookeeper(Arguments) 方法

    • connectString:連接服務器列表,用“,”分隔
    • sessionTimeout:心跳檢測時間周期(毫秒)
    • wather:時間處理通知器
    • canBeReadOnly:標識當前會話是否支持只讀
    • SessionId 和 SessionPasswd:提供連接 Zookeeper 的sessionId 和 密碼,通過這倆個確定唯一一台客戶端,目的是可以提供重復會話
  • ZK 客戶端和服務器會話的建立是一個異步的過程,我們程序方法在處理完客戶端初始化后立即返回,
    • 也就是程序往下執行代碼,這樣,大多數情況下我們並沒有真正構建好一個可用會話,在會話生命周期處於“CONNECTING”時,才算建立完畢

Zookeeper 組成

  • 根據其身份的特征分為三種:Leader、Follower、Observer,其中 Follower 和 Observer 統稱為 Learner (學習者)
  • leader:負責客戶端的 writer 類型請求
  • Follower:負責客戶端 reader 類型請求,參與 leader 選舉
  • Observer:特殊的“Follower”,其可以接收客戶端 reader 請求,但不參與選舉。(擴容系統支撐能力,提高讀取速度)因為他不接受任何同步的寫入請求,只負責 leader 同步數據

Java 操作方法

  • 創建節點(znode)方法:create

    • 提供了兩套創建節點的方法,同步和異步創建節點方法
    • 參數1:節點路徑:/nodeName (不允許遞歸創建節點,也就是在父節點不存在的情況下,不允許創建子節點)
    • 參數2:節點內容:要求類型是字節數組(不支持序列化方式,如果需要實現程序化,可使用 Java 相關序列化框架,如 Hession、Kryo 框架)
    • 參數3:節點權限:使用 Ids.OPEN_ACL_UNSAFE 開放權限
    • 參數4:節點類型:創建節點的類型: CreateMode.*
    • persistent(持久節點)
    • persistent_sequential(持久順序節點)
    • ephemeral(臨時節點)
    • ephemeral_sequential(臨時順序節點)
    • 參數5:注冊一個異步回調函數,要實現 AsynCallBack.StringCallBack 接口,重寫 processResult(int rc, String path, Object ctx, String name) 方法,當節點創建完畢后執行此方法。
    • rc:為服務端相應碼 0 表示調用成功、-4 表示端口連接、-110 表示指定節點存在、-112 表示會話已經過期
    • path:接口調用時傳入 API 的數據節點的路徑參數
    • ctx:為調用接口傳入 API 的 ctx 值
    • name:實際在服務器端創建節點的名稱
    • 參數6:傳遞給回調函數的參數,一般為上下文(Context)信息
  • 使用了 CountDownLatch 中的 countDown 只要是要確保我們的zk 連接成功再繼續往下進行。

  • 對於 Zookeeper 中存在節點時,我們添加相同節點時,我們不能創建成功。

在創建臨時節點時,在本次回話有效,當本次回話結束時,我們的臨時節點就會失效

單一視圖,三個節點上數據是一致的,消息廣播,臨時的 temp

分布式鎖原理:對於臨時節點,同一時間只能有一個 Client 操作一個節點,同時貌似加了一把鎖的形式,可以對於相同的業務邏輯,不同的 Tomcat 操作,就確保了操作的唯一性。存在內存中,效率高 12WQPS

zk.create("/app/c1", "c1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

原生的API中 Zookeeper 不允許遞歸創建節點

public class ZookeeperBase {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181";
    /** session超時時間 */
    static final int SESSION_OUTTIME = 2000;//ms
    /** 信號量,阻塞程序執行,用於等待zookeeper連接成功,發送成功信號 */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{

        ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
            @Override
            public void process(WatchedEvent event) {
                //獲取事件的狀態
                KeeperState keeperState = event.getState();
                EventType eventType = event.getType();
                //如果是建立連接
                if(KeeperState.SyncConnected == keeperState){
                    if(EventType.None == eventType){
                        //如果建立連接成功,則發送信號量,讓后續阻塞程序向下執行
                        connectedSemaphore.countDown();
                        System.out.println("zk 建立連接");
                    }
                }
            }
        });

        //進行阻塞
        connectedSemaphore.await();

        System.out.println("..");
        //創建父節點
//        zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        //創建子節點
//        zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        //獲取節點洗信息
//        byte[] data = zk.getData("/testRoot", false, null);
//        System.out.println(new String(data));
//        System.out.println(zk.getChildren("/testRoot", false));

        //修改節點的值
//        zk.setData("/testRoot", "modify data root".getBytes(), -1);
//        byte[] data = zk.getData("/testRoot", false, null);
//        System.out.println(new String(data));    

        //判斷節點是否存在
//        System.out.println(zk.exists("/testRoot/children", false));
        //刪除節點
//        zk.delete("/testRoot/children", -1);
//        System.out.println(zk.exists("/testRoot/children", false));

        zk.close();
    }

}

getChildren 只可以取下面直接的一層,

使用 -1 是跳過版本檢查,如果再刪除的時候,會檢查本地版本和遠程版本若相同則會刪除,否則不刪除。同時不支持遞歸的刪除

getChildren 讀取數據方法:包括子節點列表的獲取和子節點數據的獲取

  • 參數1:path:獲取指定節點的下的數據(獲取子節點列表)
  • 參數2:watcher:注冊的 watcher ,一旦在本次子節點獲取后,子節點列表發生變化的話,那么就會向客戶端發送通知,該參數允許為 null。
  • 參數3:表明是否需要注冊一個 watcher:如果為 true,則會使用到 zookeeper 客戶端上下文中提到的那個默認 watcher ,如果為 false,則表明不需要注冊 watcher。
  • 參數4:cb:回調函數
  • 參數5:ctx:上下文信息對象
  • 參數6:stat:指定數據節點的節點狀態信息

注意:

  • 當我們獲取指定節點的子節點列表后,還需要訂閱這個子節點列表的變化通知,這時候就可以注冊一個 watcher 來實現
  • 當子節點被添加或刪除時,服務器端就會觸發一個“NodeChildrenChanged“類型的時間通知,
  • 服務器端發送給客戶端收到這個事件通知中,是不包含最新的節點列表的,客戶端必須主動從新進行獲取,通常在客戶端收到這個事件通知后,就可以再次主動獲取最新的子節點列表了
  • ZK 服務端在向客戶端發送 watcher “NodeChildrenChanged”事件通知的時候,僅僅只發了一個通知,不會節點把節點的節點的變化情況發送給客戶端,需要客戶端自己重新獲取
  • watcher 通知是一次性的,即觸發后失效,因此客戶端需要反復注冊 watcher 才行。

getData 方法:獲取指定節點的數據內容

  • path:路徑
  • watcher:注冊的 watcher 對象,一旦之后節點內容有變更,則會向客戶端發送通知,該參數允許為 null,觸發事件為“NodeDataChanged”事件通知
  • stat:指定節點的狀態信息
  • watch:是否使用 watcher,如果為 true 則使用默認上下文中的 watcher, false 則不使用 watcher
  • cb:回調函數
  • ctx:傳遞上下文信息對象

setData 方法:修改指定節點的數據內容

  • path:路徑
  • data:數據內容
  • 版本號:(-1 覆蓋之前的所有的版本)
  • cb:回調函數
  • ctx:用於傳遞的下文信息對象

exists 方法:檢測節點是否存在

  • path:路徑
  • watcher:注冊的 watcher 對象,用於三類時間監聽(節點的創建,刪除,更新)
  • cb:回調函數
  • ctx:傳遞的上下文信息對象
  • exits 方法的意義在於無論節點是否存在,都可以進行注冊 watcher,能夠對節點的創建,刪除和修改進行監聽,但是其子節點發送各種變化,都不會通知客戶端。

事件類型

  • ZK 有 watch 事件,是一次性觸發的,當 watch 監視的數據發生變化時,通知設置了該watch 的 client ,即 watcher

  • watcher 是監聽數據發送了某些變化,那就一定會有對應的事件類型和狀態類型

  • 事件類型:(znode節點相關)
    • EventType.NodeCreated
    • EventType.NodeDataChanged
    • EventType.NodeChildrenChaged
    • EventType.NodeDeleted
  • 狀態類型:(客戶端實例相關)

    • KeeperState.Disconnected
    • KeeperState.SyncConnected
    • KeeperState.AuthFailed
    • KeeperState.Expired
  • watch事件是一次性的,watcher 表示 client

package bjsxt.zookeeper.watcher;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
* Zookeeper Wathcher
* 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類)
* @author(alienware)
* @since 2015-6-14
*/
public class ZooKeeperWatcher implements Watcher {

    /** 定義原子變量 */
    AtomicInteger seq = new AtomicInteger();
    /** 定義session失效時間 */
    private static final int SESSION_TIMEOUT = 10000;
    /** zookeeper服務器地址 */
    private static final String CONNECTION_ADDR = "192.168.80.88:2181";
    /** zk父路徑設置 */
    private static final String PARENT_PATH = "/testWatch";
    /** zk子路徑設置 */
    private static final String CHILDREN_PATH = "/testWatch/children";
    /** 進入標識 */
    private static final String LOG_PREFIX_OF_MAIN = "【Main】";
    /** zk變量 */
    private ZooKeeper zk = null;
    /** 信號量設置,用於等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 創建ZK連接
     * @param connectAddr ZK服務器地址列表
     * @param sessionTimeout Session超時時間
     */
    public void createConnection(String connectAddr, int sessionTimeout) {
        this.releaseConnection();
        try {
            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
            System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務器");
            connectedSemaphore.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 關閉ZK連接
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 創建節點
     * @param path 節點路徑
     * @param data 數據內容
     * @return
     */
    public boolean createPath(String path, String data) {
        try {
            //設置監控(由於zookeeper的監控都是一次性的所以 每次必須設置監控)
            this.zk.exists(path, true);
            System.out.println(LOG_PREFIX_OF_MAIN + "節點創建成功, Path: " +
                               this.zk.create(    /**路徑*/
                                                   path,
                                                   /**數據*/
                                                   data.getBytes(),
                                                   /**所有可見*/
                                                   Ids.OPEN_ACL_UNSAFE,
                                                   /**永久存儲*/
                                                   CreateMode.PERSISTENT ) + 
                               ", content: " + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 讀取指定節點數據內容
     * @param path 節點路徑
     * @return
     */
    public String readData(String path, boolean needWatch) {
        try {
            return new String(this.zk.getData(path, needWatch, null));
        } catch (Exception e) {
            e.printStackTrace();
            return "";
        }
    }

    /**
     * 更新指定節點數據內容
     * @param path 節點路徑
     * @param data 數據內容
     * @return
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " +
                                this.zk.setData(path, data.getBytes(), -1));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 刪除指定節點
     *
     * @param path
     *            節點path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 判斷指定節點是否存在
     * @param path 節點路徑
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 獲取子節點
     * @param path 節點路徑
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 刪除所有節點
     */
    public void deleteAllTestPath() {
        if(this.exists(CHILDREN_PATH, false) != null){
            this.deleteNode(CHILDREN_PATH);
        }
        if(this.exists(PARENT_PATH, false) != null){
            this.deleteNode(PARENT_PATH);
        }    
    }

    /**
     * 收到來自Server的Watcher通知后的處理。
     */
    @Override
    public void process(WatchedEvent event) {

        System.out.println("進入 process 。。。。。event = " + event);

        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (event == null) {
            return;
        }

        // 連接狀態
        KeeperState keeperState = event.getState();
        // 事件類型
        EventType eventType = event.getType();
        // 受影響的path
        String path = event.getPath();

        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

        System.out.println(logPrefix + "收到Watcher通知");
        System.out.println(logPrefix + "連接狀態:\t" + keeperState.toString());
        System.out.println(logPrefix + "事件類型:\t" + eventType.toString());

        if (KeeperState.SyncConnected == keeperState) {
            // 成功連接上ZK服務器
            if (EventType.None == eventType) {
                System.out.println(logPrefix + "成功連接上ZK服務器");
                connectedSemaphore.countDown();
            }
            //創建節點
            else if (EventType.NodeCreated == eventType) {
                System.out.println(logPrefix + "節點創建");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.exists(path, true);
            }
            //更新節點
            else if (EventType.NodeDataChanged == eventType) {
                System.out.println(logPrefix + "節點數據更新");
                System.out.println("我看看走不走這里……..");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "數據內容: " + this.readData(PARENT_PATH, true));
            }
            //更新子節點
            else if (EventType.NodeChildrenChanged == eventType) {
                System.out.println(logPrefix + "子節點變更");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "子節點列表:" + this.getChildren(PARENT_PATH, true));
            }
            //刪除節點
            else if (EventType.NodeDeleted == eventType) {
                System.out.println(logPrefix + "節點 " + path + " 被刪除");
            }
            else ;
        }
        else if (KeeperState.Disconnected == keeperState) {
            System.out.println(logPrefix + "與ZK服務器斷開連接");
        }
        else if (KeeperState.AuthFailed == keeperState) {
            System.out.println(logPrefix + "權限檢查失敗");
        }
        else if (KeeperState.Expired == keeperState) {
            System.out.println(logPrefix + "會話失效");
        }
        else ;

        System.out.println("--------------------------------------------");

    }

    /**
     * <B>方法名稱:</B>測試zookeeper監控<BR>
     * <B>概要說明:</B>主要測試watch功能<BR>
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        //建立watcher
        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
        //創建連接
        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
        //System.out.println(zkWatch.zk.toString());

        Thread.sleep(1000);

        // 清理節點
        zkWatch.deleteAllTestPath();

        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {

            Thread.sleep(1000);


            // 讀取數據
            System.out.println("---------------------- read parent ----------------------------");
            //zkWatch.readData(PARENT_PATH, true);

            // 讀取子節點
            System.out.println("---------------------- read children path ----------------------------");
            zkWatch.getChildren(PARENT_PATH, true);

            // 更新數據
            zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            // 創建子節點
            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
        }

        Thread.sleep(50000);
        // 清理節點
        zkWatch.deleteAllTestPath();
        Thread.sleep(1000);
        zkWatch.releaseConnection();
    }

}

Zookeeper 的 ACL (AUTH)

ACL(Access Control List),Zookeeper 作為一個分布式協調框架,其內部存儲的都是一些關乎分布式系統運行時狀態的元數據,尤其是涉及到一些分布式鎖、Master選舉和協調等應用場景。我們需要有效地保障 Zookeeper 中的數據安全,Zookeeper 提供一套完善的 ACL 權限控制機制來保障數據的安全。

ZK 提供了三種模式:權限模式、授權對象、權限

權限模式:Scheme 開發人員最多使用的如下四種權限模式:

  • IP :IP 模式通過 IP 地址粒度來進行控制權限,例如配置了:IP:192.168.1.107 即表示權限控制都是針對這個 IP 地址的,同時也支持按網段分配,比如 192.168.1.*
  • Digest:digest 是最常用的權限控制模式,更符合我們對權限控制的認識,起類似於 username:password 形式的權限標識進行權限配置,ZK 會對形成的權限標識先后進行兩次編碼處理,分別是 SHA-1 加密算法、BASE64 編碼
  • world:world 是一直最開放的權限控制模式,這種模式可以看做為特殊的 Disgest,它僅僅是一個標識而已
  • Super:超級用戶模式,在超級用戶模式下可以對 ZK 任意進行控制

權限對象:指的是權限賦予用戶或者一個指定的實體,例如 IP 地址或機器等,在不同的模式下,授權對象是不同的,這種模式和權限對象一一對應。

權限:權限就是指那些通過權限檢測后可以被允許執行的操作,在 ZK 中,對數據的操作權限分為五大類:CREATE、DELETE、READ、WRITE、ADMIN

認證只是針對某一個節點。

zkClient 的使用

創建客戶端方法:ZKClient(Arguments)

  • arg1:zkServers zookeeper服務器的地址,用“,”分隔
  • arg2:sessionTimeout 超時會話,為毫秒,默認為 30 000ms
  • arg3:connectionTimeout 連接超時會話
  • arg4:IZkConnection 接口的實現類
  • arg5:zkSerializer 自定義序列化實現

可以使用遞歸創建,每個節點沒法指定 value,可以遞歸刪除

zkclient 最大的一個優點就是: 把重復watch的那個事情去掉,不需要再寫數據時watch

readeata 時,直接讀取的就是字符串,不是byte字節流

創建節點方法:create、createEphemeral、createEphemeralSequential、createPersistent、createPersistentSequential

  • arg1:path,路徑
  • arg2:data,數據內容,可以傳入 null
  • arg3:mode,節點類型,為一個枚舉類型,四種形式
  • arg4:acl 策略
  • arg5:callback 回調函數
  • arg6:context 上下文對象
  • arg7:createParents 是否創建父節點

刪除節點方法:delete、deleteRecursive

  • arg1:Path,路徑
  • arg2:callback,回調函數
  • arg3:context,上下文對象

讀取子節點數據方法:getChildren

  • arg1:path,路徑

讀取節點數據方法:readData

  • arg1:path,路徑
  • arg2:returnNullIFPathNotExists (避免為空節點拋出異常,直接返回 null)
  • arg3:節點狀態

更新數據方法:writeData

  • arg1:path,路徑
  • arg2:data,數據信息
  • arg3:version 版本號

檢測節點是否存在方法:exists

  • arg:path,路徑

我們發現,ZkClient 里面並沒有類似的 watcher、watch 參數,這也就是我們說開發人員無需關心反復注冊 Watcher 的問題,ZkClient 給我們提供了一套監聽方式,我們可以使用監聽節點的方式進行操作,剔除了繁瑣的反復 watcher 操作,簡化了代碼復雜程度

subscribeChildChanges 方法

  • arg1:path,路徑
  • 實現 IZkChildListener 接口的類,只需要重寫其 handleChaildChanges(String parentPath,List<String> currentChilds) 方法,其中參數 parentPath 為所監聽節點全路徑,currentChilds 為最新的子節點列表(相對路徑)
  • IZkChildListener 事件說明針對於下面三個事件觸發:
    • 新增子節點
    • 減少子節點
    • 刪除節點

IZkChildListener 有以下特點:

  1. 客戶端可以對一個不存在的節點進行變更的監聽

  2. 一旦客戶端對一個節點注冊了子節點列表變更監聽后,那么當前節點的子節點雷彪發送變更的時候,服務端都會通知客戶端,並將最新的自己誒單列表發送給客戶端

  3. 該節點本身創建或刪除也會通知到客戶端

  4. 另外最重要的是這個監聽是一直存在的,不是單次監聽,相比較原生 API 提供的要簡單的多

  5. subscribeChildChange("/super",new IZkChildListener() {})

    • 對於 Zkclient 會告訴你變化之后的數據是多少,對於節點的當前和子級的狀態

    • 當對於一個節點的update 的時候,並不會監聽,只會監聽當前節點或子節點的添加和刪除

  6. subscribeDataChange("/super",new IZkDataListener() {})

    • 對於變更的狀態包括:刪除和修改,其中的狀態分開走

原先zk設計的狀況:

  • event 只會告訴你變化的事件,觸發什么事件,自己根據path,讀取數據,並且還是一次,

  • 通知,狀態,節點路徑,對於變化的之后的數據沒有告訴,設計的理念就是輕量,敏捷。

Curator 封裝

是否可以監控一個節點下面所有節點的狀態,分布式鎖,原子統計

Curator 框架中使用了鏈式編程風格,易讀性更強,使用工程方法創建連接對象

使用 CuratorFrameworkFactory 的兩個靜態工廠方法(參數不同)來實現

  • arg1:connectString,連接串
  • arg2:retryPolicy,重試連接策略,有四種實現分別為:
    • ExponentialBackoffRetry
    • RetryNTimes
    • RetryOneTimes
    • RetryUntilElapsed
  • arg3:sessionTimeoutMs 會話超時時間 默認為 60 000ms
  • arg4:connectionTimeoutMs 連接超時時間,默認為15 000ms
  • 對於 retryPolicy 策略通過一個接口來讓用戶自定義實現

創建節點 create 方法,可選鏈式項:

  • creatingParentsIfNeeded
  • withMode
  • forPath
  • withACL

刪除節點 delete 方法,可選鏈式項

  • deletingChildrenIfNedded
  • guaranteed
  • withVersion
  • forPath

讀取和修改數據

  • getData
  • setData

異步綁定回調方法

  • 創建節點時綁定一個回調函數,該回調函數可以輸出服務器的狀態碼以及服務器事件類型
  • 可以加入線程池進行優化

讀取子節點方法

  • getChildren

判斷子節點是否存在

  • checkExists

回調函數為什么會使用線程池:

  • 一次性批量創建多個節點,不必要每次都回調
  • ThreadPoolExcutor 底層自定義線程
  • 批量節點操作,自己規划 callback ,多余的緩沖在隊列中

Watcher 監聽功能

  • 依賴 maven jar包

    <dependency>
      <groupid>org.apache.curator</groupid>
        <artifactid>curator-recipes</artifactid>
        <version>2.4.2</version>
    </dependency>
  • 我們使用 NodeCache 的方式去客戶端實例中注冊一個監聽緩存,然后實現對應的監聽方法即可,

  • 主要的監聽方式:
    • NodeCacheListener:監聽節點的新增,修改操作
    • PathchildrenCacheListener:監聽子節點的新增、修改、刪除操作

在客戶端使用緩存,在服務端進行變化時,和本地的進行對比,有差異數據同步,空間換取時間,替換了原來的注冊的思路。

只能監聽一級節點,再下一級就不能實現,對於刪除也不能迭代刪除

分布式鎖

分布式鎖就是在共享的一段代碼中,一個服務器使用,其他的服務器不允許訪問,分布式鎖

實現分布式鎖,對於Java程序寫出花來也沒用,就是只針對同一個JVM,當多個JVM如何同步

分布式計數器:DistributedAtomicInteger

使用了distribute,

重試的時間次數,

barrier 同時開始,同時結束,代碼,

DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);

其中的 5 代表五個客戶端的連接,當五個連接上就可以同時開始,我們使用

barrier.enter();

此時就同時開始,

barrier.leave();

同時結束

在聲明 Barrier 的時候也可以不設置程序的數量

同時還有另外一種寫法:

實現聲明 barrier

/** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
    /** session超時時間 */
    static final int SESSION_OUTTIME = 5000;//ms

    static DistributedBarrier barrier = null;

    public static void main(String[] args) throws Exception {



        for(int i = 0; i < 5; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                        CuratorFramework cf = CuratorFrameworkFactory.builder()
                                    .connectString(CONNECT_ADDR)
                                    .sessionTimeoutMs(SESSION_OUTTIME)
                                    .retryPolicy(retryPolicy)
                                    .build();
                        cf.start();
                        barrier = new DistributedBarrier(cf, "/super");
                        System.out.println(Thread.currentThread().getName() + "設置barrier!");
                        barrier.setBarrier();    //設置
                        barrier.waitOnBarrier();    //等待
                        System.out.println("---------開始執行程序----------");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },"t" + i).start();
        }

        Thread.sleep(5000);
        barrier.removeBarrier();    //釋放


    }

其中 barrier 作為第六人就是吹哨的人,一般我們自己設定的 const 包,用於設置的吹哨的公共

使用 curator 的分布式鎖,對於監聽的相同節點,若之前發生了變更,之后連接還會貌似數據恢復一點,數據同步,相當於重復注冊,當前 /super 節點,持續訂閱服務,

curator 人性化操作:

  • 對於一個節點的CRUD監控
  • 實現分布式鎖
  • 實現barrier、原子計數器
  • 實現隊列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM