設置ZooKeeper服務器地址列表源碼解析及擴展


設置ZooKeeper服務器地址列表源碼解析及擴展

ZooKeeper zooKeeper = new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new ZooKeeperFirstBlood());

在創建zk連接的時候,必須要獲取到zk服務器集群的地址,最簡單的方式是在構造函數中傳入ip:port,ip2:port2,...,ipn:portn的形式,優勢是簡單,劣勢也很明顯,擴展性不強,一旦zk集群發生變動,整個就gg了。

本文主要先分析zk是如何去解析服務器地址,然后會給去使用動態獲取zk服務器集群的方式。

zk最簡單的解析

zk在構造函數對傳入的connectString進行解析構造了 ConnectStringParser connectStringParser = new ConnectStringParser(connectString); 對象,ConnectStringParser是一個服務器地址列表解析器,主要作用是解析根目錄和獲取zk server地址

  1. 定義根目錄

    相當於Linux的chroot命令,如果客戶端設置了這個屬性,那么所有的操作都會被限制在該命名空間下。具體使用方式:

    new Zookeeper("192.168.0.1:2181,192.168.0.2:2181/zktest/wpr")

  2. 解析zk地址

      String hostsList[] = connectString.split(",");
      for (String host : hostsList) {
     int port = DEFAULT_PORT;
     int pidx = host.lastIndexOf(':');
     if (pidx >= 0) {
         // otherwise : is at the end of the string, ignore
         if (pidx < host.length() - 1) {
             port = Integer.parseInt(host.substring(pidx + 1));
         }
         host = host.substring(0, pidx);
     }
       serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    
  3. 生成地址列表管理器HostProvider

    在ConnectStringParser解析器中將字符串封裝為List 對象,經過處理后地址列表會被進一步封裝到StaticHostPorvider類中。

   /**
     * The next host to try to connect to.
     * 
     * For a spinDelay of 0 there should be no wait.
 * 
 * 用於返回一個InetSocketAddress地址,這個方法的調用必須返回一個InetSocketAddress,不能為null和報錯
     * @param spinDelay
     *            Milliseconds to wait if all hosts have been tried once.
     */
    public InetSocketAddress next(long spinDelay);
    /**
     * Notify the HostProvider of a successful connection.
     * 
     * The HostProvider may use this notification to reset it's inner state.
     */
    public void onConnected();

環形解析很多書已經解釋過了,不是本文的重點

問題:

  1. zk服務器一旦遷移或者個別機器變更,會導致大批客戶端應用變更。
  2. 部分情況下,為了增進系統的穩定性和容災特性,需要配置一些特殊的規則,原來的環形解析肯定是不滿足需求的

針對上面,主流的解決方案:地址列表管理器能夠定時從DNS或者一個配置管理中心上解析出Zk服務器值列表,如果這個列表變更了,可以同時更新到serverAddress集合,這樣在下次獲取服務器地址的時候(調用next),可以獲取到最新的服務器地址。

整體結構:

核心代碼

本次主要是從配置中心去獲取zk服務器的地址,在創建ZooKeeper對象的時候,僅僅使用一個HTTP地址,建立一個HTTP的長連接,去這個服務器上獲取zk地址列表,定時去獲取更新地址

public class DynamicHTTPHostProvider implements HostProvider {
  /**
 * zk服務器列表
 */
private final List<InetSocketAddress> domainAddresses = new ArrayList(5);
	

public DynamicHTTPHostProvider(String domainURL)  {
    ADDRESS_SERVER_URL = domainURL;
    start();
}

 public synchronized void start(){
    //這個其實可以重新寫到一個類里面
    if(isStart){
        log.warn("DynamicHTTPHostProvider already run");
        return ;
    }
    GetServerListTask getServersTask = new GetServerListTask(ADDRESS_SERVER_URL);
    for (int i = 0; i < 3 && domainAddresses.isEmpty(); ++i) {
        getServersTask.run();
        try {
            Thread.sleep(100L);
        } catch (Exception e) {
        }
    }
    if (domainAddresses.isEmpty()) {
        log.error("DynamicHTTPHostProvider-0001|cannnot get zookeeper address");
        throw new RuntimeException("fail to get zk-server serverlist! env:" + ADDRESS_SERVER_URL);
    }
    TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
    isStart = true;
}

在getServerListTask的線程中,去獲取並解析地址

class GetServerListTask implements Runnable {
    final String url;

    GetServerListTask(String url) {
        this.url = url;
    }
    public void run() {
        //獲取服務器地址
        List<String> result = getZkServerList();
        updateIfChanged(result);
    }

    private synchronized void  updateIfChanged(List<String> result) {
		//更新domainAddresses
}

備注

詳細代碼可以查看 https://github.com/wpr7280/zktest


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM