Zookeeper是什么&怎么用


1.Zookeeper概述

Zookeeper 是一個開源的分布式協調服務框架 ,主要用來解決分布式集群中應用系統的一致性問題和數據管理問題

image-20200710171951672

2:Zookeeper的特點

  • Zookeeper 本質上是一個分布式文件系統, 適合存放小文件,也可以理解為一個數據庫

image-20200710143351331

  • 在上圖左側, Zookeeper 中存儲的其實是一個又一個 Znode, Znode 是 Zookeeper 中的節點

  • Znode 是有路徑的, 例如 /data/host1 , /data/host2 , 這個路徑也可以理解為是Znode 的 Name

  • Znode 也可以攜帶數據, 例如說某個 Znode 的路徑是 /data/host1 , 其值是一個字符串 "192.168.0.1"

  • 正因為 Znode 的特性, 所以 Zookeeper 可以對外提供出一個類似於文件系統的試圖, 可以通過操作文件系統的方式操作 Zookeeper

    • 使用路徑獲取 Znode
    • 獲取 Znode 攜帶的數據
    • 修改 Znode 攜帶的數據
    • 刪除 Znode
    • 添加 Znode

3.Zookeeper的應用場景

3.1 數據發布/訂閱

數據發布/訂閱系統,需要發布者將數據發布到Zookeeper的節點上,供訂閱者進行數據訂閱,進而達到動態獲取數據的目的,實現配置信息的集中式管理和數據的動態更新。

發布/訂閱一般有兩種設計模式:推模式和拉模式,服務端主動將數據更新發送給所有訂閱的客戶端稱為推模式;客戶端主動請求獲取最新數據稱為拉模式。

Zookeeper采用了推拉相結合的模式,客戶端向服務端注冊自己需要關注的節點,一旦該節點數據發生變更,那么服務端就會向相應的客戶端推送Watcher事件通知,客戶端接收到此通知后,主動到服務端獲取最新的數據。

3.2 命名服務

命名服務是分步實現系統中較為常見的一類場景,分布式系統中,被命名的實體通常可以是集群中的機器、提供的服務地址或遠程對象等,通過命名服務,客戶端可以根據指定名字來獲取資源的實體,在分布式環境中,上層應用僅僅需要一個全局唯一的名字。Zookeeper
可以實現一套分布式全局唯一ID的分配機制。

image-20200710143734088

通過調用Zookeeper節點創建的API接口就可以創建一個順序節點,並且在API返回值中會返回這個節點的完整名字,利用此特性,可以生成全局ID,其步驟如下

  1. 客戶端根據任務類型,在指定類型的任務下通過調用接口創建一個順序節點,如"job-"。
  2. 創建完成后,會返回一個完整的節點名,如"job-00000001"。
  3. 客戶端拼接type類型和返回值后,就可以作為全局唯一ID了,如"type2-job-00000001"。

3.3 分布式協調/通知

Zookeeper中特有的Watcher注冊於異步通知機制,能夠很好地實現分布式環境下不同機器,甚至不同系統之間的協調與通知,從而實現對數據變更的實時處理。通常的做法是不同的客戶端都對Zookeeper上的同一個數據節點進行Watcher注冊,監聽數據節點的變化(包括
節點本身和子節點),若數據節點發生變化,那么所有訂閱的客戶端都能夠接收到相應的Watcher通知,並作出相應處理。

在絕大多數分布式系統中,系統機器間的通信無外乎**心跳檢測、工作進度匯報和系統調度 **。

(1)心跳檢測:不同機器間需要檢測到彼此是否在正常運行,可以使用Zookeeper實現機器間的心跳檢測,基於其臨時節點特性(臨時節點的生存周期是客戶端會話,客戶端若當即后,其臨時節點自然不再存在),可以讓不同機器都在Zookeeper的一個指定節點下創建臨時子節點,不同的機器之間可以根據這個臨時子節點來判斷對應的客戶端機器是否存活。通過Zookeeper可以大大減少系統耦合。

(2)工作進度匯報,通常任務被分發到不同機器后,需要實時地將自己的任務執行進度匯報給分發系統,可以在Zookeeper上選擇一個節點,每個任務客戶端都在這個節點下面創建臨時子節點,這樣不僅可以判斷機器是否存活,同時各個機器可以將自己的任務執行進度寫到該臨時節點中去,以便中心系統能夠實時獲取任務的執行進度

(3)系統調度,Zookeeper能夠實現如下系統調度模式:分布式系統由控制台和一些客戶端系統兩部分構成,控制台的職責就是需要將一些指令信息發送給所有的客戶端,以控制他們進行相應的業務邏輯,后台管理人員在控制台上做一些操作,實際上就是修改Zookeeper上某些節點的數據,Zookeeper可以把數據變更以時間通知的形式發送給訂閱客戶端

3.4分布式鎖

分布式鎖用於控制分布式系統之間同步訪問共享資源的一種方式,可以保證不同系統訪問一個或一組資源時的一致性,主要分為排它鎖和共享鎖。

排它鎖又稱為寫鎖或獨占鎖 ,若事務T1對數據對象O1加上了排它鎖,那么在整個加鎖期間,只允許事務T1對O1進行讀取和更新操作,其他任何事務都不能再對這個數據對象進行任何類型的操作,直到T1釋放了排它鎖

①獲取鎖,在需要獲取排它鎖時,所有客戶端通過調用接口,在/exclusive_lock節點下創建臨時子節點/exclusive_lock/lock。Zookeeper可以保證只有一個客戶端能夠創建成功,沒有成功的客戶端需要注冊/exclusive_lock節點監聽。

② 釋放鎖,當獲取鎖的客戶端宕機或者正常完成業務邏輯都會導致臨時節點的刪除,此時,所有在/exclusive_lock節點上注冊監聽的客戶端都會收到通知,可以重新發起分布式鎖獲取。

共享鎖又稱為讀鎖 ,若事務T1對數據對象O1加上共享鎖,那么當前事務只能對O1進行讀取操作,其他事務也只能對這個數據對象加共享鎖,直到該數據對象上的所有共享鎖都被釋放。在需要獲取共享鎖時,所有客戶端都會到/shared_lock下面創建一個臨時順序節點。

3.5 分布式隊列

有一些時候,多個團隊需要共同完成一個任務,比如,A團隊將Hadoop集群計算的結果交給B團隊繼續計算,B完成了自己任務再交給C團隊繼續做。這就有點像業務系統的工作流一樣,一環一環地傳下去。

分布式環境下,我們同樣需要一個類似單進程隊列的組件,用來實現跨進程、跨主機、跨網絡的數據共享和數據傳遞,這就是我們的分布式隊列。

4.Zookeeper的架構

Zookeeper集群是一個基於主從架構的高可用集群

image-20200710161121251

每個服務器承擔如下三種角色中的一種

Leader 一個Zookeeper集群同一時間只會有一個實際工作的Leader,它會發起並維護與各Follwer及Observer間的心跳。所有的寫操作必須要通過Leader完成再由Leader將寫操作廣播給其它服務器。

Follower 一個Zookeeper集群可能同時存在多個Follower,它會響應Leader的心跳。Follower可直接處理並返回客戶端的讀請求,同時會將寫請求轉發給Leader處理,並且負責在Leader處理寫請求時對請求進行投票。

Observer 角色與Follower類似,但是無投票權。

image-20200710161218244

5.Zookeeper的選舉機制

Leader選舉是保證分布式數據一致性的關鍵所在。當Zookeeper集群中的一台服務器出現以下兩種情況之一時,需要進入Leader選舉。

5.1. 服務器啟動時期的Leader選舉

若進行Leader選舉,則至少需要兩台機器,這里選取3台機器組成的服務器集群為例。在集群初始化階段,當有一台服務器Server1啟動時,其單獨無法進行和完成Leader選舉,當第二台服務器Server2啟動時,此時兩台機器可以相互通信,每台機器都試圖找到Leader,於是進入Leader選舉過程。選舉過程如下

(1) 每個Server發出一個投票。由於是初始情況,Server1和Server2都會將自己作為Leader服務器來進行投票,每次投票會包含所推舉的服務器的myid和ZXID,使用(myid, ZXID)來表示,此時Server1的投票為(1, 0),Server2的投票為(2, 0),然后各自將這個投票發給集群中其他機器

(2) 接受來自各個服務器的投票。集群的每個服務器收到投票后,首先判斷該投票的有效性,如檢查是否是本輪投票、是否來自LOOKING狀態的服務器。

(3) 處理投票。針對每一個投票,服務器都需要將別人的投票和自己的投票進行PK,PK規則如下

  • 優先檢查ZXID。ZXID比較大的服務器優先作為Leader。
  • 如果ZXID相同,那么就比較myid。myid較大的服務器作為Leader服務器。

對於Server1而言,它的投票是(1, 0),接收Server2的投票為(2, 0),首先會比較兩者的ZXID,均為0,再比較myid,此時Server2的myid最大,於是更新自己的投票為(2, 0),然后重新投票,對於Server2而言,其無須更新自己的投票,只是再次向集群中所有機器發出上一次
投票信息即可

(4) 統計投票。每次投票后,服務器都會統計投票信息,判斷是否已經有過半機器接受到相同的投票信息,對於Server1、Server2而言,都統計出集群中已經有兩台機器接受了(2, 0)的投票信息,此時便認為已經選出了Leader。

(5) 改變服務器狀態。一旦確定了Leader,每個服務器就會更新自己的狀態,如果是Follower,那么就變更為FOLLOWING,如果是Leader,就變更為LEADING。

5.2.服務器運行時期的Leader選舉

在Zookeeper運行期間,Leader與非Leader服務器各司其職,即便當有非Leader服務器宕機或新加入,此時也不會影響Leader,但是一旦Leader服務器掛了,那么整個集群將暫停對外服務,進入新一輪Leader選舉,其過程和啟動時期的Leader選舉過程基本一致過程相同。

6.Zookeeper安裝

集群規划

服務器IP 主機名 myid的值
192.168.174.100 node01 1
192.168.174.110 node02 2
192.168.174.120 node03 3

服務器IP前面的網段(192.168.174)要使用自己的網段

第一步:下載zookeeeper的壓縮包,下載網址如下

http://archive.apache.org/dist/zookeeper/

我們在這個網址下載我們使用的zk版本為3.4.9

下載完成之后,上傳到我們的linux的/export/soxwares路徑下准備進行安裝

第二步:解壓

解壓zookeeper的壓縮包到/export/servers路徑下去,然后准備進行安裝(路徑可以自己設置,我這里用的是/export/servers)

cd /export/software  --到上傳文件的目錄
tar -zxvf zookeeper-3.4.9.tar.gz -C ../servers/ --解壓到servers目錄

第三步:修改配置文件

第一台機器修改配置文件

cd /export/servers/zookeeper-3.4.9/conf/
cp zoo_sample.cfg zoo.cfg
mkdir -p /export/servers/zookeeper-3.4.9/zkdatas/
vim zoo.cfg

在zoo.cfg文件的末尾加上

dataDir=/export/servers/zookeeper-3.4.9/zkdatas  -- 自己的路徑
# 保留多少個快照
autopurge.snapRetainCount=3
# 日志多少小時清理一次
autopurge.purgeInterval=1
# 集群中服務器地址
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

第四步:添加myid配置

在第一台機器的
/export/servers/zookeeper-3.4.9/zkdatas /這個路徑下創建一個文件,文件名為myid ,文件內容為1

echo 1 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第五步:安裝包分發並修改myid的值

安裝包分發到其他機器

第一台機器上面執行以下兩個命令

scp -r /export/servers/zookeeper-3.4.9/ node02:/export/servers/
scp -r /export/servers/zookeeper-3.4.9/ node03:/export/servers/

第二台機器上修改myid的值為2

echo 2 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第三台機器上修改myid的值為3

echo 3 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第六步:三台機器啟動zookeeper服務

三台機器啟動zookeeper服務

這個命令三台機器都要執行

/export/servers/zookeeper-3.4.9/bin/zkServer.sh start

查看啟動狀態

/export/servers/zookeeper-3.4.9/bin/zkServer.sh status

7.Zookeeper的Shell 客戶端操作

image-20200710171818769

啟動:來到zookeeper文件夾下面執行下面一段話,其中node01是的服務器名,你也可以用IP地址

 bin/zkCli.sh -server node01:2181 

1:創建普通節點

create /app1 hello

2: 創建順序節點

create -s /app3 world

3:創建臨時節點

create -e /tempnode world

4:創建順序的臨時節點

create -s -e /tempnode2 aaa

5:獲取節點數據

get /app1

6:修改節點數據

set /app1 xxx

7:刪除節點

delete /app1 刪除的節點不能有子節點
rmr /app1 遞歸刪除

Znode 的特點

  • 文件系統的核心是 Znode
  • 如果想要選取一個 Znode , 需要使用路徑的形式, 例如 /test1/test11
  • Znode 本身並不是文件, 也不是文件夾, Znode 因為具有一個類似於 Name 的路徑, 所以可以從邏輯上實現一個樹狀文件系統
  • ZK 保證 Znode 訪問的原子性, 不會出現部分 ZK 節點更新成功, 部分 ZK 節點更新失敗的問題
  • Znode 中數據是有大小限制的, 最大只能為 1M
  • Znode 是由三個部分構成
    • stat : 狀態, Znode的權限信息, 版本等
    • data : 數據, 每個Znode都是可以攜帶數據的, 無論是否有子節點
    • children : 子節點列表

Znode 的類型

每個 Znode 有兩大特性, 可以構成四種不同類型的 Znode

  • 持久性

    • 持久 客戶端斷開時, 不會刪除持有的Znode
    • 臨時 客戶端斷開時, 刪除所有持有的Znode, 臨時Znode不允許有子Znode
  • 順序性

    • 有序 創建的Znode有先后順序, 順序就是在后面追加一個序列號, 序列號是由父節點管理的自增
    • 無序 創建的Znode沒有先后順序

Znode 的屬性

  • dataVersion 數據版本, 每次當 Znode 中的數據發生變化的時候, dataVersion都會自增一下
  • cversion 節點版本, 每次當 Znode 的節點發生變化的時候, cversion 都會自增
  • aclVersion ACL(Access Control List) 的版本號, 當 Znode 的權限信息發生變化的時候aclVersion會自增
  • zxid 事務ID
  • ctime 創建時間
  • mtime 最近一次更新的時間
  • ephemeralOwner 如果 Znode 為臨時節點, ephemeralOwner 表示與該節點關聯的 SessionId

通知機制

  • 通知類似於數據庫中的觸發器, 對某個Znode設置 Watcher , 當Znode發生變化的時候,WatchManager 會調用對應的 Watcher
  • 當Znode發生刪除, 修改, 創建, 子節點修改的時候, 對應的 Watcher 會得到通知
  • Watcher 的特點
    • 一次性觸發 一個 Watcher 只會被觸發一次, 如果需要繼續監聽, 則需要再次添加Watcher
    • 事件封裝: Watcher 得到的事件是被封裝過的, 包括三個內容 keeperStae, eventType, path

8.Zookeeper的JavaAPI操作

這里操作Zookeeper的JavaAPI使用的是一套zookeeper客戶端框架 Curator ,解決了很多Zookeeper客戶端非常底層的細節開發工作 。
Curator包含了幾個包:

  • curator-framework:對zookeeper的底層api的一些封裝
  • curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器等

Maven依賴(使用curator的版本:2.12.0,對應Zookeeper的版本為:3.4.x,如果跨版本會有兼容性問題,很有可能導致節點操作失敗):

8.1.創建java工程,導入jar包

<dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

8.2 節點的操作

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;

/**
 * @Description
 * @Author wugongzi
 * @Date 2020/7/10 21:12
 */
public class ZKTest {

    /*
    節點的watch機制
     */

    @Test
    public void watchZnode() throws Exception {
        //1:定制一個重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1);

        //2:獲取客戶端
        String conectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
        CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr, 8000, 8000, retryPolicy);

        //3:啟動客戶端
        client.start();

        //4:創建一個TreeCache對象,指定要監控的節點路徑
        TreeCache treeCache = new TreeCache(client, "/hello3");

        //5:自定義一個監聽器
        treeCache.getListenable().addListener(new TreeCacheListener() {
            //@Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {

                ChildData data = treeCacheEvent.getData();
                if(data != null){
                    switch (treeCacheEvent.getType()){
                        case NODE_ADDED:
                            System.out.println("監控到有新增節點!");
                            break;
                        case NODE_REMOVED:
                            System.out.println("監控到有節點被移除!");
                            break;
                        case NODE_UPDATED:
                            System.out.println("監控到節點被更新!");
                            break;
                        default:
                            break;
                    }
                }
            }
        });

        //開始監聽
        treeCache.start();

        Thread.sleep(1000000);
    }
    /*
     獲取節點數據
     */
    @Test
    public void getZnodeData() throws Exception {
        //1:定制一個重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
        //2:獲取客戶端
        String conectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
        CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr, 8000, 8000, retryPolicy);

        //3:啟動客戶端
        client.start();
        //4:獲取節點數據
        byte[] bytes = client.getData().forPath("/hello");
        System.out.println(new String(bytes));

        //5:關閉客戶端
        client.close();

    }
    /*
      設置節點數據
     */
    @Test
    public void setZnodeData() throws Exception {
        //1:定制一個重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
        //2:獲取客戶端
        String conectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
        CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr, 8000, 8000, retryPolicy);
        //3:啟動客戶端
        client.start();
        //4:修改節點數據
        client.setData().forPath("/hello", "zookeeper".getBytes());
        //5:關閉客戶端
        client.close();

    }
    /*
    創建臨時節點
     */
    @Test
    public void createTmpZnode() throws Exception {
        //1:定制一個重試策略
        /*
            param1: 重試的間隔時間
            param2:重試的最大次數
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
        //2:獲取一個客戶端對象
        /*
           param1:要連接的Zookeeper服務器列表
           param2:會話的超時時間
           param3:鏈接超時時間
           param4:重試策略
         */
        String connectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);

        //3:開啟客戶端
        client.start();
        //4:創建節點
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello4","world".getBytes());

        Thread.sleep(5000);
        //5:關閉客戶端
        client.close();

    }
    /*
        創建永久節點

     */
    @Test
    public void createZnode() throws Exception {
        //1:定制一個重試策略
        /*
            param1: 重試的間隔時間
            param2:重試的最大次數
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
        //2:獲取一個客戶端對象
        /*
           param1:要連接的Zookeeper服務器列表
           param2:會話的超時時間
           param3:鏈接超時時間
           param4:重試策略
         */
        String connectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);

        //3:開啟客戶端
        client.start();
        //4:創建節點
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2","world".getBytes());
        //5:關閉客戶端
        client.close();
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM