ZooKeeper 典型應用場景-數據發布與訂閱


  ZooKeeper 是一個高可用的分布式數據管理與系統協調框架。基於對 Paxos 算法的實現,使該框架保證了分布式環境中數據的強一致性,也正是基於這樣的特性,使得 ZooKeeper 可以解決很多分布式問題。 

  隨着互聯網系統規模的不斷擴大,大數據時代飛速到來,越來越多的分布式系統將 ZooKeeper 作為核心組件使用,如 Hadoop、Hbase、Kafka、Storm等,因此,正確理解 ZooKeeper 的應用場景,對於 ZooKeeper 的使用者來說顯得尤為重要。本節主要將重點圍繞數據發布/訂閱、負載均衡、命名服務、分布式協調/通知、集群管理、Master選舉、分布式鎖和分布式隊列等方面來講解 ZooKeeper 的典型應用場景及實現。

1、數據發布/訂閱

  發布/訂閱模式是一對多的關系,多個訂閱者對象同時監聽某一主題對象,這個主題對象在自身狀態發生變化時會通知所有的訂閱者對象。使它們能自動的更新自己的狀態。發布/訂閱可以使得發布方和訂閱方獨立封裝、獨立改變。當一個對象的改變需要同時改變其他對象,而且它不知道具體有多少對象需要改變時可以使用發布/訂閱模式。發布/訂閱模式在分布式系統中的典型應用有配置管理服務發現、注冊

  配置管理是指如果集群中的機器擁有某些相同的配置並且這些配置信息需要動態的改變,我們可以使用發布/訂閱模式把配置做統一集中管理,讓這些機器格子各自訂閱配置信息的改變,當配置發生改變時,這些機器就可以得到通知並更新為最新的配置。

  服務發現、注冊是指對集群中的服務上下線做統一管理。每個工作服務器都可以作為數據的發布方向集群注冊自己的基本信息,而讓某些監控服務器作為訂閱方,訂閱工作服務器的基本信息,當工作服務器的基本信息發生改變如上下線、服務器角色或服務范圍變更,監控服務器可以得到通知並響應這些變化。

1.1、配置管理

  所謂的配置中心,顧名思義就是發布者將數據發布到 ZooKeeper 的一個或一系列節點上,供訂閱者進行數據訂閱,進而達到動態獲取數據的目的,實現配置信息的集中式管理和數據的動態更新。

發布/訂閱系統一般有兩種設計模式,分別是推(Push)模式和拉(Pull)模式。

  • 推模式

服務端主動將數據更新發送給所有訂閱的客戶端。

  • 拉模式

客戶端通過采用定時輪詢拉取。

ZooKeeper采用的是推拉相結合的方式:客戶端向服務端注冊自己需要關注的節點,一旦該節點的數據發生變更,那么服務端就會向相應的客戶端發送Watcher事件通知,客戶端接收到這個消息通知之后,需要主動到服務端獲取最新的數據。

  如果將配置信息存放到ZK上進行集中管理,那么通常情況下,應用在啟動的時候會主動到ZK服務器上進行一次配置信息的獲取,同時,在指定上注冊一個Watcher監聽,這樣一來,但凡配置信息發生變更,服務器都會實時通知所有訂閱的客戶端,從而達到實時獲取最新配置信息的目的。

  下面我們通過一個“配置管理”的實際案例來展示ZK在“數據發布/訂閱”場景下的使用方式。

  在我們平常的應用系統開發中,經常會碰到這樣的需求:系統中需要使用一些通用的配置信息,例如機器列表信息、運行時的開關配置、數據庫的配置信息等。這些全局配置信息通常具備以下特性:

  1)、數據量通常比較小

  2)、數據內容在運行時會發生變化

  3)、集群中各機器共享、配置一致

  對於這類配置信息,一般的做法通常可以選擇將其存儲的本地配置文件或是內存變量中。無論采取哪種配置都可以實現相應的操作。但是一旦遇到集群規模比較大的情況的話,兩種方式就不再可取。而我們還需要能夠快速的做到全部配置信息的變更,同時希望變更成本足夠小,因此我們需要一種更為分布式的解決方案。

  接下來我們以“數據庫切換”的應用場景展開,看看如何使用ZK來實現配置管理。

  配置存儲

     在進行配置管理之前,首先我們需要將初始化配置存儲到ZK上去,一般情況下,我們可以在ZK上選取一個數據節點用於配置的存儲,我們將需要集中管理的配置信息寫入到該數據節點中去。

  配置獲取

    集群中每台機器在啟動初始化階段,首先會從上面提到的ZK的配置節點上讀取數據庫信息,同時,客戶端還需要在該配置節點上注冊一個數據變更的Watcher監聽,一旦發生節點數據變更,所有訂閱的客戶端都能夠獲取數據變更通知。  

  配置變更

     在系統運行過程中,可能會出現需要進行書局切換的情況,這個時候就需要進行配置變更。借助ZK,我們只需要對ZK上配置節點的內容進行更新,ZK就能夠幫我們將數據變更的通知發送到各個客戶端,每個客戶端在接收到這個變更通知后,就可以重新進行最新數據的獲取。

1.2、服務發現、注冊

 

1.3、綜合例子

架構圖:

 

Manage Server 程序主體流程:

Work Server 程序主體流程:

 系統的核心類:

1.4、程序代碼實現

 

public class ServerConfig {

    private String dbUrl;
    private String dbPwd;
    private String dbUser;

    public String getDbUrl() {
        return dbUrl;
    }

    public void setDbUrl(String dbUrl) {
        this.dbUrl = dbUrl;
    }

    public String getDbPwd() {
        return dbPwd;
    }

    public void setDbPwd(String dbPwd) {
        this.dbPwd = dbPwd;
    }

    public String getDbUser() {
        return dbUser;
    }

    public void setDbUser(String dbUser) {
        this.dbUser = dbUser;
    }

    @Override
    public String toString() {
        return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]";
    }

}
public class ServerData {

    private String address;
    private Integer id;
    private String name;

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]";
    }
}
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

import com.alibaba.fastjson.JSON;

public class WorkServer {

    private ZkClient zkClient;
    private String configPath;
    private String serversPath;
    private ServerData serverData;
    private ServerConfig serverConfig;
    private IZkDataListener dataListener;

    public WorkServer(String configPath, String serversPath, ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
        this.zkClient = zkClient;
        this.configPath = configPath;
        this.serversPath = serversPath;
        this.serverData = serverData;
        this.serverConfig = initConfig;
        this.dataListener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {

            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                String retJson = new String((byte[]) data);
                ServerConfig serverConfigLocal = (ServerConfig) JSON.parseObject(retJson, ServerConfig.class);
                updateConfig(serverConfigLocal);
                System.out.println("new Work server config is:" + serverConfig.toString());
            }
        };
    }

    public void start() {
        System.out.println("work server start...");
        initRunning();
    }

    public void stop() {
        System.out.println("work server stop...");
        zkClient.unsubscribeDataChanges(configPath, dataListener);
    }

    private void initRunning() {
        registMe();
        zkClient.subscribeDataChanges(configPath, dataListener);
    }

    private void registMe() {
        String mePath = serversPath.concat("/").concat(serverData.getAddress());

        try {
            zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
                    .getBytes());
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(serversPath, true);
            registMe();
        }
    }

    private void updateConfig(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
    }
}
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import com.alibaba.fastjson.JSON;

public class ManageServer {

    private String serversPath;
    private String commandPath;
    private String configPath;
    private ZkClient zkClient;
    private ServerConfig config;
    private IZkChildListener childListener;
    private IZkDataListener dataListener;
    private List<String> workServerList;

    public ManageServer(String serversPath, String commandPath,
                        String configPath, ZkClient zkClient, ServerConfig config) {
        this.serversPath = serversPath;
        this.commandPath = commandPath;
        this.zkClient = zkClient;
        this.config = config;
        this.configPath = configPath;
        this.childListener = new IZkChildListener() {

            public void handleChildChange(String parentPath,
                                          List<String> currentChilds) throws Exception {
                workServerList = currentChilds;

                System.out.println("work server list changed, new list is ");
                execList();
            }
        };
        this.dataListener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                // ignore;
            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                String cmd = new String((byte[]) data);
                System.out.println("cmd:" + cmd);
                exeCmd(cmd);
            }
        };
    }

    private void initRunning() {
        zkClient.subscribeDataChanges(commandPath, dataListener);
        zkClient.subscribeChildChanges(serversPath, childListener);
    }

    /*
     * 1: list 2: create 3: modify
     */
    private void exeCmd(String cmdType) {
        if ("list".equals(cmdType)) {
            execList();

        } else if ("create".equals(cmdType)) {
            execCreate();
        } else if ("modify".equals(cmdType)) {
            execModify();
        } else {
            System.out.println("error command!" + cmdType);
        }
    }

    private void execList() {
        System.out.println(workServerList.toString());
    }

    private void execCreate() {
        if (!zkClient.exists(configPath)) {
            try {
                zkClient.createPersistent(configPath, JSON.toJSONString(config)
                        .getBytes());
            } catch (ZkNodeExistsException e) {
                zkClient.writeData(configPath, JSON.toJSONString(config)
                        .getBytes());
            } catch (ZkNoNodeException e) {
                String parentDir = configPath.substring(0,
                        configPath.lastIndexOf('/'));
                zkClient.createPersistent(parentDir, true);
                execCreate();
            }
        }
    }

    private void execModify() {
        config.setDbUser(config.getDbUser() + "_modify");

        try {
            zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
        } catch (ZkNoNodeException e) {
            execCreate();
        }
    }

    public void start() {
        initRunning();
    }

    public void stop() {
        zkClient.unsubscribeChildChanges(serversPath, childListener);
        zkClient.unsubscribeDataChanges(commandPath, dataListener);
    }
}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import com.sql.zookeeper.common.ZookeeperConstant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class SubscribeZkClient {

    private static final int CLIENT_QTY = 5;

    private static final String CONFIG_PATH = "/config";
    private static final String COMMAND_PATH = "/command";
    private static final String SERVERS_PATH = "/servers";

    public static void main(String[] args) throws Exception {

        List<ZkClient> clients = new ArrayList<ZkClient>();
        List<WorkServer> workServers = new ArrayList<WorkServer>();
        ManageServer manageServer = null;

        try {
            ServerConfig initConfig = new ServerConfig();
            initConfig.setDbPwd("123456");
            initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
            initConfig.setDbUser("root");

            ZkClient clientManage = new ZkClient(ZookeeperConstant.ZK_CONNECTION_STRING, 5000, 5000, new BytesPushThroughSerializer());
            manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH, CONFIG_PATH, clientManage, initConfig);
            manageServer.start();

            for (int i = 0; i < CLIENT_QTY; ++i) {
                ZkClient client = new ZkClient(ZookeeperConstant.ZK_CONNECTION_STRING, 5000, 5000, new BytesPushThroughSerializer());
                clients.add(client);
                ServerData serverData = new ServerData();
                serverData.setId(i);
                serverData.setName("WorkServer#" + i);
                serverData.setAddress("192.168.1." + i);

                WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
                workServers.add(workServer);
                workServer.start();
            }
            System.out.println("敲回車鍵退出!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();

        } finally {
            System.out.println("Shutting down...");

            for (WorkServer workServer : workServers) {
                try {
                    workServer.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (ZkClient client : clients) {
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

啟動SubscribeZkClient


 

在zookeeper客戶端上輸出命令

 

managerServer訂閱了commod的變化后,輸出變化

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM