Zookeeper入門看這篇就夠了


簡介

Zookeeper 是一個分布式應用程序的分布式開源協調服務。是Apache Hadoop 的一個子項目,主要用來解決分布式應用中經常遇到的一些數據管理問題,例如:統一命名服務、狀態同步服務、集群管理、分布式應用配置項的管理等。

Zookeeper 工作原理

ZooKeeper 核心是原子廣播,該機制保證了各個Server之間的同步,實現這個機制的協議叫做 Zab協議 ,Zab協議有兩個模式,他們分別是 “恢復模式 & 廣播模式”。

恢復模式

Zab協議會讓ZK集群進入崩潰恢復模式的情況如下:
(1)當服務框架在啟動過程中
(2)當Leader服務器出現網絡中斷,崩潰退出與重啟等異常情況。
(3)當集群中已經不存在過半的服務器與Leader服務器保持正常通信。

在所有的follower服務器中選舉一台為Leader,當leader被選舉出來,集群中有多數服務與新的Leader完成狀態同步之后就會退出恢復模式,用來保證至少有一半的follower能和Leader保持數據一致,當多數的follower集群與leader數據保持一致的時候,就會進入消息廣播模式。

狀態同步保證了 Leader 和 Server具有相同的系統狀態,所謂的狀態同步其實就是數據的同步。

一旦leader 已經和多數的follower進行狀態同步之后,它就開始廣播消息,並且進入廣播模式,這時候當一個server加入Zookeeper 服務中,它會在恢復模式下啟動,發現leader,並和leader進行狀態同步,同步結束后,它也參與消息廣播,Zookeeper服務一直維持在 Broadcast狀態,直到leader崩潰了或者leader失去了大部分的followers支持。

廣播模式

消息廣播模式,Zab協議消息廣播過程使用的是原子廣播協議,類似於一個二階段提交,但是又有點不一樣,並不是所有的follower節點都需要返回ack才進行一致性事務完成,只需要多數以上即可。

針對每個客戶端的事務請求,leader服務器會為其生成對應的事務Proposal,並將其發送給集群中其余所有的機器,然后再分別收集各自的選票,最后進行事務提交。

  • leader 接收到消息請求后,將消息賦予一個全局唯一的 64 位自增的 Id,我們通常稱之為zxid,通過 zxid 的大小比較即可實現有序的特性。
  • leader 通過 隊列 保證發送的順序性,將帶有zxid的消息作為一個提案(proposal)分發給所有follower
  • 當follower 接收到proposal,先將proposal寫到本地事務日志,寫事務成功后再向Leader 回一個ACK確認
  • 當leader 接收到多數的ack確認后,leader 會向所有follower 發送 commit 命令,同意會在本地執行該消息。
  • 當follower 收到消息 commit 命令后,就會執行該消息。

消息廣播模式流程示意圖如下:

消息廣播模式流程示意圖

首先客戶端會輪詢Zookeeper集群中的各個節點,當輪詢到一台是follower,如果是讀的請求,follower會返回請求結果,如果是增刪改操作,follower 會向leader生成事務請求,針對客戶端的事務請求,針對客戶端的事務請求,leader會為這個生成對應的事務Proposal,然后發送集群中所有follower服務器,然后分別在收集各自的選票,最后進行事務提交。

Zab協議的二階段提交,在提交過程中移除了中斷提交過程的操作,對於Zookeeper集群來說,超過半數反饋Ack確認就代表事務成功,這種方式無法完成所有節點事務一致性問題,所以Zab協議采用恢復模式來解決數據不一致的問題。

消息廣播協議是基於具有FIFO特性的TCP協議進行通訊,因此可以保證消息廣播過程中的接收和發送的順序性。

事務ID

為了保證事務的順序一致性,Zookeeper 采用了遞增的事務ID號(zxid)來標識事務,所有的操作(proposal)都會在被提出時加上zxid,zxid是一個64位的數字,他高32位是epoch用來標識leader關系是否發生變化,每當有新的leader 被選舉出來,都會有一個新的epoch,標識當前屬於哪個leader的領導。

對於Zookeeper 來說,每次的變化都會產生一個唯一的事務id,zxid(ZooKeeper Transaction Id)通過zxid ,可以確定更新操作的先后順序,如果說 zxid1 小於 zxid2,說明 zxid1比zxid先發生。

Zookeeper 模型

Zookeeper 是一個目錄樹結構,名稱是由斜杠 (/) 分隔的一系列路徑元素。ZooKeeper 命名空間中的每個節點都由路徑標識。

ZooKeeper 層級樹狀結構

ZooKeeper 層級樹狀結構

根節點 / 包含兩個節點(/modele1 & /module2),其中節點 /module1 包含三個子節點(/module1/app1 & /module1/app2 & /module1/app3),在Zookeeper 中,節點以絕對路徑表示,不存在相對路徑,出了根節點以外,其他節點不能以 / 結尾。

特性

資源共享: 例如存儲空間,計算能力,數據,和服務等等

擴展性: 從軟件和硬件上增加系統的規模

並發性: 多個用戶同時訪問

性能: 確保當負載增加的時候,系統想要時間不會有影響

容錯性: 盡管一些組件暫時不可用了,整個系統仍然是可用的

API抽象: 系統的獨立組件對用戶隱藏,僅僅暴露服務

Zookeeper的角色

zookeeper選舉

  • 領導者(leader) :負責進行投票的發起和決議,更新系統狀態  
  • 學習者(learner) :包括跟隨者(follower)和觀察者(observer),follower用於接受客戶端請求並想客戶端返回結果,在選主過程中參與投票  
  • Observer :可以接受客戶端連接,將寫請求轉發給leader,但observer不參加投票過程,只同步leader的狀態,observer的目的是為了擴展系統,提高讀取速度  
  • 客戶端(client) :請求發起方

保證

順序一致性: 客戶端的更新將按發送順序應用。

原子性: 更新成功或失敗,沒有部分結果。

統一視圖: 無論服務器連接到哪個服務器,客戶端都將看到相同的服務視圖。即,即使客戶端故障轉移到具有相同會話的不同服務器,客戶端也永遠不會看到系統的舊視圖。

可靠性: 一旦應用更新了,它將從那時起一直存在,直到客戶端覆蓋更新。

及時性: 系統的客戶視圖保證在特定時間范圍內是最新的。

Znode 節點

Znode有兩種類型: 持久節點和臨時節點 ,Znode的類型在創建的之后就不能在進行修改了。

臨時節點

臨時節點在客戶端會話結束的時候,Zookeeper 會將臨時節點(znode)刪除,並且臨時節點不能有子節點。利用臨時節點的特性,我們可以使用臨時節點來進行集群管理以及發現服務的上下線等。

創建臨時節點命令:create -e /module1/app1 app1
創建一個臨時節點為 “/module1/app1” ,數據為 “app1”

持久節點

持久節點不依賴於客戶端會話,只有當客戶端明確要刪除持久節點(znode)的時候才會被刪除

創建臨時節點命令:create /module1 module1
創建一個臨時節點為 “/module1” ,數據為 “module1”

順序節點

ZooKeeper 中還提供了一種順序節點的類型,每次創建順序節點時候,ZooKeeper 都會在路徑后面自動添加10為的數據中,例如 0000000001 計數器會保證在同一父節點下唯一,創建節點的時候會添加順序,常見分布式鎖。

順序節點只是節點的一種特性,也就說不管是 持久節點還是 臨時節點 都可以設置為順序節點,所以Znode類型可以理解為 4種類型:

  1. 持久節點
  2. 臨時節點
  3. 持久順序節點
  4. 臨時順序節點

創建順序節點命令(加上 “-s”參數):create -s /module1/app app

我們會看到 Created /module1/app0000000001

意思是我們創建了一個持久順序節點“/module1/app0000000001” 如果再執行上面命令 會生成節點 “/module1/app0000000002”,同理 如果我們 create -s后面添加 -e 參數,就表示我們創建了一個臨時節點。

節點數據

  • 創建節點的時候,我們可以指定節點中存儲的數據,ZooKeeper可以保證讀寫都是原子操作,而且每次讀寫操作都是對數據的完整讀取或者完成寫入,不提供對數據的部分讀取或者寫入操作。
  • ZooKeeper 雖然提供了節點存儲數據的功能,但是我們並不能把它當成一個數據庫,重點不要把Zookeeper 當成數據庫用,因為Zookeeper 規定了節點的數據大小不能超過1M,所以我們不能在節點上存儲過多的數據,盡可能保證小的數據量,因為數據過大,會導致ZK的性能下降。
  • 如果確實需要存儲大量的數據,一般可以在分布式數據庫或者Redis保存這部分數據,然后在Znode中保留數據庫中的索引。

Zookeeper單機模式安裝

java 環境

配置JAVA環境,檢驗環境 java -version

下載安裝Zookeeper

下載地址:https://zookeeper.apache.org/releases.html

下載1
下載2

下載解壓Zookeeper

 cd /usr/local/
 get https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
 tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
cd zookeeper-3.7.0/

重命名配置文件 zoo_sample.cfg

cp conf/zoo_sample.cfg conf/zoo.cfg

啟動ZK

./bin/zkServer.sh start

連接ZK客戶端

./bin/zkCli.sh 

當我們看到下圖的信息的時候,表示我們啟動成功

在這里插入圖片描述

Zookeeper命令

基本命令

  • create : 在樹中的某個位置創建一個節點
  • delete : 刪除一個節點存在:測試節點是否存在於某個位置
  • get data : 從節點讀取數據
  • set data: 將數據寫入節點
  • get children : 檢索節點的子節點列表
  • sync : 等待數據被傳播

操作Zookeeper

  1. 查看Zookeeper中包含的key
ls /
  1. 創建一個新的Znode
    創建成功以后我們可以使用 ls /查看我們創建的內容
create /zkMxn muxiaonong

 ls /
[zkMxn, zookeeper]
  1. get命令獲取創建Znode的內容
get /zkMxn
  1. set 命令來對 zk 所關聯的字符串進行設置
set /zkMxn mxn666
  1. 刪除Znode
 delete /zkMxn

Java Api操作 ZK

1. 導入Jar包

<dependency>    
        <groupId>org.apache.zookeeper</groupId>   
        <artifactId>zookeeper</artifactId>
        <version>3.6.3</version>
</dependency>
<!--junit單元測試--> 
<dependency> 
        <groupId>junit</groupId> 
        <artifactId>junit</artifactId> 
        <version>4.13.2</version> 
        <scope>runtime</scope> 
 </dependency>

2. API操作Zookeeper

創建Zookeeper對象

	public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
        this(connectString, sessionTimeout, watcher, false);
    }
  • connectString: 連接的地址,包括主機名和端口號,多個的話用逗號隔開
  • sessionTimeout: 等待客戶端通信的最長時間,客戶端如果超過這個時間沒有和服務端進行通信,那么就認為該客戶端已經終止,一般設置值為 5-10秒,單位為毫秒
  • watcher: 監聽器,用於接收會話事件的接口,需要自己定義,實現process()方法

連接Zookeeper

Zookeeper zkClient = "";
String connectStr = "192.168.2.1:2181";
zkClient = new ZooKeeper(connectStr, 5000, new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) { }
});

創建節點

public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException {}
  • path: 節點路徑
  • data: 節點數據
  • acl: 節點權限,例如:ZooDefs.Ids.OPEN_ACL_UNSAFE

OPEN_ACL_UNSAFE:完全開發,采用world驗證模式,由於每個ZK連接都有world驗證模式,所以當我們節點設置了該參數時,對所有連接開放
CREATOR_ALL_ACL: 創建該Znode連接的擁有所有權限,這里采用的是auth驗證模式,用sessionID做驗證,如果設置了該參數,只有創建改Znode節點的連接才能對這個Znode進行任何操作
READ_ACL_UNSAFE:所有的客戶端都可讀,這里采用world驗證模式,和第一條同理,所有連接都可以讀取該znode

  • createMode: 節點類型,例如:CreateMode.PERSISTENT

PERSISTENT:持久節點
PERSISTENT_SEQUENTIAL:持久有序節點
EPHEMERAL:短暫節點
EPHEMERAL_SEQUENTIAL:短暫有序節點

完整APIDemo:


import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;

/** @Author mxn
  * @Description //TODO ZooKeeper Java API測試
  * @Date 10:22 2021/9/29
  * @Param
  * @return
 **/
@Slf4j
public class ZookeeperTest {

    // IP 和端口
    private final static String ipAddress = "192.168.2.123:2181";


    public static void main(String[] args) {
        ZookeeperTest test = new ZookeeperTest();

        String key = "/zkMxn";
        String value = "wo is muxiaonong";
        //創建Znode
        test.add(key,value);
//        獲取節點數據
//        test.get(key);
        //修改節點數據
//        test.modify(key,"wo is zhuzhuxia");
        //刪除節點
//        test.delete(key);


    }

    /**
     * @return
     * @Author mxn
     * @Description //TODO 獲取ZooKeeper連接
     * @Date 10:22 2021/9/29
     * @Param
     **/
    public static ZooKeeper getConntection() {
        ZooKeeper zooKeeper = null;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            //watch機制(回調),監聽是否連接成功
            zooKeeper = new ZooKeeper(ipAddress, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                        //如果受收到了服務端的響應事件,連接成功
                        countDownLatch.countDown();
                    }
                }
            });

            countDownLatch.await();
            log.info("zookeeper狀態:{}",zooKeeper.getState());//CONNECTED

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return zooKeeper;
    }

   /** @Author lyy
     * @Description //TODO 關閉ZooKeeper連接
     * @Date 14:57 2021/9/29
     * @Param 
     * @return 
    **/
    public static void closeConnection(ZooKeeper zooKeeper) {
        try {
//            zooKeeper.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    
    /** @Author lyy
      * @Description //TODO 添加節點
      * @Date 13:36 2021/9/29
      * @Param 
      * @return 
     **/
    public void add(String key ,String value) {
        ZooKeeper zooKeeper = ZookeeperTest.getConntection();
        try {
            //參數類型
            //1.key
            //2.value
            //3.對應的ACL,當前節點的權限控制
            //4.設置當前節點類型
            zooKeeper.create(key, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            ZookeeperTest.closeConnection(zooKeeper);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

   /** @Author lyy
     * @Description //TODO 獲取節點信息
     * @Date 14:57 2021/9/29
     * @Param 
     * @return 
    **/
    public void get(String key) {
        ZooKeeper zooKeeper = ZookeeperTest.getConntection();
        Stat stat = new Stat();
        String data = null;
        try {
            byte[] bytes = zooKeeper.getData(key, null, stat);
            data = new String(bytes, "gbk");
            log.info("當前節點信息:{}",data);
            ZookeeperTest.closeConnection(zooKeeper);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /** @Author lyy
      * @Description //TODO 修改節點信息
      * @Date 14:57 2021/9/29
      * @Param 
      * @return 
     **/
    public void modify(String key,String newValue) {
        ZooKeeper zooKeeper = ZookeeperTest.getConntection();
        Stat stat = new Stat();
        //version樂觀鎖概念,此處需要獲取version信息,則需要先get拿到節點信息
        try {
            //獲取節點(修改需要version信息)
            zooKeeper.getData(key, null, stat);
            //再修改
            zooKeeper.setData(key, newValue.getBytes(), stat.getVersion());
            ZookeeperTest.closeConnection(zooKeeper);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

   /** @Author lyy
     * @Description //TODO 刪除節點
     * @Date 14:57 2021/9/29
     * @Param 
     * @return 
    **/
    public void delete(String key) {
        ZooKeeper zooKeeper = ZookeeperTest.getConntection();
        Stat stat = new Stat();
        try {
            //獲取節點(刪除需要version信息)
            zooKeeper.getData(key, null, stat);
            //刪除節點
            zooKeeper.delete(key, stat.getVersion());
            ZookeeperTest.closeConnection(zooKeeper);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

總結

到這里我們對Zookeeper 大概有個入門級的了解了,不過Zookeeper遠遠比我們這里講述的功能多,如何用Zookeeper實現集群管理、分布式鎖,隊列等等,小農會在后面的文章中進行講解,關注我,后續精彩內容第一時間推送。

我是牧小農,怕什么真理無窮,進一步有進一步的歡喜,大家加油!!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM