Zookeeper—學習筆記(一)


1、Zookeeper基本功能

(增 刪 改 查;注冊,監聽)

兩點:

  1、放數據(少量)。

  2、監聽節點。

 注意:

  Zookeeper中的數據不同於數據庫中的數據,沒有表,沒有記錄,沒有字段;

  Zookeeper中的數據是key-value對,key可以有子key

  value為二進制數據。

2、應用場景

2.1、服務器上下線動態感知

 

2.2、配置文件管理

 

 

 3、Zookeeper本身就是一個HA集群

Zookeeper自身就是一個十分可靠的分布式系統。

這個分布式系統只有一個程序,進程:QuorumpeerMain,只不過這個進程在工作的時候有多種不同狀態

3.1、zookeeper集群結構示意圖

3.2、白話談選舉過程-zookeeper的整體運行機制

  leader和follower通信端口,集群內部工作端口2888

   選舉端口3888

  假如集群中共的節點按照 myid 1,2,3,4,5 的順序一次啟動;id為1的節點最先啟動,它啟動之后想集群中的2888端口發出消息,此時沒有leader在2888端口回應1號節點,1號節點就知道了,此時集群中是沒有leader的,然后1號節點發起選舉,不停的往3888端口發選舉消息,並且告訴大家投他自己,也就是1號;此時id為2的節點啟動,同樣的第一件事也是向2888廣播消息,沒有人回應,知道沒有leader,此時的1號節點不斷的在3888宣傳自己,2號節點收到3888端口的消息,發現集群中有個兄弟節點發起了,投票且投1號,2號節點查看自己的id后發現,我的id是2比1大啊,我投我自己,然后2號節點向3888端口不停的廣播投2號;此時1號節點在自己的3888端口收到消息發現,有人投2號且id比我的大,那我也都2號;此時1號節點和2號節點都不停的往3888發起選舉且投2號;就在這時候3號點啟動,同樣第一件事給集群2888廣播消息,沒有人回應,但是會收到3888端口的選舉信息,經查看發現id都比我小(1<3 2<3),我要投自己3;往3888廣播選舉3號的消息;然后1號節點和2號節點收到消息后,發現又來了一個大的節點,那我們都投3吧,此時超過了半數( 3 > 5 / 2)節點,3號節點將自己的轉台切換為leader,成功上位;此后4號節點和5號節點已啟動就發現已經有leader了,自動變為follower;

  有沒有發現;節點發起選舉時只會選舉自己(自私),當發現有id大於自身的節點也參與選舉時,他會無私的支持最大者(無私)。

  如果是運行的過程中leader掛了,在重新投票的過程中,投票信息會帶有節點的數據版本信息,只有最新數據的且id較大者,會被選為新的leader。  

 3.3、安裝Zookeeper集群

上傳安裝包,解壓

修改conf/zoo.cfg

# The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

# do not use /tmp for storage, /tmp here is just

# example sakes.

dataDir=/root/zkdata

# the port at which the clients will connect

clientPort=2181

# Set to "0" to disable auto purge feature

#autopurge.purgeInterval=1

server.1=hdp-01:2888:3888

server.2=hdp-02:2888:3888

server.3=hdp-03:2888:3888

 

配置文件修改完后,將安裝包拷貝給hdp-02 和 hdp-03

 

接着,到hdp-01上,新建數據目錄/root/zkdata,並在目錄中生成一個文件myid,內容為1

接着,到hdp-02上,新建數據目錄/root/zkdata,並在目錄中生成一個文件myid,內容為2

接着,到hdp-03上,新建數據目錄/root/zkdata,並在目錄中生成一個文件myid,內容為3

 

 

啟動zookeeper集群:

3.3.1、自動腳本

#!bin/bash
for host in hdp-01 hdp-02 hdp-03
do
echo "${host}:${1}ing"
ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done

sleep 2

for host in hdp-01 hdp-02 hdp-03
do
ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"
done

 

chmod +x zkmanager.sh
ssh $host會進入這台機器的主目錄(/root)下下執行文件中的命令啊,在啟動過程中會在該目錄下生成Zookeeper的日志。zookeeper.out,,可以查看日志信息。

使用

./zkmanager start
./zkmanager stop

 

4、zkCli客戶端

默認鏈接localhost

bin/zkCli.sh 

連接指定主機

bin/zkCli.sh -server hpd-01 -p 2181

 

4.1、管理數據

4.1.1、ls-查看節點

ls /
ls
/zookeeper

 

4.1.2、get-查看數據

get /zookeeper

 

4.1.3、create-創建節點

命令行只能存放字符串數據,沒有辦法存放二進制數據

"hellozk"之后的數據全部是該節點的元數據信息,目的是為了維護數據版本的一致性。

create /aa "hellozk"

4.1.4、set-修改節點的值

數據版本會增加1

set /aa hellospark

 

4.1.5、rmr-遞歸刪除數據節點

rmr /aa

4.2、監聽節點

注意:注冊的監聽器在正常收到一次所監聽的事件后,就失效。

4.2.1、get注冊監聽

get注冊監聽,只有當監聽節點的數據發生變化,才會出發,監聽節點添加新節點不會出發注冊的監聽器;

且一次監聽注冊只起一次作用,觸發之后,失效。除非再次注冊。

監聽/aa 節點的數值變化,一旦/aa節點的值發生了變化(假如值被hdp-03上的zkCli修改了),向zk注冊watch事件的客戶端(假如是hdp-01上的zkCli)就會收到zk的通知,則hdp-01上的zkCli就會收到zk發回的通知,當然zkCli收到通知后只是輸出到了控制台。

get /aa watch

 

 

會看到,狀態(表示和服務器鏈接狀態良好),以及事件的類型type(數據節點的值發生了變化),以及哪一個節點

4.2.2、ls-注冊子節點變化事件

ls是獲取子節點,監聽子節點事件用 ls /aa watch

hdp-01的zkCli注冊ls 監聽

ls /aa watch

 hdp-02的zkCli對aa節點添加字節點,hdp-01會收到zk的事件通知。

create /aa/xx 132

 

5、zookeeper數據存儲機制

5.1、數據存儲形式

zookeeper中對用戶的數據采用kv形式存儲

只是zk有點特別:

key:是以路徑的形式表示的,那就以為着,各key之間有父子關系,比如

/ 是頂層key

用戶建的key只能在/ 下作為子節點,比如建一個key: /aa  這個key可以帶value數據

 也可以建一個key:   /bb

也可以建key: /aa/xx

zookeeper中,對每一個數據key,稱作一個znode

 

綜上所述,zk中的數據存儲形式如下:

5.2、znode類型

zookeeper中的znode有多種類型:

1、PERSISTENT  持久的:創建者就算跟集群斷開聯系,該類節點也會持久存在與zk集群中

2、EPHEMERAL  短暫的:創建者一旦跟集群斷開聯系,zk就會將這個節點刪除

3、SEQUENTIAL  帶序號的:這類節點,zk會自動拼接上一個序號,而且序號是遞增的 

 

組合類型:

PERSISTENT  :持久不帶序號

EPHEMERAL  :短暫不帶序號

PERSISTENT  且 SEQUENTIAL   :持久且帶序號

EPHEMERAL  且 SEQUENTIAL  :短暫且帶序號

 

6、java客戶端

6.1、創建zk客戶端

timeOut指定會話超時時間,即客戶端關閉連接后,會話還可以保持多久;
watcher是一個接口,即為當客戶端收到zk事件通知時候要進行什么邏輯操作。
//構造一個了解Zookeeper的客戶端對象
ZooKeeper zk = new
ZooKeeper(“hdp-01:2181,hdp-02:2181,hdp-03:2181”,timeOut,watcher)

6.2、增刪改查

public class ZookeeperClientDemo {
    ZooKeeper zk = null;
    @Before
    public void init()  throws Exception{
        // 構造一個連接zookeeper的客戶端對象
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
    }
    
    
    @Test
    public void testCreate() throws Exception{

        // 參數1:要創建的節點路徑  參數2:數據  參數3:訪問權限  參數4:節點類型
        String create = zk.create("/eclipse", "hello eclipse".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        
        System.out.println(create);
        
        zk.close();
        
    }
    
    
    @Test
    public void testUpdate() throws Exception {
        
        // 參數1:節點路徑   參數2:數據    參數3:所要修改的版本,-1代表任何版本
        zk.setData("/eclipse", "我愛你".getBytes("UTF-8"), -1);
        
        zk.close();
        
    }
    
    
    @Test    
    public void testGet() throws Exception {
        // 參數1:節點路徑    參數2:是否要監聽    參數3:所要獲取的數據的版本,null表示最新版本
        byte[] data = zk.getData("/eclipse", false, null);
        System.out.println(new String(data,"UTF-8"));
        
        zk.close();
    }
    
    
    
    @Test    
    public void testListChildren() throws Exception {
        // 參數1:節點路徑    參數2:是否要監聽   
        // 注意:返回的結果中只有子節點名字,不帶全路徑
        List<String> children = zk.getChildren("/cc", false);
        
        for (String child : children) {
            System.out.println(child);
        }
        
        zk.close();
    }
    
    
    @Test
    public void testRm() throws InterruptedException, KeeperException{
        
        zk.delete("/eclipse", -1);
        
        zk.close();
    }
    
    
    

}
View Code

 

6.2.1、創建節點create

byte[] 數據 要求不能大於1M

ACL是訪問權限Access Control List:什么樣的人可以訪問這個數據節點,一般是內部一套系統使用,這里就選擇開放權限

String path = zk.create(String path,byte[] data,List<ACL> acl,CreateMode createMode);

 

節點類型

6.2.2、修改數據setData

version指明要修改哪個版本的數據,如果用戶不關心,哪個版本,可以設置成-1,表示修改全部版本。

返回該節點數據的元數據Stat

 

6.2.3、查找數據getData

watch:表示要不要監聽

Stat:用元數據來表示要回去哪個版本,最新版本用null表示

6.2.4、查找子節點getChildren

返回所有子節點的名字,不帶全路徑,只有子節點名字

 

6.2.5、刪除節點delete

-1表示刪除所有版本

6.3、注冊監聽

注意:注冊的監聽器在正常收到一次所監聽的事件后,就失效。

zk會另外起一個線程,去等待zk發來的通知,並作出我們設置的watcher邏輯(若沒有設置watcher邏輯,而是是指了boolean true,則直接輸出收到的通知)

 

Watcher是一個接口:客戶端收到監聽事件后的回調邏輯

 

 6.3.1、get監聽節點數據變化

      @Before
    public void init() throws Exception {
        // 構造一個連接zookeeper的客戶端對象
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
    }

      @Test
    public void testGetWatch() throws Exception {

        byte[] data = zk.getData("/mygirls", true, null); // 監聽節點數據變化
        
        List<String> children = zk.getChildren("/mygirls", true); //監聽節點的子節點變化事件

        System.out.println(new String(data, "UTF-8"));

        Thread.sleep(Long.MAX_VALUE);

    }

 

 注意:注冊的監聽器在正常收到一次所監聽的事件后,就失效。可以在new Zookeeper(創建客戶端)的時候,指定默認的回調邏輯同事在回調邏輯中繼續注冊監聽,以后直接在getData時寫true就可以了,這樣就可以一直監聽。

注意:new Zookeeper的時候,會出發連接成功事件(發生路徑null,type類型none

可以用轉太判斷來避開初始事件

 

構造Zookeeper的時候,添加回調邏輯,后續getData是監聽節點數據是注冊監聽就只要設置true,那么就會調用對象構造時候的回調邏輯(默認邏輯)。

public class ZookeeperWatchDemo {

    ZooKeeper zk = null;

    @Before
    public void init() throws Exception {
        // 構造一個連接zookeeper的客戶端對象
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {

            @Override
            public void process(WatchedEvent event) {

                if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) {
                    System.out.println(event.getPath()); // 收到的事件所發生的節點路徑
                    System.out.println(event.getType()); // 收到的事件的類型
                    System.out.println("趕緊換照片,換浴室里面的洗浴套裝....."); // 收到事件后,我們的處理邏輯

                    try {
                        zk.getData("/mygirls", true, null);

                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }else if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged){
                    
                    System.out.println("子節點變化了......");
                }

            }
        });
    }

    @Test
    public void testGetWatch() throws Exception {

        byte[] data = zk.getData("/mygirls", true, null); // 監聽節點數據變化
        
        List<String> children = zk.getChildren("/mygirls", true); //監聽節點的子節點變化事件

        System.out.println(new String(data, "UTF-8"));

        Thread.sleep(Long.MAX_VALUE);

    }

}
View Code

  6.3.2、監聽節點的子節點變化事件

 

6.4、Zookeeper客戶端工作線程

sendThread

eventThread

Zookeeper中的eventThread是守護線程

 

 守護線程:  A是B的守護線程,若B線程結束,不管A線程有沒有執行完畢,A線程都會退出。(主人已掛,仆人陪葬)

thread.setDaemon(true);

 7、實例服務器上下線,動態感知

這是一個簡單的分布式系統:

業務功能:模擬業務,這里只是簡單的時間查詢;客戶端去請求服務器獲得時間信息,而且服務器動態上下線時候,客戶端會實時感知。

服務器:TimeQueryServer

// 構造zk客戶端連接// 注冊服務器信息// 啟動業務線程開始處理業務

 

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class TimeQueryServer {
    ZooKeeper zk = null;
    
    // 構造zk客戶端連接
    public void connectZK() throws Exception{
        
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);          
    }
    
    // 注冊服務器信息
    public void registerServerInfo(String hostname,String port) throws Exception{
        
        // 先判斷注冊節點的父節點是否存在,如果不存在,則創建
        Stat stat = zk.exists("/servers", false);
        if(stat==null){
            zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        
        // 注冊服務器數據到zk的約定注冊節點下【短暫+序號】
        String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        System.out.println(hostname+" 服務器向zk注冊信息成功,注冊的節點為:" + create);   
    }
    
    public static void main(String[] args) throws Exception {
        
        TimeQueryServer timeQueryServer = new TimeQueryServer();
        
        // 構造zk客戶端連接
        timeQueryServer.connectZK();
        
        // 注冊服務器信息
        timeQueryServer.registerServerInfo(args[0], args[1]);
        
        // 啟動業務線程開始處理業務
        new TimeQueryService(Integer.parseInt(args[1])).start();  
    }
}

 

業務類:TimeQueryService

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;

public class TimeQueryService extends Thread{
    
    int port = 0;
    public TimeQueryService(int port){
        
        this.port = port;
    }        
    @Override
    public void run() {
        
        try {
       // ServerSocket監聽port端口 ServerSocket ss
= new ServerSocket(port); System.out.println("業務線程已綁定端口"+port+"准備接受消費端請求了....."); while(true){
          // 拿到客戶端請求socket Socket sc
= ss.accept(); InputStream inputStream = sc.getInputStream(); OutputStream outputStream = sc.getOutputStream(); outputStream.write(new Date().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }

 

客戶端(消費者:):Consumer

// 構造zk連接對象// 查詢在線服務器列表
// 處理業務(向一台服務器發送時間查詢請求)
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;

public class Consumer {

    // 定義一個list用於存放最新的在線服務器列表
    private volatile ArrayList<String> onlineServers = new ArrayList<>();

    // 構造zk連接對象
    ZooKeeper zk = null;

    // 構造zk客戶端連接;注冊默認監聽后的回調邏輯
    public void connectZK() throws Exception {
      
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
          // 鏈接狀態正常,且為節點的子節點變化事件
if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) { try { // 事件回調邏輯中,再次查詢zk上的在線服務器節點即可,查詢邏輯中又再次注冊了子節點變化事件監聽 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查詢在線服務器列表 public void getOnlineServers() throws Exception {      // 查詢服務器列表,再次開啟監聽 List<String> children = zk.getChildren("/servers", true); ArrayList<String> servers = new ArrayList<>();      // 遍歷節點的子節點 for (String child : children) {
       // 獲取節點數據,此時的業務沒有設置監聽數值變化的需求
byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查詢了一次zk,當前在線的服務器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑選一台當前在線的服務器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次請求挑選的服務器為:" + server);           // 客戶端socket請求 Socket socket = new Socket(hostname, port); OutputStream out = socket.getOutputStream();// 網絡輸出流:向服務端發送請求。 InputStream in = socket.getInputStream();// 網絡輸入流:接受服務端發回的請求。 out.write("haha".getBytes()); out.flush(); byte[] buf = new byte[256]; int read = in.read(buf); System.out.println("服務器響應的時間為:" + new String(buf, 0, read)); out.close(); in.close(); socket.close(); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 構造zk連接對象 consumer.connectZK(); // 查詢在線服務器列表 consumer.getOnlineServers(); // 處理業務(向一台服務器發送時間查詢請求) consumer.sendRequest(); } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM