分布式之zk的應用場景


分布式應用系統中,經常會用到zk,比如dubbo注冊中心,kafka分布式集群等都用到zk這一工具。除了這些用來做分布式集群外,zk還有那西應用場景事我們可以使用到該工具的呢?所以接下來就是我們要了解的重點了。

首先在使用zk的各種應用之前,我們需要了解zk 的相關功能模塊,這樣才能讓我們更清晰的了解為什么可以這么去使用:

zookeeper以目錄樹的形式管理數據,提供znode監聽、數據設置等接口,基於這些接口,我們可以實現Leader選舉、配置管理、命名服務等功能,ZK提供了以下API,供client操作znode和znode中存儲的數據:

  • create(path, data, flags):創建路徑為path的znode,在其中存儲data[]數據,flags可設置為Regular或Ephemeral,並可選打上sequential標志。
  • delete(path, version):刪除相應path/version的znode
  • exists(path,watch):如果存在path對應znode,則返回true;否則返回false,watch標志可設置監聽事件
  • getData(path, watch):返回對應znode的數據和元信息(如version等)
  • setData(path, data, version):將data[]數據寫入對應path/version的znode
  • getChildren(path, watch):返回指定znode的子節點集合

1.統一服務器名稱

命名服務器事一個比較常用的應用場景,客戶端通過制定名字來獲取服務器資源獲或提供者信息等,被命名的可以服務器地址,遠程對象。通過zk提供的創建節點的api,很容易創建一個全局唯一的path,這個path就可以做一個名稱,

dubbo使用zk就是用來做服務器名稱。維護全局的服務地址列表。

服務提供者在啟動的時候,向ZK上的指定節點/dubbo/${serviceName}/providers目錄下寫入自己的URL地址,這個操作就完成了服務的發布。

服務消費者啟動的時候,訂閱/dubbo/${serviceName}/providers目錄下的提供者URL地址, 並向/dubbo/${serviceName} /consumers目錄下寫入自己的URL地址。

注意,所有向ZK上注冊的地址都是臨時節點,這樣就能夠保證服務提供者和消費者能夠自動感應資源的變化。 另外,Dubbo還有針對服務粒度的監控,方法是訂閱/dubbo/${serviceName}目錄下所有提供者和消費者的信息

2.統一配置管理

zk客戶端api提供了操作znode數據的功能。再分布式環境中我們可以配置文件存放在znode上,不同的服務需要使用到哪些配置的時候可以直接從znode上去獲取。而且通過zk 的心跳極值,我們的配置文件是可以做到動態配置的。一般的配置中心的做法是在系統啟動之后加載我們的內存當中,一但配置文件需要做響應的調整的時候,需要重啟服務進行load配置操作,但是很多的場景事我們只需要更改一點點的內容就去重啟服務,代價不可謂不大。但zk就可以避免這問題的發生,當配置文件發生改變的時候,watch為通知到我們的服務對其修改操作。

3.分布式通知/協調

ZooKeeper中特有watcher注冊與異步通知機制,能夠很好的實現分布式環境下不同系統之間的通知與協調,實現對數據變更的實時處理。使用方法通常是不同系統都對ZK上同一個znode進行注冊,監聽znode的變化(包括znode本身內容及子節點的),其中一個系統update了znode,那么另一個系統能夠收到通知,並作出相應處理

1. 另一種心跳檢測機制:檢測系統和被檢測系統之間並不直接關聯起來,而是通過zk上某個節點關聯,大大減少系統耦合。

2. 另一種系統調度模式:某系統有控制台和推送系統兩部分組成,控制台的職責是控制推送系統進行相應的推送工作。管理人員在控制台作的一些操作,實際上是修改了ZK上某些節點的狀態,而ZK就把這些變化通知給他們注冊Watcher的客戶端,即推送系統,於是,作出相應的推送任務。

3. 另一種工作匯報模式:一些類似於任務分發系統,子任務啟動后,到zk來注冊一個臨時節點,並且定時將自己的進度進行匯報(將進度寫回這個臨時節點),這樣任務管理者就能夠實時知道任務進度。

總之,使用zookeeper來進行分布式通知和協調能夠大大降低系統之間的耦合

4.共享鎖

分布式鎖,這個主要得益於ZooKeeper為我們保證了數據的強一致性。鎖服務可以分為兩類,一個是 保持獨占,另一個是 控制時序。

1. 所謂保持獨占,就是所有試圖來獲取這個鎖的客戶端,最終只有一個可以成功獲得這把鎖。通常的做法是把zk上的一個znode看作是一把鎖,通過create znode的方式來實現。所有客戶端都去創建 /distribute_lock 節點,最終成功創建的那個客戶端也即擁有了這把鎖。

2. 控制時序,就是所有視圖來獲取這個鎖的客戶端,最終都是會被安排執行,只是有個全局時序了。做法和上面基本類似,只是這里 /distribute_lock 已經預先存在,客戶端在它下面創建臨時有序節點(這個可以通過節點的屬性控制:CreateMode.EPHEMERAL_SEQUENTIAL來指定)。Zk的父節點(/distribute_lock)維持一份sequence,保證子節點創建的時序性,從而也形成了每個客戶端的全局時序。

5.隊列管理

隊列方面,簡單地講有兩種,一種是常規的先進先出隊列,另一種是要等到隊列成員聚齊之后的才統一按序執行。對於第一種先進先出隊列,和分布式鎖服務中的控制時序場景基本原理一致,這里不再贅述。 第二種隊列其實是在FIFO隊列的基礎上作了一個增強。通常可以在 /queue 這個znode下預先建立一個/queue/num 節點,並且賦值為n(或者直接給/queue賦值n),表示隊列大小,之后每次有隊列成員加入后,就判斷下是否已經到達隊列大小,決定是否可以開始執行了。這種用法的典型場景是,分布式環境中,一個大任務Task A,需要在很多子任務完成(或條件就緒)情況下才能進行。這個時候,凡是其中一個子任務完成(就緒),那么就去 /taskList 下建立自己的臨時時序節點(CreateMode.EPHEMERAL_SEQUENTIAL),當 /taskList 發現自己下面的子節點滿足指定個數,就可以進行下一步按序進行處理了。

6.master選舉

在分布式環境中,相同的業務應用分布在不同的機器上,有些業務邏輯(例如一些耗時的計算,網絡I/O處理),往往只需要讓整個集群中的某一台機器進行執行,其余機器可以共享這個結果,這樣可以大大減少重復勞動,提高性能,於是這個master選舉便是這種場景下的碰到的主要問題。

利用ZooKeeper的強一致性,能夠保證在分布式高並發情況下節點創建的全局唯一性,即:同時有多個客戶端請求創建 /currentMaster 節點,最終一定只有一個客戶端請求能夠創建成功。利用這個特性,就能很輕易的在分布式環境中進行集群選取了。

另外,這種場景演化一下,就是動態Master選舉。這就要用到EPHEMERAL_SEQUENTIAL類型節點的特性了。

上文中提到,所有客戶端創建請求,最終只有一個能夠創建成功。在這里稍微變化下,就是允許所有請求都能夠創建成功,但是得有個創建順序,於是所有的請求最終在ZK上創建結果的一種可能情況是這樣: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次選取序列號最小的那個機器作為Master,如果這個機器掛了,由於他創建的節點會馬上小時,那么之后最小的那個機器就是Master了。

1. 在搜索系統中,如果集群中每個機器都生成一份全量索引,不僅耗時,而且不能保證彼此之間索引數據一致。因此讓集群中的Master來進行全量索引的生成,然后同步到集群中其它機器。另外,Master選舉的容災措施是,可以隨時進行手動指定master,就是說應用在zk在無法獲取master信息時,可以通過比如http方式,向一個地方獲取master。

2. 在Hbase中,也是使用ZooKeeper來實現動態HMaster的選舉。在Hbase實現中,會在ZK上存儲一些ROOT表的地址和HMaster的地址,HRegionServer也會把自己以臨時節點(Ephemeral)的方式注冊到Zookeeper中,使得HMaster可以隨時感知到各個HRegionServer的存活狀態,同時,一旦HMaster出現問題,會重新選舉出一個HMaster來運行,從而避免了HMaster的單點問題

 

附上zk負載均衡響應的實現代碼

1.權重輪詢模式

package com.samp.zk.balance;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.fastjson.JSONObject;

public class RoundRobin {

    private int currentIndex = -1;// 當前位置
    private int currentWeight = 0 ;//當前權重
    private int maxGcd = 0 ;    //最大權重數
    private int maxWeight = 0;// 最大公約數
    private int servetCount = 0 ;// 總服務器數量
    
    private List<Server> serverLst; // 服務器列表 
    
    
    private int gcd (int a , int b){
        BigInteger b1 = new BigInteger(String.valueOf(a));
        BigInteger b2 = new BigInteger(String.valueOf(b));
        BigInteger result = b1.gcd(b2);
        return result.intValue();
    }
    
    private int getMaxCurrentGcd (List<Server> serverList){
        int result = 0 ;
        for(int i = 0,len = serverLst.size();i< len -1 ;i++){
            if(result == 0){
                result = gcd(serverLst.get(i).weight, serverLst.get(i+1).weight);
            }else{
                result = gcd (result,serverLst.get(i+1).weight);
            }
        }
        return result;
    }
    
    private int getMaxCurrentWeight(List<Server> serverList){
        int result = 0 ;
        for(int i = 0,len = serverLst.size();i< len -1 ;i++){
            if( result ==0 ){
                result = Math.max(serverLst.get(i).weight, serverLst.get(i+1).weight);
            }else{
                result= Math.max(result, serverLst.get(i).weight);
            }
        }
        return result ;
    }
    
    public static void main(String[] args){
        RoundRobin obj =new RoundRobin();
        obj.init();
        Map<String,Integer> map = new HashMap<String,Integer>();
        for(int i=0;i<100;i++){
            
            Server ser = obj.getServer();
            String ip = ser.getIp();
            if(map.containsKey(ip)){
                map.put(ip, map.get(ip)+1);
            }else{
                map.put(ip, 1);
            }
        }
        
        for (Entry<String, Integer> m: map.entrySet()) {
            System.out.println("服務器 " + m.getKey() + " 請求次數: " + m.getValue());
        }
        
    }
    
    /**
     * 
     * @Title: getServer 
     * @Description: 服務獲取方式:
     *     1.初始化開始位置為-1,權重是0,
     *     2.第1次輪詢獲取服務器未當前服務器權重最高的
     *  3.第2次輪詢權重遞減1,獲取有大於等於該權限的服務器
     *  4.重復第3步,直到權重為最小值0時,從第1步開始從新輪詢
     * @return 參數說明
     * @return Server    返回類型
     */
    public Server getServer(){
        while (true){
            currentIndex = (currentIndex + 1) % servetCount;
            if(currentIndex ==0 ){
                currentWeight = currentWeight - maxGcd; 
                if(currentWeight <= 0 ){
                    currentWeight = maxWeight ;
                    if(currentWeight == 0 )
                        return null;
                }
            }
            if(serverLst.get(currentIndex).weight >= currentWeight ){
                return serverLst.get(currentIndex);
            }
        }
    
    }
    
    public Server getServerBy(){
        
        return null;
    }
    
    public Server getServerFromDubbo(){
        AtomicInteger sequence = new AtomicInteger(1);
        int maxWeight = 0; // 最大權重
        int minWeight = Integer.MAX_VALUE; // 最小權重
        int weightSum = 0;
        Map<String,Integer> map = new HashMap<String,Integer>();
        for (int i = 0; i < serverLst.size(); i++) {
            int weight = serverLst.get(i).weight;
            String ip = serverLst.get(i).getIp();
            maxWeight = Math.max(maxWeight, weight); // 累計最大權重
            minWeight = Math.min(minWeight, weight); // 累計最小權重
            if (weight > 0) {
                map.put(ip, weight);
                weightSum += weight;
            }
        }
        System.out.println("============"+JSONObject.toJSON(map)+"======"+weightSum);
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) { // 權重不一樣
            int mod = currentSequence % weightSum;
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<String,Integer> each : map.entrySet()) {
                    final String ip = each.getKey();
                    final Integer v = each.getValue();
                    if (mod == 0 && v > 0) {
                        return new Server(ip, v);
                    }
                    if (v > 0) {
                        mod--;
                    }
                }
            }
        }
        return null;
    }
    
    public void getServer2(){
        List<String> lst = new ArrayList<String>();
        for (int i = 0; i < maxWeight; i++) {
            for (Server ser :serverLst) {
                final String ip = ser.getIp();
                int num = ser.getWeight();
                if((num -1 )>0){
                    continue;
                }
                lst.add(ip);
            }
        }
    }
    
    private List<Server> getNewList(List<Server> sers){
        List<Server> l = new ArrayList<Server>();
        for(Server ser:sers){
            String ip = ser.getIp();
            int weight = ser.getWeight()-1;
            Server s = new Server(ip, weight);
            l.add(s);
        }
        return l;
    }
    
    public void init2(){
        Server s11 = new Server("127.0.0.1", 1);
        Server s12 = new Server("127.0.0.1", 1);
        Server s13 = new Server("127.0.0.1", 1);
        Server s21 = new Server("127.0.0.2", 1);
        Server s31 = new Server("127.0.0.3", 1);
        Server s32 = new Server("127.0.0.3", 1);
        Server s41 = new Server("127.0.0.4", 1);
        Server s42 = new Server("127.0.0.4", 1);
        Server s51 = new Server("127.0.0.5", 1);
        Server s52 = new Server("127.0.0.5", 1);
        Server s53 = new Server("127.0.0.5", 1);
        Server s54 = new Server("127.0.0.5", 1);
        
        serverLst = new ArrayList<Server>();
        serverLst.add(s11);
        serverLst.add(s12);
        serverLst.add(s13);
        serverLst.add(s21);
        serverLst.add(s31);
        serverLst.add(s32);
        
        serverLst.add(s41);
        serverLst.add(s42);
        serverLst.add(s53);
        serverLst.add(s54);
        serverLst.add(s51);
        serverLst.add(s52);
        
        maxWeight = getMaxCurrentWeight(serverLst);
        
    }
    
    public void init(){
        Server s1 = new Server("127.0.0.1", 3);
        Server s2 = new Server("127.0.0.2", 1);
        Server s3 = new Server("127.0.0.3", 2);
        Server s4 = new Server("127.0.0.4", 2);
        Server s5 = new Server("127.0.0.5", 4);
        
        serverLst = new ArrayList<Server>();
        serverLst.add(s1);
        serverLst.add(s2);
        serverLst.add(s3);
        serverLst.add(s4);
        serverLst.add(s5);
        
        maxGcd = getMaxCurrentGcd(serverLst);
        maxWeight = getMaxCurrentWeight(serverLst);
        currentIndex = -1 ;
        currentWeight = 0 ;
        servetCount = serverLst.size();
    }
    
    public int getCurrentIndex() {
        return currentIndex;
    }
    public void setCurrentIndex(int currentIndex) {
        this.currentIndex = currentIndex;
    }
    public int getCurrentWeight() {
        return currentWeight;
    }
    public void setCurrentWeight(int currentWeight) {
        this.currentWeight = currentWeight;
    }
    public int getMaxGcd() {
        return maxGcd;
    }
    public void setMaxGcd(int maxGcd) {
        this.maxGcd = maxGcd;
    }
    public int getMaxWeight() {
        return maxWeight;
    }
    public void setMaxWeight(int maxWeight) {
        this.maxWeight = maxWeight;
    }
    public int getServetCount() {
        return servetCount;
    }
    public void setServetCount(int servetCount) {
        this.servetCount = servetCount;
    }


    class Server {
        private String ip;
        private int weight;
        
        public Server(String ip, int weight) {
            super();
            this.ip = ip;
            this.weight = weight;
        }
        public String getIp() {
            return ip;
        }
        public void setIp(String ip) {
            this.ip = ip;
        }
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
        }
        
    }

}
View Code

2.隨機分配

package com.samp.zk.balance;

import java.util.List;
import java.util.Random;

import org.I0Itec.zkclient.ZkClient;

/** 
 * @ClassName RandomLoadBalance  
 * @Description 隨機方式實現負載均衡 
 * @author hezc 
 * @date 2017年2月14日  
 *   
 */
public class RandomLoadBalance implements LoadBalance {  
  
    @Override  
    public String select(String zkServer) {  
        ZkClient zkClient = new ZkClient(zkServer);  
        List<String> serverList = zkClient.getChildren(Constant.root);  
        zkClient.close();  
        Random r=new Random();  
        if(serverList.size()>=1){  
            String server=serverList.get(r.nextInt(serverList.size()));  
            return server;  
        }else{  
            return null;  
        }  
    }  

}

3.一致hash

package com.samp.zk.balance;

import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

import org.I0Itec.zkclient.ZkClient;

/** 
 * @ClassName ConsistentHashLoadBalance  
 * @Description 一致hash實現zk負載均衡 
 * @author hezc 
 * @date 2017年2月13日  
 *   
 */
public class ConsistentHashLoadBalance implements LoadBalance {  
    private String client;  
      
    public void SetClient(String client){  
        this.client=client;  
    }  
      
    @Override  
    public String select(String zkServer) {  
        ZkClient zkClient = new ZkClient(zkServer);  
        List<String> serverList = zkClient.getChildren(Constant.root);  
        ConsistentHashSelector selector=new ConsistentHashSelector(client,serverList);  
        return selector.select();  
          
    }  
      
     private static final class ConsistentHashSelector {  
            public ConsistentHashSelector(String client,List<String> appServer){  
                this.client=client;  
                this.appServer=appServer;  
            }  
           
            private String client;  
            private List<String> appServer;  
              
            public String select() {  
                String key =client ;  
                byte[] digest = md5(key);  
                String server =appServer.get((int) hash(digest, 0));  
                return server;  
            }  
  
            private long hash(byte[] digest, int number) {  
                return (((long) (digest[3 + number * 4] & 0xFF) << 24)  
                        | ((long) (digest[2 + number * 4] & 0xFF) << 16)  
                        | ((long) (digest[1 + number * 4] & 0xFF) << 8)   
                        | (digest[0 + number * 4] & 0xFF))   
                        & 0xFFFFFFFFL;  
            }  
  
            private byte[] md5(String value) {  
                MessageDigest md5;  
                try {  
                    md5 = MessageDigest.getInstance("MD5");  
                } catch (NoSuchAlgorithmException e) {  
                    throw new IllegalStateException(e.getMessage(), e);  
                }  
                md5.reset();  
                byte[] bytes = null;  
                try {  
                    bytes = value.getBytes("UTF-8");  
                } catch (UnsupportedEncodingException e) {  
                    throw new IllegalStateException(e.getMessage(), e);  
                }  
                md5.update(bytes);  
                return md5.digest();  
            }  
  
        }  

    
}
View Code

4.最小活動優先

package com.samp.zk.balance;

import java.util.List;

import org.I0Itec.zkclient.ZkClient;

/** 
 * @ClassName LeastActiveLoadBalance  
 * @Description TODO 
 * @author hezc 
 * @date 2017年2月14日  
 *   
 */
public class LeastActiveLoadBalance implements LoadBalance {  
  
    @Override  
    public String select(String zkServer) {  
        ZkClient zkClient = new ZkClient(zkServer);  
        List<String> serverList = zkClient.getChildren(Constant.root);  
  
        String tempServer = null;  
        int tempConn = -1;  
        for (int i = 0; i < serverList.size(); i++) {  
            String server = serverList.get(i);  
            if (zkClient.readData(Constant.root + "/" + server) != null) {  
                int connNum = zkClient.readData(Constant.root + "/" + server);  
                if (tempConn == -1) {  
                    tempServer = server;  
                    tempConn = connNum;  
                }  
                if (connNum < tempConn) {  
                    tempServer = server;  
                    tempConn = connNum;  
                }  
            }else{  
                zkClient.close();  
                return server;  
            }  
        }  
        zkClient.close();  
        if (tempServer != null && !tempServer.equals("")) {  
            return tempServer;  
        }  
  
        return null;  
    }  

}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM