hbase replication原理分析


本文只是從總體流程來分析replication過程,很多細節沒有提及,下一篇文章准備多分析分析細節。
 
replicationSource啟動過程
org.apache.hadoop.hbase.regionserver.HRegionServer#startServiceThreads ->
org.apache.hadoop.hbase.replication.regionserver.Replication#startReplicationService ->
 //初始化replicationManager
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager#init ->
//在init階段for循環把所有的replicationPeers添加到source里,即每個replicationPeer對應一個source,也就是可以添加多個slave cluster,replicationPeers從zookeeper /hbase/replication/peers目錄取
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager#addSource ->
//在addSource階段生成ReplicationSource並啟動ReplicationSource,ReplicationSource本身是一個線程
org.apache.hadoop.hbase.replication.regionserver.ReplicationSource#startup
//ReplicationSource線程啟動,進入while循環工作
 
 
replicationSource大致工作流程
  1. while(isAlive())進行主體循環
  2. 從WAL文件獲取List<WAL.Entry>
  3. 通過調用shipEdits方法發送數據
  4. 調用replicationEndpoint replicate方法發送數據
  5. 最終調用admin.replicateWALEntry通過rpc發送數據
 
regionserver如何從slave cluster中選取regionserver當做復制節點
  1. replication過程需要連接peer(slave cluster),首先要獲取這個peer所有活着的regionservers
  2. 拿到所有regionservers信息之后,開始選擇哪些regionservers作為replication的對象
  3. 選哪些regionservers當做sink由peer活着的regionserver個數*ratio(默認值0.1)決定,regionservers先shuffle打亂順序后再截取
  4. 如果選擇的sink(regionserver)個數為0,一直等待peer上線,也就是slave cluster沒有啟動的情況
  5. 下面源碼可以解釋如何選擇regionserver當做sink
  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
      replicationSinkMgr.chooseSinks(); if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;     //倍數最多為默認配置的300倍,也就是每次sleep最長間隔是300秒
        }
      }
    }
  }

  void chooseSinks() {
    List<ServerName> slaveAddresses = endpoint.getRegionServers();
    Collections.shuffle(slaveAddresses, random);
    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
    sinks = slaveAddresses.subList(0, numSinks);
    lastUpdateToPeers = System.currentTimeMillis();
    badReportCounts.clear();
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);

 

    總結
  1. 每個slave cluster對應一個replicationSource線程,各個slave復制互不干擾
  2. 每個replicationSource是單線程進行傳輸數據,改成多線程並發傳可能更好
  3. 數據是通過rpc發送過去,調用slave cluster regionserver RSRpcServices的replicateWALEntry方法

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM