HDFS的副本存放策略
HDFS采用一種稱為機架感知的策略來改進數據的可靠性、可用性和網絡帶寬的利用率。通過一個機架感知的過程,NameNode可以確定每一個DataNode所屬的機架id(這也是NameNode采用NetworkTopology數據結構來存儲數據節點的原因,也是我在前面詳細介紹NetworkTopology類的原因)。一個簡單但沒有優化的策略就是將副本存放在不同的機架上,這樣可以防止當整個機架失效時數據的丟失,並且允許讀數據的時候充分利用多個機架的帶寬。這種策略設置可以將副本均勻分布在集群中,有利於當組件失效的情況下的均勻負載,但是,因為這種策略的一個寫操作需要傳輸到多個機架,這增加了寫的代價。
在大多數情況下,副本系數是3,HDFS的存放策略是將一個副本存放在本地機架節點上,一個副本存放在同一個機架的另一個節點上,最后一個副本放在不同機架的節點上。這種策略減少了機架間的數據傳輸,提高了寫操作的效率。機架的錯誤遠遠比節點的錯誤少,所以這種策略不會影響到數據的可靠性和可用性。與此同時,因為數據塊只存放在兩個不同的機架上,所以此策略減少了讀取數據時需要的網絡傳輸總帶寬。在這種策略下,副本並不是均勻的分布在不同的機架上:三分之一的副本在一個節點上,三分之二的副本在一個機架上,其它副本均勻分布在剩下的機架中,這種策略在不損害數據可靠性和讀取性能的情況下改進了寫的性能。下面就來看看HDFS是如何來具體實現這一策略的。
NameNode是通過類來為每一分數據塊選擇副本的存放位置的,這個ReplicationTargetChooser的一般處理過程如下:
上面的流程圖詳細的描述了Hadoop-0.2.0版本中副本的存放位置的選擇策略,當然,這當中還有一些細節問題,如:如何選擇一個本地數據節點,如何選擇一個本地機架數據節點等,所以下面我還將繼續展開討論。
1.選擇一個本地節點
這里所說的本地節點是相對於客戶端來說的,也就是說某一個用戶正在用一個客戶端來向HDFS中寫數據,如果該客戶端上有數據節點,那么就應該最優先考慮把正在寫入的數據的一個副本保存在這個客戶端的數據節點上,它即被看做是本地節點,但是如果這個客戶端上的數據節點空間不足或者是當前負載過重,則應該從該數據節點所在的機架中選擇一個合適的數據節點作為此時這個數據塊的本地節點。另外,如果客戶端上沒有一個數據節點的話,則從整個集群中隨機選擇一個合適的數據節點作為此時這個數據塊的本地節點。那么,如何判定一個數據節點合不合適呢,它是通過isGoodTarget方法來確定的:
- private boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerLoc, boolean considerLoad, List<DatanodeDescriptor> results) {
- Log logr = FSNamesystem.LOG;
- // 節點不可用了
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node is (being) decommissioned");
- return false;
- }
- long remaining = node.getRemaining() - (node.getBlocksScheduled() * blockSize);
- // 節點剩余的容量夠不夠
- if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node does not have enough space");
- return false;
- }
- // 節點當前的負載情況
- if (considerLoad) {
- double avgLoad = 0;
- int size = clusterMap.getNumOfLeaves();
- if (size != 0) {
- avgLoad = (double)fs.getTotalLoad()/size;
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node is too busy");
- return false;
- }
- }
- // 該節點坐在的機架被選擇存放當前數據塊副本的數據節點過多
- String rackname = node.getNetworkLocation();
- int counter=1;
- for(Iterator<DatanodeDescriptor> iter = results.iterator(); iter.hasNext();) {
- Node result = iter.next();
- if (rackname.equals(result.getNetworkLocation())) {
- counter++;
- }
- }
- if (counter>maxTargetPerLoc) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the rack has too many chosen nodes");
- return false;
- }
- return true;
- }
實際上,選擇本地節假節點和遠程機架節點都需要以一個節點為參考,這樣才是有意義,所以在上面的流程圖中,我用紅色字體標出了參考點。那么,ReplicationTargetChooser是如何根據一個節點選擇它的一個本地機架節點呢?
這個過程很簡單,如果參考點為空,則從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點;否則就從參考節點所在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點,若這個集群中沒有合適的數據節點的話,則從已選擇的數據節點中找出一個作為新的參考點,如果找到了一個新的參考點,則從這個新的參考點在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點;否則從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點。如果新的參考點所在的機架中仍然沒有合適的數據節點,則只能從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點了。
- <font xmlns="http://www.w3.org/1999/xhtml">private DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, List<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results)throws NotEnoughReplicasException {
- // 如果參考點為空,則從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點
- if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
- }
- //從參考節點所在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點
- try {
- return chooseRandom(localMachine.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results);
- } catch (NotEnoughReplicasException e1) {
- //若這個集群中沒有合適的數據節點的話,則從已選擇的數據節點中找出一個作為新的參考點
- DatanodeDescriptor newLocal=null;
- for(Iterator<DatanodeDescriptor> iter=results.iterator(); iter.hasNext();) {
- DatanodeDescriptor nextNode = iter.next();
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
- if (newLocal != null) {//找到了一個新的參考點
- try {
- //從這個新的參考點在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點
- return chooseRandom(newLocal.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results);
- } catch(NotEnoughReplicasException e2) {
- //新的參考點所在的機架中仍然沒有合適的數據節點,從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點
- return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
- }
- } else {
- //從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點
- return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
- }
- }
- }</font>
選擇一個遠程機架節點就是隨機的選擇一個合適的不在參考點坐在的機架中的數據節點,如果沒有找到這個合適的數據節點的話,就只能從參考點所在的機架中選擇一個合適的數據節點作為此時的遠程機架節點了。
4.隨機選擇若干數據節點
這里的隨機隨機選擇若干個數據節點實際上指的是從某一個范圍內隨機的選擇若干個節點,它的實現需要利用前面提到過的NetworkTopology數據結構。隨機選擇所使用的范圍本質上指的是一個路徑,這個路徑表示的是NetworkTopology所表示的樹狀網絡拓撲圖中的一個非葉子節點,隨機選擇針對的就是這個節點的所有葉子子節點,因為所有的數據節點都被表示成了這個樹狀網絡拓撲圖中的葉子節點。
5.優化數據傳輸的路徑
以前說過,HDFS對於Block的副本copy采用的是流水線作業的方式:client把數據Block只傳給一個DataNode,這個DataNode收到Block之后,傳給下一個DataNode,依次類推,...,最后一個DataNode就不需要下傳數據Block了。所以,在為一個數據塊確定了所有的副本存放的位置之后,就需要確定這種數據節點之間流水復制的順序,這種順序應該使得數據傳輸時花費的網絡延時最小。ReplicationTargetChooser用了非常簡單的方法來考量的,大家一看便知:
- private DatanodeDescriptor[] getPipeline( DatanodeDescriptor writer, DatanodeDescriptor[] nodes) {
- if (nodes.length==0) return nodes;
- synchronized(clusterMap) {
- int index=0;
- if (writer == null || !clusterMap.contains(writer)) {
- writer = nodes[0];
- }
- for(;index<nodes.length; index++) {
- DatanodeDescriptor shortestNode = nodes[index];
- int shortestDistance = clusterMap.getDistance(writer, shortestNode);
- int shortestIndex = index;
- for(int i=index+1; i<nodes.length; i++) {
- DatanodeDescriptor currentNode = nodes[i];
- int currentDistance = clusterMap.getDistance(writer, currentNode);
- if (shortestDistance>currentDistance) {
- shortestDistance = currentDistance;
- shortestNode = currentNode;
- shortestIndex = i;
- }
- }
- //switch position index & shortestIndex
- if (index != shortestIndex) {
- nodes[shortestIndex] = nodes[index];
- nodes[index] = shortestNode;
- }
- writer = shortestNode;
- }
- }
- return nodes;
- }
