1、Zookeeper基本功能
(增 刪 改 查;注冊,監聽)
兩點:
1、放數據(少量)。
2、監聽節點。
注意:
Zookeeper中的數據不同於數據庫中的數據,沒有表,沒有記錄,沒有字段;
Zookeeper中的數據是key-value對,key可以有子key
value為二進制數據。
2、應用場景
2.1、服務器上下線動態感知
2.2、配置文件管理
3、Zookeeper本身就是一個HA集群
Zookeeper自身就是一個十分可靠的分布式系統。
這個分布式系統只有一個程序,進程:QuorumpeerMain,只不過這個進程在工作的時候有多種不同狀態;
3.1、zookeeper集群結構示意圖
3.2、白話談選舉過程-zookeeper的整體運行機制
leader和follower通信端口,集群內部工作端口2888
選舉端口3888
假如集群中共的節點按照 myid 1,2,3,4,5 的順序一次啟動;id為1的節點最先啟動,它啟動之后想集群中的2888端口發出消息,此時沒有leader在2888端口回應1號節點,1號節點就知道了,此時集群中是沒有leader的,然后1號節點發起選舉,不停的往3888端口發選舉消息,並且告訴大家投他自己,也就是1號;此時id為2的節點啟動,同樣的第一件事也是向2888廣播消息,沒有人回應,知道沒有leader,此時的1號節點不斷的在3888宣傳自己,2號節點收到3888端口的消息,發現集群中有個兄弟節點發起了,投票且投1號,2號節點查看自己的id后發現,我的id是2比1大啊,我投我自己,然后2號節點向3888端口不停的廣播投2號;此時1號節點在自己的3888端口收到消息發現,有人投2號且id比我的大,那我也都2號;此時1號節點和2號節點都不停的往3888發起選舉且投2號;就在這時候3號點啟動,同樣第一件事給集群2888廣播消息,沒有人回應,但是會收到3888端口的選舉信息,經查看發現id都比我小(1<3 2<3),我要投自己3;往3888廣播選舉3號的消息;然后1號節點和2號節點收到消息后,發現又來了一個大的節點,那我們都投3吧,此時超過了半數( 3 > 5 / 2)節點,3號節點將自己的轉台切換為leader,成功上位;此后4號節點和5號節點已啟動就發現已經有leader了,自動變為follower;
有沒有發現;節點發起選舉時只會選舉自己(自私),當發現有id大於自身的節點也參與選舉時,他會無私的支持最大者(無私)。
如果是運行的過程中leader掛了,在重新投票的過程中,投票信息會帶有節點的數據版本信息,只有最新數據的且id較大者,會被選為新的leader。
3.3、安裝Zookeeper集群
上傳安裝包,解壓
修改conf/zoo.cfg
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/root/zkdata # the port at which the clients will connect clientPort=2181 # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 server.1=hdp-01:2888:3888 server.2=hdp-02:2888:3888 server.3=hdp-03:2888:3888 |
配置文件修改完后,將安裝包拷貝給hdp-02 和 hdp-03
接着,到hdp-01上,新建數據目錄/root/zkdata,並在目錄中生成一個文件myid,內容為1
接着,到hdp-02上,新建數據目錄/root/zkdata,並在目錄中生成一個文件myid,內容為2
接着,到hdp-03上,新建數據目錄/root/zkdata,並在目錄中生成一個文件myid,內容為3
啟動zookeeper集群:
3.3.1、自動腳本
#!bin/bash for host in hdp-01 hdp-02 hdp-03 do echo "${host}:${1}ing" ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1" done sleep 2 for host in hdp-01 hdp-02 hdp-03 do ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh status" done
chmod +x zkmanager.sh
ssh $host會進入這台機器的主目錄(/root)下下執行文件中的命令啊,在啟動過程中會在該目錄下生成Zookeeper的日志。zookeeper.out,,可以查看日志信息。
使用
./zkmanager start
./zkmanager stop
4、zkCli客戶端
默認鏈接localhost
bin/zkCli.sh
連接指定主機
bin/zkCli.sh -server hpd-01 -p 2181
4.1、管理數據
4.1.1、ls-查看節點
ls /
ls /zookeeper
4.1.2、get-查看數據
get /zookeeper
4.1.3、create-創建節點
命令行只能存放字符串數據,沒有辦法存放二進制數據
"hellozk"之后的數據全部是該節點的元數據信息,目的是為了維護數據版本的一致性。
create /aa "hellozk"
4.1.4、set-修改節點的值
數據版本會增加1
set /aa hellospark
4.1.5、rmr-遞歸刪除數據節點
rmr /aa
4.2、監聽節點
注意:注冊的監聽器在正常收到一次所監聽的事件后,就失效。
4.2.1、get注冊監聽
get注冊監聽,只有當監聽節點的數據發生變化,才會出發,監聽節點添加新節點不會出發注冊的監聽器;
且一次監聽注冊只起一次作用,觸發之后,失效。除非再次注冊。
監聽/aa 節點的數值變化,一旦/aa節點的值發生了變化(假如值被hdp-03上的zkCli修改了),向zk注冊watch事件的客戶端(假如是hdp-01上的zkCli)就會收到zk的通知,則hdp-01上的zkCli就會收到zk發回的通知,當然zkCli收到通知后只是輸出到了控制台。
get /aa watch
會看到,狀態(表示和服務器鏈接狀態良好),以及事件的類型type(數據節點的值發生了變化),以及哪一個節點
4.2.2、ls-注冊子節點變化事件
ls是獲取子節點,監聽子節點事件用 ls /aa watch
hdp-01的zkCli注冊ls 監聽
ls /aa watch
hdp-02的zkCli對aa節點添加字節點,hdp-01會收到zk的事件通知。
create /aa/xx 132
5、zookeeper數據存儲機制
5.1、數據存儲形式
zookeeper中對用戶的數據采用kv形式存儲
只是zk有點特別:
key:是以路徑的形式表示的,那就以為着,各key之間有父子關系,比如
/ 是頂層key
用戶建的key只能在/ 下作為子節點,比如建一個key: /aa 這個key可以帶value數據
也可以建一個key: /bb
也可以建key: /aa/xx
zookeeper中,對每一個數據key,稱作一個znode
綜上所述,zk中的數據存儲形式如下:
5.2、znode類型
zookeeper中的znode有多種類型:
1、PERSISTENT 持久的:創建者就算跟集群斷開聯系,該類節點也會持久存在與zk集群中
2、EPHEMERAL 短暫的:創建者一旦跟集群斷開聯系,zk就會將這個節點刪除
3、SEQUENTIAL 帶序號的:這類節點,zk會自動拼接上一個序號,而且序號是遞增的
組合類型:
PERSISTENT :持久不帶序號
EPHEMERAL :短暫不帶序號
PERSISTENT 且 SEQUENTIAL :持久且帶序號
EPHEMERAL 且 SEQUENTIAL :短暫且帶序號
6、java客戶端
6.1、創建zk客戶端
timeOut指定會話超時時間,即客戶端關閉連接后,會話還可以保持多久;
watcher是一個接口,即為當客戶端收到zk事件通知時候要進行什么邏輯操作。
//構造一個了解Zookeeper的客戶端對象
ZooKeeper zk = new ZooKeeper(“hdp-01:2181,hdp-02:2181,hdp-03:2181”,timeOut,watcher)
6.2、增刪改查

public class ZookeeperClientDemo { ZooKeeper zk = null; @Before public void init() throws Exception{ // 構造一個連接zookeeper的客戶端對象 zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null); } @Test public void testCreate() throws Exception{ // 參數1:要創建的節點路徑 參數2:數據 參數3:訪問權限 參數4:節點類型 String create = zk.create("/eclipse", "hello eclipse".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(create); zk.close(); } @Test public void testUpdate() throws Exception { // 參數1:節點路徑 參數2:數據 參數3:所要修改的版本,-1代表任何版本 zk.setData("/eclipse", "我愛你".getBytes("UTF-8"), -1); zk.close(); } @Test public void testGet() throws Exception { // 參數1:節點路徑 參數2:是否要監聽 參數3:所要獲取的數據的版本,null表示最新版本 byte[] data = zk.getData("/eclipse", false, null); System.out.println(new String(data,"UTF-8")); zk.close(); } @Test public void testListChildren() throws Exception { // 參數1:節點路徑 參數2:是否要監聽 // 注意:返回的結果中只有子節點名字,不帶全路徑 List<String> children = zk.getChildren("/cc", false); for (String child : children) { System.out.println(child); } zk.close(); } @Test public void testRm() throws InterruptedException, KeeperException{ zk.delete("/eclipse", -1); zk.close(); } }
6.2.1、創建節點create
byte[] 數據 要求不能大於1M
ACL是訪問權限Access Control List:什么樣的人可以訪問這個數據節點,一般是內部一套系統使用,這里就選擇開放權限
String path = zk.create(String path,byte[] data,List<ACL> acl,CreateMode createMode);
節點類型
6.2.2、修改數據setData
version指明要修改哪個版本的數據,如果用戶不關心,哪個版本,可以設置成-1,表示修改全部版本。
返回該節點數據的元數據Stat
6.2.3、查找數據getData
watch:表示要不要監聽
Stat:用元數據來表示要回去哪個版本,最新版本用null表示
6.2.4、查找子節點getChildren
返回所有子節點的名字,不帶全路徑,只有子節點名字
6.2.5、刪除節點delete
-1表示刪除所有版本
6.3、注冊監聽
注意:注冊的監聽器在正常收到一次所監聽的事件后,就失效。
zk會另外起一個線程,去等待zk發來的通知,並作出我們設置的watcher邏輯(若沒有設置watcher邏輯,而是是指了boolean true,則直接輸出收到的通知)
Watcher是一個接口:客戶端收到監聽事件后的回調邏輯
6.3.1、get監聽節點數據變化
@Before public void init() throws Exception { // 構造一個連接zookeeper的客戶端對象 zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null); } @Test public void testGetWatch() throws Exception { byte[] data = zk.getData("/mygirls", true, null); // 監聽節點數據變化 List<String> children = zk.getChildren("/mygirls", true); //監聽節點的子節點變化事件 System.out.println(new String(data, "UTF-8")); Thread.sleep(Long.MAX_VALUE); }
注意:注冊的監聽器在正常收到一次所監聽的事件后,就失效。可以在new Zookeeper(創建客戶端)的時候,指定默認的回調邏輯同事在回調邏輯中繼續注冊監聽,以后直接在getData時寫true就可以了,這樣就可以一直監聽。
注意:new Zookeeper的時候,會出發連接成功事件(發生路徑null,type類型none)
可以用轉太判斷來避開初始事件
構造Zookeeper的時候,添加回調邏輯,后續getData是監聽節點數據是注冊監聽就只要設置true,那么就會調用對象構造時候的回調邏輯(默認邏輯)。

public class ZookeeperWatchDemo { ZooKeeper zk = null; @Before public void init() throws Exception { // 構造一個連接zookeeper的客戶端對象 zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) { System.out.println(event.getPath()); // 收到的事件所發生的節點路徑 System.out.println(event.getType()); // 收到的事件的類型 System.out.println("趕緊換照片,換浴室里面的洗浴套裝....."); // 收到事件后,我們的處理邏輯 try { zk.getData("/mygirls", true, null); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } }else if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged){ System.out.println("子節點變化了......"); } } }); } @Test public void testGetWatch() throws Exception { byte[] data = zk.getData("/mygirls", true, null); // 監聽節點數據變化 List<String> children = zk.getChildren("/mygirls", true); //監聽節點的子節點變化事件 System.out.println(new String(data, "UTF-8")); Thread.sleep(Long.MAX_VALUE); } }
6.3.2、監聽節點的子節點變化事件
6.4、Zookeeper客戶端工作線程
sendThread
eventThread
Zookeeper中的eventThread是守護線程
守護線程: A是B的守護線程,若B線程結束,不管A線程有沒有執行完畢,A線程都會退出。(主人已掛,仆人陪葬)
thread.setDaemon(true);
7、實例服務器上下線,動態感知
這是一個簡單的分布式系統:
業務功能:模擬業務,這里只是簡單的時間查詢;客戶端去請求服務器獲得時間信息,而且服務器動態上下線時候,客戶端會實時感知。
服務器:TimeQueryServer
// 構造zk客戶端連接// 注冊服務器信息// 啟動業務線程開始處理業務
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class TimeQueryServer { ZooKeeper zk = null; // 構造zk客戶端連接 public void connectZK() throws Exception{ zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null); } // 注冊服務器信息 public void registerServerInfo(String hostname,String port) throws Exception{ // 先判斷注冊節點的父節點是否存在,如果不存在,則創建 Stat stat = zk.exists("/servers", false); if(stat==null){ zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 注冊服務器數據到zk的約定注冊節點下【短暫+序號】 String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" 服務器向zk注冊信息成功,注冊的節點為:" + create); } public static void main(String[] args) throws Exception { TimeQueryServer timeQueryServer = new TimeQueryServer(); // 構造zk客戶端連接 timeQueryServer.connectZK(); // 注冊服務器信息 timeQueryServer.registerServerInfo(args[0], args[1]); // 啟動業務線程開始處理業務 new TimeQueryService(Integer.parseInt(args[1])).start(); } }
業務類:TimeQueryService
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Date; public class TimeQueryService extends Thread{ int port = 0; public TimeQueryService(int port){ this.port = port; } @Override public void run() { try {
// ServerSocket監聽port端口 ServerSocket ss = new ServerSocket(port); System.out.println("業務線程已綁定端口"+port+"准備接受消費端請求了....."); while(true){
// 拿到客戶端請求socket Socket sc = ss.accept(); InputStream inputStream = sc.getInputStream(); OutputStream outputStream = sc.getOutputStream(); outputStream.write(new Date().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }
客戶端(消費者:):Consumer
// 構造zk連接對象// 查詢在線服務器列表 // 處理業務(向一台服務器發送時間查詢請求)
import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; public class Consumer { // 定義一個list用於存放最新的在線服務器列表 private volatile ArrayList<String> onlineServers = new ArrayList<>(); // 構造zk連接對象 ZooKeeper zk = null; // 構造zk客戶端連接;注冊默認監聽后的回調邏輯 public void connectZK() throws Exception { zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() { @Override public void process(WatchedEvent event) {
// 鏈接狀態正常,且為節點的子節點變化事件 if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) { try { // 事件回調邏輯中,再次查詢zk上的在線服務器節點即可,查詢邏輯中又再次注冊了子節點變化事件監聽 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查詢在線服務器列表 public void getOnlineServers() throws Exception { // 查詢服務器列表,再次開啟監聽 List<String> children = zk.getChildren("/servers", true); ArrayList<String> servers = new ArrayList<>(); // 遍歷節點的子節點 for (String child : children) {
// 獲取節點數據,此時的業務沒有設置監聽數值變化的需求 byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查詢了一次zk,當前在線的服務器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑選一台當前在線的服務器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次請求挑選的服務器為:" + server); // 客戶端socket請求 Socket socket = new Socket(hostname, port); OutputStream out = socket.getOutputStream();// 網絡輸出流:向服務端發送請求。 InputStream in = socket.getInputStream();// 網絡輸入流:接受服務端發回的請求。 out.write("haha".getBytes()); out.flush(); byte[] buf = new byte[256]; int read = in.read(buf); System.out.println("服務器響應的時間為:" + new String(buf, 0, read)); out.close(); in.close(); socket.close(); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 構造zk連接對象 consumer.connectZK(); // 查詢在線服務器列表 consumer.getOnlineServers(); // 處理業務(向一台服務器發送時間查詢請求) consumer.sendRequest(); } }