深入剖析SolrCloud(三)


上一篇中介紹了SolrCloud的第一個模塊---構建管理solr集群狀態信息的zookeeper集群。當我們在solr服務器啟動時擁有了這樣一個Zookeeper集群后,顯然我們需要連接到Zookeeper集群的方便手段,在這一篇中我將對Zookeeper客戶端相關的各個封裝類進行分析。

SolrZkClient類是Solr服務器用來與Zookeeper集群進行通信的接口類,它包含的主要組件有:

   private ConnectionManager connManager;
   private  volatile SolrZooKeeper keeper;
   private ZkCmdExecutor zkCmdExecutor =  new ZkCmdExecutor();

    其中ConnectionManagerWatcher的實現類,主要負責對客戶端與Zookeeper集群之間連接的狀態變化信息進行響應,關於Watcher的詳細介紹,可以參考http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkWatches

SolrZooKeeper類是一個包裝類,沒有實際意義,ZkCmdExecutor類是負責在連接失敗的情況下,重試某種操作特定次數,具體的操作是ZkOperation這個抽象類的具體實現子類,其execute方法中包含了具體操作步驟,這些操作包括新建一個Znode節點,讀取Znode節點數據,創建Znode路徑,刪除Znode節點等Zookeeper操作。

首先來看它的構造函數,先創建ConnectionManager對象來響應兩端之間的狀態變化信息,然后ZkClientConnectionStrategy類是一個連接策略抽象類,它包含連接和重連兩種策略,並且采用模板方法模式,具體的實現是通過靜態累不類ZkUpdate來實現的,DefaultConnectionStrategy是它的一個實現子類,它覆寫了connectreconnect兩個連接策略方法。

   public SolrZkClient(String zkServerAddress,  int zkClientTimeout,
      ZkClientConnectionStrategy strat,  final OnReconnect onReconnect,  int clientConnectTimeout)  throws InterruptedException,
      TimeoutException, IOException {
    connManager =  new ConnectionManager("ZooKeeperConnection Watcher:"
        + zkServerAddress,  this, zkServerAddress, zkClientTimeout, strat, onReconnect);
    strat.connect(zkServerAddress, zkClientTimeout, connManager,
         new ZkUpdate() {
          @Override
           public  void update(SolrZooKeeper zooKeeper) {
            SolrZooKeeper oldKeeper = keeper;
            keeper = zooKeeper;
             if (oldKeeper !=  null) {
               try {
                oldKeeper.close();
              }  catch (InterruptedException e) {
                 //  Restore the interrupted status
                Thread.currentThread().interrupt();
                log.error("", e);
                 throw  new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                    "", e);
              }
            }
          }
        });
    connManager.waitForConnected(clientConnectTimeout);
    numOpens.incrementAndGet();
  }

值得注意的是,構造函數中生成的ZkUpdate匿名類對象,它的update方法會被調用,

在這個方法里,會首先將已有的老的SolrZooKeeperg關閉掉,然后放置上一個新的SolrZooKeeper。做好這些准備工作以后,就會去連接Zookeeper服務器集群,

connManager.waitForConnected(clientConnectTimeout);//連接zk服務器集群,默認30秒超時時間

其實具體的連接動作是new SolrZooKeeper(serverAddress, timeout, watcher)引發的,上面那句代碼只是在等待指定時間,看是否已經連接上。

如果連接Zookeeper服務器集群成功,那么就可以進行Zookeeper的常規操作了:

1) 是否已經連接

   public  boolean isConnected() {
     return keeper !=  null && keeper.getState() == ZooKeeper.States.CONNECTED;
  }

2)  是否存在某個路徑的Znode

   public Stat exists( final String path,  final Watcher watcher,  boolean retryOnConnLoss)  throws KeeperException, InterruptedException {
     if (retryOnConnLoss) {
       return zkCmdExecutor.retryOperation( new ZkOperation() {
        @Override
         public Stat execute()  throws KeeperException, InterruptedException {
           return keeper.exists(path, watcher);
        }
      });
    }  else {
       return keeper.exists(path, watcher);
    }
  }

3) 創建一個Znode節點

   public String create( final String path,  final  byte data[],  final List<ACL> acl,  final CreateMode createMode,  boolean retryOnConnLoss)  throws KeeperException, InterruptedException {
     if (retryOnConnLoss) {
       return zkCmdExecutor.retryOperation( new ZkOperation() {
        @Override
         public String execute()  throws KeeperException, InterruptedException {
           return keeper.create(path, data, acl, createMode);
        }
      });
    }  else {
       return keeper.create(path, data, acl, createMode);
    }
  }

4)  獲取指定路徑下的孩子Znode節點

   public List<String> getChildren( final String path,  final Watcher watcher,  boolean retryOnConnLoss)  throws KeeperException, InterruptedException {
     if (retryOnConnLoss) {
       return zkCmdExecutor.retryOperation( new ZkOperation() {
        @Override
         public List<String> execute()  throws KeeperException, InterruptedException {
           return keeper.getChildren(path, watcher);
        }
      });
    }  else {
       return keeper.getChildren(path, watcher);
    }
  }

5) 獲取指定Znode上附加的數據

   public  byte[] getData( final String path,  final Watcher watcher,  final Stat stat,  boolean retryOnConnLoss)  throws KeeperException, InterruptedException {
     if (retryOnConnLoss) {
       return zkCmdExecutor.retryOperation( new ZkOperation() {
        @Override
         public  byte[] execute()  throws KeeperException, InterruptedException {
           return keeper.getData(path, watcher, stat);
        }
      });
    }  else {
       return keeper.getData(path, watcher, stat);
    }
  }

6)  在指定Znode上設置數據

   public Stat setData( final String path,  final  byte data[],  final  int version,  boolean retryOnConnLoss)  throws KeeperException, InterruptedException {
     if (retryOnConnLoss) {
       return zkCmdExecutor.retryOperation( new ZkOperation() {
        @Override
         public Stat execute()  throws KeeperException, InterruptedException {
           return keeper.setData(path, data, version);
        }
      });
    }  else {
       return keeper.setData(path, data, version);
    }
  }

7) 創建路徑

   public  void makePath(String path,  byte[] data, CreateMode createMode, Watcher watcher,  boolean failOnExists,  boolean retryOnConnLoss)  throws KeeperException, InterruptedException {
     if (log.isInfoEnabled()) {
      log.info("makePath: " + path);
    }
     boolean retry =  true;
    
     if (path.startsWith("/")) {
      path = path.substring(1, path.length());
    }
    String[] paths = path.split("/");
    StringBuilder sbPath =  new StringBuilder();
     for ( int i = 0; i < paths.length; i++) {
       byte[] bytes =  null;
      String pathPiece = paths[i];
      sbPath.append("/" + pathPiece);
       final String currentPath = sbPath.toString();
      Object exists = exists(currentPath, watcher, retryOnConnLoss);
       if (exists ==  null || ((i == paths.length -1) && failOnExists)) {
        CreateMode mode = CreateMode.PERSISTENT;
         if (i == paths.length - 1) {
          mode = createMode;
          bytes = data;
           if (!retryOnConnLoss) retry =  false;
        }
         try {
           if (retry) {
             final CreateMode finalMode = mode;
             final  byte[] finalBytes = bytes;
            zkCmdExecutor.retryOperation( new ZkOperation() {
              @Override
               public Object execute()  throws KeeperException, InterruptedException {
                keeper.create(currentPath, finalBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);
                 return  null;
              }
            });
          }  else {
            keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
          }
        }  catch (NodeExistsException e) {
          
           if (!failOnExists) {
             //  TODO: version ? for now, don't worry about race
            setData(currentPath, data, -1, retryOnConnLoss);
             //  set new watch
            exists(currentPath, watcher, retryOnConnLoss);
             return;
          }
          
           //  ignore unless it's the last node in the path
           if (i == paths.length - 1) {
             throw e;
          }
        }
         if(i == paths.length -1) {
           //  set new watch
          exists(currentPath, watcher, retryOnConnLoss);
        }
      }  else  if (i == paths.length - 1) {
         //  TODO: version ? for now, don't worry about race
        setData(currentPath, data, -1, retryOnConnLoss);
         //  set new watch
        exists(currentPath, watcher, retryOnConnLoss);
      }
    }
  }

8) 刪除指定Znode

   public  void delete( final String path,  final  int version,  boolean retryOnConnLoss)  throws InterruptedException, KeeperException {
     if (retryOnConnLoss) {
      zkCmdExecutor.retryOperation( new ZkOperation() {
        @Override
         public Stat execute()  throws KeeperException, InterruptedException {
          keeper.delete(path, version);
           return  null;
        }
      });
    }  else {
      keeper.delete(path, version);
    }
  }

         我們再回過頭來看看ConnectionManager類是如何響應兩端的連接狀態信息的變化的,它最重要的方法是process方法,當它被觸發回調時,會從WatchedEvent參數中得到事件的各種狀態信息,比如連接成功,會話過期(此時需要進行重連),連接斷開等。

   public  synchronized  void process(WatchedEvent event) {
     if (log.isInfoEnabled()) {
      log.info("Watcher " +  this + " name:" + name + " got event " + event + " path:" + event.getPath() + " type:" + event.getType());
    }

    state = event.getState();
     if (state == KeeperState.SyncConnected) {
      connected =  true;
      clientConnected.countDown();
    }  else  if (state == KeeperState.Expired) {
      connected =  false;
      log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
       // 嘗試重新連接zk服務器
       try {
        connectionStrategy.reconnect(zkServerAddress, zkClientTimeout,  this,
             new ZkClientConnectionStrategy.ZkUpdate() {
              @Override
               public  void update(SolrZooKeeper keeper)  throws InterruptedException, TimeoutException, IOException {
                 synchronized (connectionStrategy) {
                  waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
                  client.updateKeeper(keeper);
                   if (onReconnect !=  null) {
                    onReconnect.command();
                  }
                   synchronized (ConnectionManager. this) {
                    ConnectionManager. this.connected =  true;
                  }
                }
                
              }
            });
      }  catch (Exception e) {
        SolrException.log(log, "", e);
      }
      log.info("Connected:" + connected);
    }  else  if (state == KeeperState.Disconnected) {
      connected =  false;
    }  else {
      connected =  false;
    }
    notifyAll();
  }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM