Zookeeper安裝使用及JavaAPI使用


 

一、Zookeeper單擊模式安裝及使用

1、系統環境

2、導入JDK和Zookeeper包

1)、使用SecureCRT工具打開SFTP連接,直接拖拽,到當前用戶文件夾下,然后使用mv命令(mv 文件 目標文件夾)移動到自定義路徑下

2)、使用tar -zxvf 包名,解壓JDK和Zookeeper

 3、配置java環境變量,vi /etc/profile,編輯完成保存后,使用source /etc/profile使環境變量生效,java -version命令驗證

4、在zookeeper目錄下的conf目錄下,新建zoo.cfg文件並配置如下(zoo.cfg文件是Zookeeper默認配置文件,其配置案例在zoo_samples.cfg中)

 

tickTime:服務器與服務器之間或服務器與客戶端之間心跳間隔

dataDir:Zookeeper保存數據的目錄,默認情況下,Zookeeper將寫數據的日志文件也保存在這;如果目錄不存在啟動時候會自動創建

clientPort:客戶端連接Zookeeper服務器的端口,Zookeeper會監聽這個端口,接收客戶端的訪問請求

5、啟動Zookeeper,在bin文件夾下調用zkServer.sh start命令,start后面如果不指定配置文件路徑,默認使用con/zoo.cfg

查看服務狀態,./zkServer.sh status

6、客戶端連接服務端,bin/zkCli.sh -server 127.0.0.1:2181

7、增刪改查命令使用,進入客戶端后通過help可查看所有命令使用方法

1)、ls 查看znode節點

2)、create 創建znode節點,節點后面必須賦值,否則不會創建成功

3)、get 獲取節點值

4)、set 修改節點值

5)、創建/test子節點,注意獲取/test值時,cversion由0變為1

6)、delete 刪除節點

如果有子節點,父節點不能刪除,需要先刪除子節點再刪除父節點

7)、quit 退出客戶端操作界面

8、停掉Zookeeper服務

二、Zookeeper集群模式搭建(3台,最好奇數台,根據Leader選舉算法Paxos協議,半數原則)

zk1對應IP:192.168.7.128

zk2對應IP:192.168.7.216

zk3對應IP:192.168.7.217

1、每台服務器上搭建基礎環境,參照單擊模式搭建

2、在Zookeeper保存數據的文件路徑下創建myid文件,並寫入server對應值(如果保存數據文件路徑不存在自己創建),各台機器執行如下命令

zk1中命令:echo "1">myid;zk2中命令:echo "2">myid;zk3中命令:echo "3">myid

最終如下

3、配置各台及其conf/zoo.cfg文件如下

initLimit:Zookeeper集群中,連接到Leader的Follower服務初始化連接時最長限制時間多少個心跳間隔,如上配置5,表示5*2000=10秒,10秒內沒有連接上表示Follower連接失敗

syncLimit:Leader和Follower之間發送消息,請求和應答最長心跳間隔數,如上2*2000=4秒

 server.x(x表示上面配置的1,2,3),x就是各台服務器中寫入myid文件的數

IP:2888:3888      IP就是各服務器IP,2888是這個服務器與集群中的Leader進行信息交換的端口,3888是表示萬一集群中的Leader服務器掛掉,需要一個端口來重新進行選舉,選出一個新Leader,也就是這個3888端口是用來執行選舉Leader時候用

4、啟動各台Zookeeper服務 bin/zkServer.sh start ,都啟動成功后,bin/zkServer.sh status查看服務器狀態

起初查看狀態報錯如下,后來想到是防火牆的問題,就執行 systemctl stop firewalld 關閉即可

查看防火牆並關閉

各台服務器 bin/zkServer.sh restart 命令重啟Zookeeper服務,查看狀態

zk1服務器如下,注意Mode:leader,即此服務器為集群Leader(Leader選舉有算法,並非第一個就是Leader)

zk2服務器如下,注意Mode:

 

zk3服務器和zk2服務器一樣,Mode都是follower

5、測試集群,隨便一個服務器上增刪改節點,另一個服務器上也會一致性變化(可自行驗證)

6、如果Leader服務器掛掉,會選出另一台Leader服務器,3台服務器最多可掛掉一台,超過一台掛掉Zookeeper就不可用了

 三、ZooKeeper JavaAPI使用,直接上代碼,看注釋

package com.hjp.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ConnectionWatcher implements Watcher {

    private static final int SESSION_TIMEOUT = 5000;

    protected ZooKeeper zk;
    private CountDownLatch connectedSignal = new CountDownLatch(1);

    public void connect(String hosts) throws IOException, InterruptedException {
        //第一個參數是Zookeeper服務主機地址,可指定端口號,默認為2181;第二個參數以毫秒為單位的會話超時參數;
        // 第三個參數是一個Watcher對象的實例。Watcher對象接收來自於Zookeeper的回調,以獲得各種事件通知,
        // 本例中CreateGroup是一個Watcher對象,因此參數為this
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        //當一個ZooKeeper的實例被創建時,會啟動一個線程連接到Zookeeper服務。
        // 由於對構造函數的調用是立即返回的,因此在使用新建的Zookeeper對象之前一定要等待其與Zookeeper服務之間的連接建立成功。
        // 使用CountDownLatch使當前線程等待,直到Zookeeper對象准備就緒
        connectedSignal.await();
    }

    public void process(WatchedEvent watchedEvent) {
        //客戶端與ZK建立連接后,Watcher的process方法會被調用,參數是表示該連接的事件,
        // 連接成功后調用CountDownLatch的countDown方法,計數器減為0,釋放線程鎖,zk對象可用
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }
    }

    public void close() throws InterruptedException {
        zk.close();
    }
}
連接ZK
package com.hjp.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZKOperateAPI extends ConnectionWatcher {

    //創建組
    public void create(String groupName, String data) throws KeeperException, InterruptedException {
        String path = "/" + groupName;
        //創建znode節點,第一個參數為路徑;第二個參數為znode內容,字節數組;
        // 第三個參數訪問控制列表(簡稱ACL,此處使用完全開放的ACL,允許任何客戶端對znode進行讀寫);
        // 第四個為創建znode類型,此處是持久的(兩種類型,短暫的和持久的,短暫類型會在客戶端與zk服務斷開連接后,被zk服務刪掉,而持久的不會)
        String createPath = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created " + createPath);
    }

    //加入組
    public void join(String groupName, String memberName, String data) throws KeeperException, InterruptedException {
        String path = "/" + groupName + "/" + memberName;
        //創建短暫znode,會在客戶端斷開連接后刪掉
        String createPath = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("Create " + createPath);
    }

    //列出組成員
    public void list(String groupName) {
        String path = "/" + groupName;
        try {
            //第一個參數為組名,即znode路徑;第二個參數是否設置觀察標識,如果為true,那么一旦znode狀態改變,當前對象的Watcher會被觸發
            List<String> children = zk.getChildren(path, false);
            if (children.isEmpty()) {
                System.out.printf("No members in group %s\n", groupName);
                System.exit(1);
            }
            for (String child : children) {
                System.out.println(child);
            }
        } catch (KeeperException.NoNodeException ex) {
            System.out.printf("Group %s does not exist\n", groupName);
            System.exit(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    //刪除組
    public void delete(String groupName) throws KeeperException, InterruptedException {
        String path = "/" + groupName;
        try {
            List<String> children = zk.getChildren(path, false);
            for (String child : children) {
                //刪除方法第一個參數指定路徑,第二個參數是版本號;這是一種樂觀鎖機制,如果指定的版本號和對應znode版本號一致才可刪除;
                // 如果設置為-1,不校驗可直接刪除
                zk.delete(path + "/" + child, -1);
            }
            zk.delete(path, -1);
        } catch (KeeperException.NoNodeException ex) {
            System.out.printf("Group %s does not exist\n", groupName);
            System.exit(1);
        }
    }

}
增刪改查操作,繼承連接類
import com.hjp.zookeeper.ZKOperateAPI;
import org.junit.Test;

public class TestZK {

    @Test
    public void create() throws Exception {
        ZKOperateAPI operateAPI = new ZKOperateAPI();
        //端口號不寫,默認是2181
//        operateAPI.connect("192.168.7.128:2181");
        operateAPI.connect("192.168.7.128");
        operateAPI.create("testAPI", "aaa");
        operateAPI.close();
    }

    @Test
    public void join() throws Exception {
        ZKOperateAPI operateAPI = new ZKOperateAPI();
        operateAPI.connect("192.168.7.128");
        ;
        operateAPI.join("testAPI", "testAPIChild", "aaaChild");
        //模擬正在某種操作,休眠20秒后,斷開zk服務連接,可查看zk服務中短暫znode被刪除
        Thread.sleep(20000);
        operateAPI.close();
    }

    @Test
    public void list() throws Exception {
        ZKOperateAPI operateAPI = new ZKOperateAPI();
        operateAPI.connect("192.168.7.216");
        operateAPI.list("testAPI");
        operateAPI.close();
    }

    @Test
    public void delete() throws Exception{
        ZKOperateAPI operateAPI = new ZKOperateAPI();
        operateAPI.connect("192.168.7.217");
        operateAPI.delete("testAPI");
        operateAPI.close();
    }

}
測試執行代碼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM