語法:
To start:
start-balancer.sh
用默認的10%的閾值啟動balancer
hfs dfs balancer -threshold 3
start-balancer.sh -threshold 3
指定3%的閾值啟動balancer
To stop:
stop-balancer.sh
Threshold參數是1%~100%范圍內的一個數,默認值是10%。Threshold參數為集群是否處於均衡狀態設置了一個目標。
如果每一個datanode的利用率(已使用空間/節點總容量)和集群利用率(集群已使用空間/集群總容量)不超過Threshold參數值,則認為這個集群是均衡的。
Threshold參數值越小,集群會越均衡。但是以一個較小的threshold值運行balancer帶到平衡時花費的時間會更長。同時,如果設置了一個較小的threshold,但是在線上環境中,hadoop集群在進行balance時,還在並發的進行數據的寫入和刪除,所以有可能無法到達設定的平衡參數值。
每次移動,一個datanode不會接受或移出少於10G的block或少於datanode總容量threshold百分比的block。每次移動不會運行超過20分鍾。在每次移動結束,balancer會向namenode提交更新后的datanodes信息。
系統限制了balancer可以使用的帶寬最大值,由如下參數設置:
dfs.balance.bandwidthPerSec
這個參數設置了一個數據塊從一個datanode移到另一個時達到的最大速度。默認是1MB/s。該參數設置的越大,集群達到均衡的速度越快,但對應用進程帶寬資源的競爭也就越大,會導致mapred應用運行緩慢。
❤該參數在集群重啟后生效
❤該工具在一個hdfs集群中只能啟動一個實例
❤balancer在如下5種情況下會自動退出:
①集群已達到均衡狀態;
②沒有block能被移動;
③連續5次迭代移動沒有任何一個block被移動;
④當與namenode交互式出現了IOException;
⑤另一個balancer在運行中。
對應5個退出信息:
* The cluster is balanced. Exiting
* No block can be moved. Exiting...
* No block has been moved for 5 iterations. Exiting...
* Received an IO exception: failure reason. Exiting...
* Another balancer is running. Exiting...
二、下面是對Balancer類的分析:
首先看下主要的屬性:
MAX_NUM_CONCURRENT_MOVES:允許同時並發復制的塊數 默認為5
threshold:閾值 默認10%
支持的協議類型:NamenodeProtocol ClientProtocol
四個鏈表:
overUtilizedDatanodes:過載的datanode信息
aboveAvgUtilizedDatanodes:大於閾值的datanode信息
belowAvgUtilizedDatanodes:小於閾值的datanode信息
underUtilizedDatanodes:空載的datanode信息
兩個集合:
sources:源地址集合
targets:目標地址集合
兩個Map:
globalBlockList:記錄balance過程中全部塊和balance的塊的對應信息
datanodes:記錄datanode和balance的塊的對應信息
兩個線程池
moverExecutor:用於移動的線程池 默認1000
dispatcherExecutor:用於分發的線程池 默認200
首先看主要的概念類:
1、BalancerBlock類來跟蹤Balancer過程中的塊信息,其中包括塊信息和所對應的datanode的列表信息
2、BytesMoved類記錄移動的字節數
3、MovedBlocks類,這個類維護了兩個窗口,一個是舊數據,一個是最近的數據,之所以說最近是為了區別最新,要保證窗口中存儲1.5小時內被移動的塊的信息,可以通過參數dfs.balancer.movedWinWidth來配置窗口時間,過期的塊信息會被刪除。
4、NodeTask類代表了一個需要復制byte的對象,這個對象存在於source節點,包含了目標節點和byte長度
5、BalancerDatanode類主要是在balance過程中跟蹤datanode的信息,其中記錄了datanode中多個PendingBlockMove
初始化的時候設置了節點的磁盤利用率、最大可以移動的塊大小等等。
6、Source類繼承了BalancerDatanode,繼承了BalancerDatanode類中記錄的節點信息,又新定義了幾個特殊的針對源datanode的操作:
dispatchBlocks():分發線程的主方法,首先選擇要移動的塊,然后調用給代理源發送移動請求,當源節點的利用率小於閾值的時候,向請求namenode請求更多的塊,當分發了足夠的塊或是接收到了namenode發出的足夠塊信息或是超過運行的限制時間時停止(默認20分鍾)。
chooseNextBlockToMove():返回一個可以被立即分發的塊的信息
7、內部類PendingBlockMove記錄了塊移動過程中的跟蹤信息
包括要移動的塊、源地址、目標地址、代理源,這里有必要說下代理源的原理,源node先把數據塊拷貝給代理,然后由代理復制到目標節點,這么設計的好處我想應該是為了保證數據不丟失吧,但是也相當於多了一個中間環節,可以說有利有弊吧。
這個內部類提供了下面的方法:
chooseBlockAndProxy():為本次拷貝選擇要拷貝的塊和代理
markMovedIfGoodBlock(BalancerBlock block):如果塊是可以移動的,那么標記並放到movedBlocks隊列中
這里面所說的可以移動是指能馬上移動,並且已經找到了一個不繁忙的代理
dispatch():把塊分發給代理
sendRequest(DataOutputStream out):發出replace命令
receiveResponse(DataInputStream in):解析操作結果
下面分析Balancer類的主方法:
createNamenode(Configuration conf):和namenode創建一個基於NamenodeProtocol協議的連接
getUtilization(DatanodeInfo datanode):獲得datanode的使用情況
checkAndMarkRunningBalancer():用標記文件的方法保證守護進程的唯一性
chooseSource(BalancerDatanode target,Iterator<Source> sourceCandidates, boolean onRackSource) 選擇源節點,最后一個參數指出源和目標是不是同一rack,選擇好更新源隊列和目標隊列
chooseTarget(Source source,Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget)和選擇源類似
dispatchBlockMoves():提交分發請求到分發線程池中,並等待復制結束
waitForMoveCompletion():通過檢查target方法的pendingMove隊列判斷復制是否結束。
三、日志解讀
①確認基本信息
2016-05-26 16:07:54,572 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: Using a threshold of 3.0
2016-05-26 16:07:54,574 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: namenodes = [hdfs://ochadoopcluster]
2016-05-26 16:07:54,575 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: p = Balancer.Parameters[BalancingPolicy.Node, threshold=3.0]
②確認all data node lists
org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack/rack7/IP:PORT
③搜集四個鏈表,確認需要移動block的節點,以及第一次iteration需要移動的數據量
overUtilizedDatanodes:過載的datanode信息
aboveAvgUtilizedDatanodes:大於閾值的datanode信息
belowAvgUtilizedDatanodes:小於閾值的datanode信息
underUtilizedDatanodes:空載的datanode信息
2016-05-26 16:07:57,665 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: 0 over-utilized: []
2016-05-26 16:07:57,665 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: 2 underutilized: [BalancerDatanode[A, utilization=70.97845296326437], BalancerDatanode[B, utilization=77.14335325387569]]
2016-05-26 16:07:57,668 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: Need to move 2.22 TB to make the cluster balanced.
2016-05-26 16:07:57,669 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: Decided to move 10 GB bytes from C to A:50011
2016-05-26 16:07:57,669 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: Decided to move 10 GB bytes from D to B
2016-05-26 16:07:57,669 INFO org.apache.hadoop.hdfs.server.balancer.Balancer: Will move 20 GB in this iteration
其中A\C同機架,B\D同機架
④開始執行一次iteration
然后循環執行②-④直至集群均衡
目前可以確定的是,均衡原則,首先在同機架內均衡,
四、源碼解讀
均衡原則:從源節點列表和目標節點列表中各自選擇節點組成一個個對,選擇順序優先為同節點組,同機架,然后是針對所有
// all data node lists 列出所有datanode節點
// compute average utilization 計算出平均利用率
// create network topology and classify utilization collections
// over-utilized, above-average, below-average and under-utilized. 創建network topology收集四個鏈表集合
// return number of bytes to be moved in order to make the cluster balanced 返回為達到集群平衡需要移動的bytes數
/* log the over utilized & under utilized nodes */ 記錄the over utilized & under utilized
// First, match nodes on the same node group if cluster is node group aware 首先,如果集群有節點組則匹配同一組內節點
// Then, match nodes on the same rack 然后,匹配同機架內節點
// At last, match all remaining nodes 最后匹配所有剩余節點
/** Decide all pairs according to the matcher. */ 決定需要匹配的節點對
/* first step: match each overUtilized datanode (source) to one or more underUtilized datanodes (targets).*/ 匹配每個過載數據節點(src)轉移到一個或多個空載的數據節點(targets)
/* match each remaining overutilized datanode (source) to below average utilized datanodes (targets). Note only overutilized datanodes that haven't had that max bytes to move satisfied in step 1 are selected*/ 匹配剩下的過載數據節點(source)轉移到低於平均利用率的數據節點(tartget)。注意:這些數據節點為firststep中沒有達到最大移動值的過載節點。
/* match each remaining underutilized datanode (target) to above average utilized datanodes (source). Note only underutilized datanodes that have not had that max bytes to move satisfied in step 1 are selected.*/ 匹配剩下的每個空載節點(target)和高於平均利用率的數據節點(source)。轉移到低於平均利用率的數據節點(tartget)。注意:這些數據節點為firststep中沒有達到最大移動值的過載節點。
/**For each datanode, choose matching nodes from the candidates. Either the datanodes or the candidates are source nodes with (utilization > Avg), and the others are target nodes with (utilization < Avg).*/ 對每個數據節點,從候選節點中選擇匹配節點。所有高於平均利用率的節點(所有數據節點和候選節點)作為source節點,其他的低於平均率的節點作為target節點
/**For the given datanode, choose a candidate and then schedule it.@return true if a candidate is chosen; false if no candidates is chosen.*/
/** Choose a candidate for the given datanode. */
/* reset all fields in a balancer preparing for the next iteration */
/** Run an iteration for all datanodes. */
/* Decide all the nodes that will participate in the block move and the number of bytes that need to be moved from one node to another in this iteration. Maximum bytes to be moved per node is Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).*/
/* For each pair of , start a thread that repeatedly decide a block to be moved and its proxy source, then initiates the move until all bytes are moved or no more block available to move. Exit no byte has been moved for 5 consecutive iterations.*/
/** Balance all namenodes.For each iteration,for each namenode,execute a {@link Balancer} to work through all datanodes once. */
/* Given elaspedTime in ms, return a printable string */
// exclude the nodes in this set from balancing operations
//include only these nodes in balancing operations
總結:
1.balance的步驟:
1)從namenode獲取datanode磁盤使用情況
2)計算哪些節點需要把哪些數據移動到哪里(是可以跨機架均衡的)
3)分別移動,完成后刪除舊的block信息
4)循環執行,直到達到平衡標准
- 大於平均使用;
- 過度使用;
- 小於平均使用;
- 幾乎沒有被使用。
跨機架數據平衡步驟如下所示:
- 首先,數據從類型 2 機器移動到類型 4 機器;
- 其次,數據從類型 2 機器移動到類型 3 機器;
- 最后,數據從類型 1 機器移動到類型 3 機器。
以上步驟在負載沒有達到指定閾值前會迭代式執行。
*滿足了balancer的threshold百分比即haodop前台的|datanodes usages%(min)-datanodes usages%(median)|和|datanodes usages%(median)-datanodes usages%(median)|都<=threshold
