Zookeeper(三) Zookeeper原理與應用


一、zookeeper原理解析

     1、進群角色描述

    2、Paxos 算法概述( ZAB 協議)    分布式一致性算法

 

     3、Zookeeper 的選主(恢復模式)

 以一個簡單的例子來說明整個選舉的過程.
假設有五台服務器組成的 zookeeper 集群,它們的 id 1-5,同時它們都是最新啟動的,也就是 沒有歷史數據,在存放數據量這一點上,都是一樣的.假設這些服務器依序啟動,來看看會發生 什么

(1) 服務器 1 啟動,此時只有它一台服務器啟動了,它發出去的報沒有任何響應,所以它的選舉 狀態一直是 LOOKING 狀態
(2)服務器 2 啟動,它與最開始啟動的服務器 1 進行通信,互相交換自己的選舉結果,由於兩者 都沒有歷史數據,所以 id 值較大的服務器 勝出,但是由於沒有達到超過半數以上的服務器都 同意選舉它(這個例子中的半數以上是 3),所以服務器 1,2 還是繼續保持 LOOKING 狀態.
(3) 服務器 3 啟動,根據前面的理論分析,服務器 3 成為服務器 1,2,3 中的老大,而與上面不同的 是,此時有三台服務器選舉了它,所以它成為了這次選舉的 leader.
(4) 服務器 4 啟動,根據前面的分析,理論上服務器 4 應該是服務器 1,2,3,4 中最大的,但是由於 前面已經有半數以上的服務器選舉了服務器 3,所以它只能接收當小弟的命了.
(5) 服務器 5 啟動,4 一樣,當小弟.     (如果干掉ID3,怎么重新選舉 id最大的那台也就是5id)

總結: zookeeper server 的三種工作狀態
LOOKING:當前 Server 不知道 leader 是誰,正在搜尋,正在選舉
LEADING:當前 Server 即為選舉出來的 leader,負責協調事務
FOLLOWINGleader 已經選舉出來,當前 Server 與之同步,服從 leader 的命令

     4、非全新集群的選舉機制(數據恢復)

那么,初始化的時候,是按照上述的說明進行選舉的,但是當 zookeeper 運行了一段時間之 后,有機器 down 掉,重新選舉時,選舉過程就相對復雜了。
需要加入數據 versionserver id 和邏輯時鍾。
數據 version:數據新的 version 就大,數據每次更新都會更新 version
Leader id:就是我們配置的 myid 中的值,每個機器一個。
邏輯時鍾:這個值從 0 開始遞增,每次選舉對應一個值,也就是說: 如果在同一次選舉中,那么 這個值應該是一致的 邏輯時鍾值越大,說明這一次選舉 leader 的進程更新.

選舉的標准就變成:
    (1)邏輯時鍾小的選舉結果被忽略,重新投票
    (2)統一邏輯時鍾后,數據 id 大的勝出
    (3)數據 id 相同的情況下, leader id 大的勝出
根據這個規則選出 leader

二、zookeeper應用案例

     1、服務器上下線動態感知

       需求:某分布式系統中,主節點可以有多台,可以動態上下線。 任意一台客戶端都能實時感知 到主節點服務器的上下線 
      設計思路:
     

         (1) 設計服務器端存入服務器上線,下線的信息,比如都寫入到 servers 節點下
         (2)設計客戶端監聽該 servers 節點,獲取該服務器集群的在線服務器列表
         (3)服務器一上線,就往 zookeeper 文件系統中的一個統一的節點比如 servers 下寫入一個臨 時節 點,記錄下服務器的信息(思考,該節點最好采用什么類型的節點?)
         (4) 服務器一下線,則刪除 servers 節點下的該服務器的信息,則客戶端因為監聽了該節點的數據變化,所以將第一時間得知服務器的在線狀態
      實現:

     服務器端

package com.ghgj.zookeeper.mydemo;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 用來模擬服務器的動態上線下線
* 總體思路就是服務器上線就上 zookeeper 集群創建一個臨時節點,然后監聽了該數據節
點的個數變化的客戶端都收到通知
* 下線,則該臨時節點自動刪除,監聽了該數據節點的個數變化的客戶端也都收到通知
*/
public class DistributeServer {
private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
private static final int sessionTimeout = 4000;
private static final String PARENT_NODE = "/server";
static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
DistributeServer distributeServer = new DistributeServer();
distributeServer.getZookeeperConnect();
distributeServer.registeServer("hadoop03");
Thread.sleep(Long.MAX_VALUE);
}
/**
* 拿到 zookeeper 進群的鏈接
*/
public void getZookeeperConnect() throws Exception {
zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
/**
* 服務器上線就注冊,掉線就自動刪除,所以創建的是臨時順序節點
*/
public void registeServer(String hostname) throws Exception{
Stat exists = zk.exists(PARENT_NODE, false);
if(exists == null){
zk.create(PARENT_NODE,"server_parent_node".getBytes(),Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
zk.create(PARENT_NODE+"/"+hostname, hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+" is online, start working......");
}
}

  客戶端

package com.ghgj.zookeeper.mydemo;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* 用來模擬用戶端的操作:連上 zookeeper 進群,實時獲取服務器動態上下線的節點信息
* 總體思路就是每次該 server 節點下有增加或者減少節點數,我就打印出來該 server 節點
下的所有節點
*/
public class DistributeClient {
private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
private static final int sessionTimeout = 4000;
private static final String PARENT_NODE = "/server";
static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
DistributeClient dc = new DistributeClient();
dc.getZookeeperConnect();
Thread.sleep(Long.MAX_VALUE);
}
/**
* 拿到 zookeeper 進群的鏈接
*/
public void getZookeeperConnect() throws Exception {
zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
// 獲取父節點 server 節點下所有子節點,即是所有正上線服務的服
務器節點
List<String> children = zk.getChildren(PARENT_NODE, true);
List<String> servers = new ArrayList<String>();
for(String child: children){
// 取出每個節點的數據,放入到 list 里
String server = new String(zk.getData(PARENT_NODE+"/"+child,
false, null), "UTF-8");
servers.add(server);
}
// 打印 list 里面的元素
System.out.println(servers);
} catch (Exception e) {
e.printStackTrace();
}
}
});
System.out.println("Client is online, start Working......");
}
}

  2、分布式共享鎖

       需求:在我們自己的分布式業務系統中,可能會存在某種資源,需要被整個系統的各台服務器共享 訪問,但是只允許一台服務器同時訪問 

       設計思路:

         (1) 設計多個客戶端同時訪問同一個數據
         (2)為了同一時間只能允許一個客戶端上去訪問,所以各個客戶端去 zookeeper 集群的一個 znode 節點去注冊一個臨時節點,定下規則,每次都是編號最小的客戶端才能去訪問
         (3)多個客戶端同時監聽該節點,每次當有子節點被刪除時,就都收到通知,然后判斷自己 的編號是不是最小的,最小的就去執行訪問,不是最小的就繼續監聽。
      代碼實現:

package com.ghgj.zookeeper.mydemo;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 需求:多個客戶端,需要同時訪問同一個資源,但同時只允許一個客戶端進行訪問。
* 設計思路:多個客戶端都去父 znode 下寫入一個子 znode,能寫入成功的去執行訪問,
寫入不成功的等待
*/
public class MyDistributeLock {
private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
private static final int sessionTimeout = 4000;
private static final String PARENT_NODE = "/parent_locks";
private static final String SUB_NODE = "/sub_client";
static ZooKeeper zk = null;
private static String currentPath = "";
public static void main(String[] args) throws Exception {
MyDistributeLock mdc = new MyDistributeLock();
// 1、拿到 zookeeper 鏈接
mdc.getZookeeperConnect();
// 2、查看父節點是否存在,不存在則創建
Stat exists = zk.exists(PARENT_NODE, false);
if(exists == null){
zk.create(PARENT_NODE, PARENT_NODE.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
// 3、監聽父節點
zk.getChildren(PARENT_NODE, true);
// 4、往父節點下注冊節點,注冊臨時節點,好處就是,當宕機或者斷開鏈接時該
節點自動刪除
currentPath = zk.create(PARENT_NODE+SUB_NODE, SUB_NODE.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 5、關閉 zk 鏈接
Thread.sleep(Long.MAX_VALUE);
zk.close();
}
/**
* 拿到 zookeeper 集群的鏈接
*/
public void getZookeeperConnect() throws Exception {
zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 匹配看是不是子節點變化,並且監聽的路徑也要對
if(event.getType() == EventType.NodeChildrenChanged &&
event.getPath().equals(PARENT_NODE)){
try {
// 獲取父節點的所有子節點, 並繼續監聽
List<String> childrenNodes = zk.getChildren(PARENT_NODE, true);
// 匹配當前創建的 znode 是不是最小的 znode
Collections.sort(childrenNodes);
if((PARENT_NODE+"/"+childrenNodes.get(0)).equals(currentPath)){
// 處理業務
handleBusiness(currentPath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
public void handleBusiness(String create) throws Exception{
System.out.println(create+" is working......");
Thread.sleep(new Random().nextInt(4000));
zk.delete(currentPath, -1);
System.out.println(create+" is done ......");
currentPath = zk.create(PARENT_NODE+SUB_NODE, SUB_NODE.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}

  

 

補充:監聽機制案例

package com.ghgj.zkapi;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;

public class ZKAPIDEMOWatcher {

	// 獲取zookeeper連接時所需要的服務器連接信息,格式為主機名:端口號
	private static final String ConnectString = "hadoop02:2181";

	// 請求了解的會話超時時長
	private static final int SessionTimeout = 5000;

	private static ZooKeeper zk = null;
	static Watcher w = null;
	static Watcher watcher = null;

	public static void main(String[] args) throws Exception {

	watcher = new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				System.out.println(event.getPath() + "\t-----" + event.getType());
				List<String> children;
				try {
					if (event.getPath().equals("/spark") && event.getType() == EventType.NodeChildrenChanged) {
						// zk.setData("/spark", "spark-sql".getBytes(), -1);
						System.out.println("數據更改成功 ~~~~~~~~~~~~~~~~~~");
						children = zk.getChildren("/spark", watcher);
					}
					if (event.getPath().equals("/spark") && event.getType() == EventType.NodeDataChanged) {
						// zk.setData("/spark", "spark-sql".getBytes(), -1);
						System.out.println("數據更改成功 ¥##########");
						zk.getData("/spark", watcher, null);
					}
					if (event.getPath().equals("/mx") && event.getType() == EventType.NodeChildrenChanged) {
						// zk.setData("/mx", "spark-sql".getBytes(), -1);
						System.out.println("數據更改成功  ---------");
						children = zk.getChildren("/mx", watcher);
					}

				} catch (KeeperException | InterruptedException e) {
					e.printStackTrace();
				}
			}
		};

		zk = new ZooKeeper(ConnectString, SessionTimeout, watcher);

		zk.getData("/spark", true, null);
		zk.getChildren("/spark", true);
		zk.getChildren("/mx", true);
		zk.exists("/spark", true);

		
		自定義循環自定義
		w = new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				try {
					zk.getData("/hive", w, null);
					System.out.println("hive shuju bianhua ");
				} catch (KeeperException e) {
					e.printStackTrace();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};

		zk.getData("/hive", w, null);

		// zk.setData(path, data, version);

		// 表示給znode /ghgj 的數據變化事件加了監聽
		// 第二個參數使用true還是false的意義就是是否使用拿zookeeper鏈接時指定的監聽器
		// zk.getData("/ghgj", true, null);
		// zk.setData("/ghgj", "hadoophdfs2".getBytes(), -1);

		/*
		 * zk.getData("/sqoop", new Watcher(){
		 * 
		 * @Override public void process(WatchedEvent event) {
		 * System.out.println("**************");
		 * System.out.println(event.getPath()+"\t"+event.getType()); } }, null);
		 */
		// zk.setData("/sqoop", "hadoophdfs3".getBytes(), -1); //
		// NodeDataChanged
		// zk.delete("/sqoop", -1); // NodeDeleted
		// zk.create("/sqoop/s1", "s1".getBytes(), Ids.OPEN_ACL_UNSAFE,
		// CreateMode.PERSISTENT);

		// zk.exists("/hivehive", new Watcher(){
		// @Override
		// public void process(WatchedEvent event) {
		// System.out.println("**************");
		// System.out.println(event.getPath()+"\t"+event.getType());
		// }
		// });

		// create方法
		// zk.create("/hivehive", "hivehive".getBytes(), Ids.OPEN_ACL_UNSAFE,
		// CreateMode.PERSISTENT);
		// zk.delete("/hivehive", -1);
		// zk.setData("/hivehive", "hadoop".getBytes(), -1);

		// 需求:有一個父節點叫做/spark,數據是spark,當父節點/spark下有三個子節點,
		// 那么就把該父節點的數據改成spark-sql
		// zk.create("/spark", "spark".getBytes(), Ids.OPEN_ACL_UNSAFE,
		// CreateMode.PERSISTENT);

		/*
		 * zk.getChildren("/spark", new Watcher() {
		 * 
		 * @Override public void process(WatchedEvent event) { try {
		 * List<String> children = zk.getChildren("/spark", true);
		 * if(children.size() == 3){
		 * 
		 * } zk.setData("/spark", "spark-sql".getBytes(), -1);
		 * System.out.println("數據更改成功"); } catch (KeeperException |
		 * InterruptedException e) { e.printStackTrace(); } } });
		 */

		Thread.sleep(Long.MAX_VALUE);

		zk.close();
	}
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM