一、zookeeper原理解析
1、進群角色描述
2、Paxos 算法概述( ZAB 協議) 分布式一致性算法
3、Zookeeper 的選主(恢復模式)
以一個簡單的例子來說明整個選舉的過程.
假設有五台服務器組成的 zookeeper 集群,它們的 id 從 1-5,同時它們都是最新啟動的,也就是 沒有歷史數據,在存放數據量這一點上,都是一樣的.假設這些服務器依序啟動,來看看會發生 什么.
(1) 服務器 1 啟動,此時只有它一台服務器啟動了,它發出去的報沒有任何響應,所以它的選舉 狀態一直是 LOOKING 狀態
(2)服務器 2 啟動,它與最開始啟動的服務器 1 進行通信,互相交換自己的選舉結果,由於兩者 都沒有歷史數據,所以 id 值較大的服務器 2 勝出,但是由於沒有達到超過半數以上的服務器都 同意選舉它(這個例子中的半數以上是 3),所以服務器 1,2 還是繼續保持 LOOKING 狀態.
(3) 服務器 3 啟動,根據前面的理論分析,服務器 3 成為服務器 1,2,3 中的老大,而與上面不同的 是,此時有三台服務器選舉了它,所以它成為了這次選舉的 leader.
(4) 服務器 4 啟動,根據前面的分析,理論上服務器 4 應該是服務器 1,2,3,4 中最大的,但是由於 前面已經有半數以上的服務器選舉了服務器 3,所以它只能接收當小弟的命了.
(5) 服務器 5 啟動,同 4 一樣,當小弟. (如果干掉ID3,怎么重新選舉 id最大的那台也就是5id)
總結: zookeeper server 的三種工作狀態
LOOKING:當前 Server 不知道 leader 是誰,正在搜尋,正在選舉
LEADING:當前 Server 即為選舉出來的 leader,負責協調事務
FOLLOWING: leader 已經選舉出來,當前 Server 與之同步,服從 leader 的命令
4、非全新集群的選舉機制(數據恢復)
那么,初始化的時候,是按照上述的說明進行選舉的,但是當 zookeeper 運行了一段時間之 后,有機器 down 掉,重新選舉時,選舉過程就相對復雜了。
需要加入數據 version、 server id 和邏輯時鍾。
數據 version:數據新的 version 就大,數據每次更新都會更新 version。
Leader id:就是我們配置的 myid 中的值,每個機器一個。
邏輯時鍾:這個值從 0 開始遞增,每次選舉對應一個值,也就是說: 如果在同一次選舉中,那么 這個值應該是一致的 ; 邏輯時鍾值越大,說明這一次選舉 leader 的進程更新.
選舉的標准就變成:
(1)邏輯時鍾小的選舉結果被忽略,重新投票
(2)統一邏輯時鍾后,數據 id 大的勝出
(3)數據 id 相同的情況下, leader id 大的勝出
根據這個規則選出 leader。
二、zookeeper應用案例
1、服務器上下線動態感知
需求:某分布式系統中,主節點可以有多台,可以動態上下線。 任意一台客戶端都能實時感知 到主節點服務器的上下線
設計思路:
(1) 設計服務器端存入服務器上線,下線的信息,比如都寫入到 servers 節點下
(2)設計客戶端監聽該 servers 節點,獲取該服務器集群的在線服務器列表
(3)服務器一上線,就往 zookeeper 文件系統中的一個統一的節點比如 servers 下寫入一個臨 時節 點,記錄下服務器的信息(思考,該節點最好采用什么類型的節點?)
(4) 服務器一下線,則刪除 servers 節點下的該服務器的信息,則客戶端因為監聽了該節點的數據變化,所以將第一時間得知服務器的在線狀態
實現:
服務器端
package com.ghgj.zookeeper.mydemo; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * 用來模擬服務器的動態上線下線 * 總體思路就是服務器上線就上 zookeeper 集群創建一個臨時節點,然后監聽了該數據節 點的個數變化的客戶端都收到通知 * 下線,則該臨時節點自動刪除,監聽了該數據節點的個數變化的客戶端也都收到通知 */ public class DistributeServer { private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181"; private static final int sessionTimeout = 4000; private static final String PARENT_NODE = "/server"; static ZooKeeper zk = null; public static void main(String[] args) throws Exception { DistributeServer distributeServer = new DistributeServer(); distributeServer.getZookeeperConnect(); distributeServer.registeServer("hadoop03"); Thread.sleep(Long.MAX_VALUE); } /** * 拿到 zookeeper 進群的鏈接 */ public void getZookeeperConnect() throws Exception { zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { } }); } /** * 服務器上線就注冊,掉線就自動刪除,所以創建的是臨時順序節點 */ public void registeServer(String hostname) throws Exception{ Stat exists = zk.exists(PARENT_NODE, false); if(exists == null){ zk.create(PARENT_NODE,"server_parent_node".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zk.create(PARENT_NODE+"/"+hostname, hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" is online, start working......"); } }
客戶端
package com.ghgj.zookeeper.mydemo; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; /** * 用來模擬用戶端的操作:連上 zookeeper 進群,實時獲取服務器動態上下線的節點信息 * 總體思路就是每次該 server 節點下有增加或者減少節點數,我就打印出來該 server 節點 下的所有節點 */ public class DistributeClient { private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181"; private static final int sessionTimeout = 4000; private static final String PARENT_NODE = "/server"; static ZooKeeper zk = null; public static void main(String[] args) throws Exception { DistributeClient dc = new DistributeClient(); dc.getZookeeperConnect(); Thread.sleep(Long.MAX_VALUE); } /** * 拿到 zookeeper 進群的鏈接 */ public void getZookeeperConnect() throws Exception { zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { try { // 獲取父節點 server 節點下所有子節點,即是所有正上線服務的服 務器節點 List<String> children = zk.getChildren(PARENT_NODE, true); List<String> servers = new ArrayList<String>(); for(String child: children){ // 取出每個節點的數據,放入到 list 里 String server = new String(zk.getData(PARENT_NODE+"/"+child, false, null), "UTF-8"); servers.add(server); } // 打印 list 里面的元素 System.out.println(servers); } catch (Exception e) { e.printStackTrace(); } } }); System.out.println("Client is online, start Working......"); } }
2、分布式共享鎖
需求:在我們自己的分布式業務系統中,可能會存在某種資源,需要被整個系統的各台服務器共享 訪問,但是只允許一台服務器同時訪問
設計思路:
(1) 設計多個客戶端同時訪問同一個數據
(2)為了同一時間只能允許一個客戶端上去訪問,所以各個客戶端去 zookeeper 集群的一個 znode 節點去注冊一個臨時節點,定下規則,每次都是編號最小的客戶端才能去訪問
(3)多個客戶端同時監聽該節點,每次當有子節點被刪除時,就都收到通知,然后判斷自己 的編號是不是最小的,最小的就去執行訪問,不是最小的就繼續監聽。
代碼實現:
package com.ghgj.zookeeper.mydemo; import java.util.Collections; import java.util.List; import java.util.Random; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * 需求:多個客戶端,需要同時訪問同一個資源,但同時只允許一個客戶端進行訪問。 * 設計思路:多個客戶端都去父 znode 下寫入一個子 znode,能寫入成功的去執行訪問, 寫入不成功的等待 */ public class MyDistributeLock { private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181"; private static final int sessionTimeout = 4000; private static final String PARENT_NODE = "/parent_locks"; private static final String SUB_NODE = "/sub_client"; static ZooKeeper zk = null; private static String currentPath = ""; public static void main(String[] args) throws Exception { MyDistributeLock mdc = new MyDistributeLock(); // 1、拿到 zookeeper 鏈接 mdc.getZookeeperConnect(); // 2、查看父節點是否存在,不存在則創建 Stat exists = zk.exists(PARENT_NODE, false); if(exists == null){ zk.create(PARENT_NODE, PARENT_NODE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 3、監聽父節點 zk.getChildren(PARENT_NODE, true); // 4、往父節點下注冊節點,注冊臨時節點,好處就是,當宕機或者斷開鏈接時該 節點自動刪除 currentPath = zk.create(PARENT_NODE+SUB_NODE, SUB_NODE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 5、關閉 zk 鏈接 Thread.sleep(Long.MAX_VALUE); zk.close(); } /** * 拿到 zookeeper 集群的鏈接 */ public void getZookeeperConnect() throws Exception { zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 匹配看是不是子節點變化,並且監聽的路徑也要對 if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(PARENT_NODE)){ try { // 獲取父節點的所有子節點, 並繼續監聽 List<String> childrenNodes = zk.getChildren(PARENT_NODE, true); // 匹配當前創建的 znode 是不是最小的 znode Collections.sort(childrenNodes); if((PARENT_NODE+"/"+childrenNodes.get(0)).equals(currentPath)){ // 處理業務 handleBusiness(currentPath); } } catch (Exception e) { e.printStackTrace(); } } } }); } public void handleBusiness(String create) throws Exception{ System.out.println(create+" is working......"); Thread.sleep(new Random().nextInt(4000)); zk.delete(currentPath, -1); System.out.println(create+" is done ......"); currentPath = zk.create(PARENT_NODE+SUB_NODE, SUB_NODE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } }
補充:監聽機制案例
package com.ghgj.zkapi; import java.io.IOException; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; public class ZKAPIDEMOWatcher { // 獲取zookeeper連接時所需要的服務器連接信息,格式為主機名:端口號 private static final String ConnectString = "hadoop02:2181"; // 請求了解的會話超時時長 private static final int SessionTimeout = 5000; private static ZooKeeper zk = null; static Watcher w = null; static Watcher watcher = null; public static void main(String[] args) throws Exception { watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + "\t-----" + event.getType()); List<String> children; try { if (event.getPath().equals("/spark") && event.getType() == EventType.NodeChildrenChanged) { // zk.setData("/spark", "spark-sql".getBytes(), -1); System.out.println("數據更改成功 ~~~~~~~~~~~~~~~~~~"); children = zk.getChildren("/spark", watcher); } if (event.getPath().equals("/spark") && event.getType() == EventType.NodeDataChanged) { // zk.setData("/spark", "spark-sql".getBytes(), -1); System.out.println("數據更改成功 ¥##########"); zk.getData("/spark", watcher, null); } if (event.getPath().equals("/mx") && event.getType() == EventType.NodeChildrenChanged) { // zk.setData("/mx", "spark-sql".getBytes(), -1); System.out.println("數據更改成功 ---------"); children = zk.getChildren("/mx", watcher); } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }; zk = new ZooKeeper(ConnectString, SessionTimeout, watcher); zk.getData("/spark", true, null); zk.getChildren("/spark", true); zk.getChildren("/mx", true); zk.exists("/spark", true); 自定義循環自定義 w = new Watcher() { @Override public void process(WatchedEvent event) { try { zk.getData("/hive", w, null); System.out.println("hive shuju bianhua "); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }; zk.getData("/hive", w, null); // zk.setData(path, data, version); // 表示給znode /ghgj 的數據變化事件加了監聽 // 第二個參數使用true還是false的意義就是是否使用拿zookeeper鏈接時指定的監聽器 // zk.getData("/ghgj", true, null); // zk.setData("/ghgj", "hadoophdfs2".getBytes(), -1); /* * zk.getData("/sqoop", new Watcher(){ * * @Override public void process(WatchedEvent event) { * System.out.println("**************"); * System.out.println(event.getPath()+"\t"+event.getType()); } }, null); */ // zk.setData("/sqoop", "hadoophdfs3".getBytes(), -1); // // NodeDataChanged // zk.delete("/sqoop", -1); // NodeDeleted // zk.create("/sqoop/s1", "s1".getBytes(), Ids.OPEN_ACL_UNSAFE, // CreateMode.PERSISTENT); // zk.exists("/hivehive", new Watcher(){ // @Override // public void process(WatchedEvent event) { // System.out.println("**************"); // System.out.println(event.getPath()+"\t"+event.getType()); // } // }); // create方法 // zk.create("/hivehive", "hivehive".getBytes(), Ids.OPEN_ACL_UNSAFE, // CreateMode.PERSISTENT); // zk.delete("/hivehive", -1); // zk.setData("/hivehive", "hadoop".getBytes(), -1); // 需求:有一個父節點叫做/spark,數據是spark,當父節點/spark下有三個子節點, // 那么就把該父節點的數據改成spark-sql // zk.create("/spark", "spark".getBytes(), Ids.OPEN_ACL_UNSAFE, // CreateMode.PERSISTENT); /* * zk.getChildren("/spark", new Watcher() { * * @Override public void process(WatchedEvent event) { try { * List<String> children = zk.getChildren("/spark", true); * if(children.size() == 3){ * * } zk.setData("/spark", "spark-sql".getBytes(), -1); * System.out.println("數據更改成功"); } catch (KeeperException | * InterruptedException e) { e.printStackTrace(); } } }); */ Thread.sleep(Long.MAX_VALUE); zk.close(); } }