HDFS作為Hadoop中 的一個分布式文件系統,而且是專門為它的MapReduce設計,所以HDFS除了必須滿足自己作為分布式文件系統的高可靠性外,還必須為 MapReduce提供高效的讀寫性能,那么HDFS是如何做到這些的呢?首先,HDFS將每一個文件的數據進行分塊存儲,同時每一個數據塊又保存有多個 副本,這些數據塊副本分布在不同的機器節點上,這種數據分塊存儲+副本的策略是HDFS保證可靠性和性能的關鍵,這是因為:一.文件分塊存儲之后按照數據 塊來讀,提高了文件隨機讀的效率和並發讀的效率;二.保存數據塊若干副本到不同的機器節點實現可靠性的同時也提高了同一數據塊的並發讀效率;三.數據分塊 是非常切合MapReduce中任務切分的思想。在這里,副本的存放策略又是HDFS實現高可靠性和搞性能的關鍵。
HDFS采用一種稱為機架感知的策略來改進數據的可靠性、可用性和網絡帶寬的利用率。通過一個機架感知的過程,NameNode可以確定每一個 DataNode所屬的機架id(這也是NameNode采用NetworkTopology數據結構來存儲數據節點的原因,也是我在前面詳細介紹NetworkTopology類 的原因)。一個簡單但沒有優化的策略就是將副本存放在不同的機架上,這樣可以防止當整個機架失效時數據的丟失,並且允許讀數據的時候充分利用多個機架的帶 寬。這種策略設置可以將副本均勻分布在集群中,有利於當組件失效的情況下的均勻負載,但是,因為這種策略的一個寫操作需要傳輸到多個機架,這增加了寫的代 價。
在大多數情況下,副本系數是3,HDFS的存放策略是將一個副本存放在本地機架節點上,一個副本存放在同一個機架的另一個節點上,最后一個副本放在不同機 架的節點上。這種策略減少了機架間的數據傳輸,提高了寫操作的效率。機架的錯誤遠遠比節點的錯誤少,所以這種策略不會影響到數據的可靠性和可用性。與此同 時,因為數據塊只存放在兩個不同的機架上,所以此策略減少了讀取數據時需要的網絡傳輸總帶寬。在這種策略下,副本並不是均勻的分布在不同的機架上:三分之 一的副本在一個節點上,三分之二的副本在一個機架上,其它副本均勻分布在剩下的機架中,這種策略在不損害數據可靠性和讀取性能的情況下改進了寫的性能。下 面就來看看HDFS是如何來具體實現這一策略的。
NameNode是通過類來為每一分數據塊選擇副本的存放位置的,這個ReplicationTargetChooser的一般處理過程如下:
上面的流程圖詳細的描述了Hadoop-0.2.0版本中副本的存放位置的選擇策略,當然,這當中還有一些細節問題,如:如何選擇一個本地數據節點,如何選擇一個本地機架數據節點等,所以下面我還將繼續展開討論。
1.選擇一個本地節點
這里所說的本地節點是相對於客戶端來說的,也就是說某一個用戶正在用一個客戶端來向HDFS中寫數據,如果該客戶端上有數據節點,那么就應該最優先考慮把正在寫入的數據的一個副本保存在這個客戶端的數據節點上,它即被看做是本地節點,但是如果這個客戶端上的數據節點空間不足或者是當前負載過重,則應該從該數據節點所在的機架中選擇一個合適的數據節點作為此時這個數據塊的本地節點。另外,如果客戶端上沒有一個數據節點的話,則從整個集群中隨機選擇一個合適的數據節點作為此時這個數據塊的本地節點。那么,如何判定一個數據節點合不合適呢,它是通過isGoodTarget方法來確定的:
- private boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerLoc, boolean considerLoad, List<DatanodeDescriptor> results) {
- Log logr = FSNamesystem.LOG;
- // 節點不可用了
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node is (being) decommissioned");
- return false;
- }
- long remaining = node.getRemaining() - (node.getBlocksScheduled() * blockSize);
- // 節點剩余的容量夠不夠
- if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node does not have enough space");
- return false;
- }
- // 節點當前的負載情況
- if (considerLoad) {
- double avgLoad = 0;
- int size = clusterMap.getNumOfLeaves();
- if (size != 0) {
- avgLoad = (double)fs.getTotalLoad()/size;
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the node is too busy");
- return false;
- }
- }
- // 該節點坐在的機架被選擇存放當前數據塊副本的數據節點過多
- String rackname = node.getNetworkLocation();
- int counter=1;
- for(Iterator<DatanodeDescriptor> iter = results.iterator(); iter.hasNext();) {
- Node result = iter.next();
- if (rackname.equals(result.getNetworkLocation())) {
- counter++;
- }
- }
- if (counter>maxTargetPerLoc) {
- logr.debug("Node "+NodeBase.getPath(node)+ " is not chosen because the rack has too many chosen nodes");
- return false;
- }
- return true;
- }
2.選擇一個本地機架節點
實際上,選擇本地節假節點和遠程機架節點都需要以一個節點為參考,這樣才是有意義,所以在上面的流程圖中,我用紅色字體標出了參考點。那么,ReplicationTargetChooser是如何根據一個節點選擇它的一個本地機架節點呢?
這個過程很簡單,如果參考點為空,則從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點;否則就從參考節點所在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點,若這個集群中沒有合適的數據節點的話,則從已選擇的數據節點中找出一個作為新的參考點,如果找到了一個新的參考點,則從這個新的參考點在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點;否則從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點。如果新的參考點所在的機架中仍然沒有合適的數據節點,則只能從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點了。
- <font xmlns="http://www.w3.org/1999/xhtml">private DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, List<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results)throws NotEnoughReplicasException {
- // 如果參考點為空,則從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點
- if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
- }
- //從參考節點所在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點
- try {
- return chooseRandom(localMachine.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results);
- } catch (NotEnoughReplicasException e1) {
- //若這個集群中沒有合適的數據節點的話,則從已選擇的數據節點中找出一個作為新的參考點
- DatanodeDescriptor newLocal=null;
- for(Iterator<DatanodeDescriptor> iter=results.iterator(); iter.hasNext();) {
- DatanodeDescriptor nextNode = iter.next();
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
- if (newLocal != null) {//找到了一個新的參考點
- try {
- //從這個新的參考點在的機架中隨機選擇一個合適的數據節點作為此時的本地機架節點
- return chooseRandom(newLocal.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results);
- } catch(NotEnoughReplicasException e2) {
- //新的參考點所在的機架中仍然沒有合適的數據節點,從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點
- return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
- }
- } else {
- //從整個集群中隨機選擇一個合適的數據節點作為此時的本地機架節點
- return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
- }
- }
- }</font>
3.選擇一個遠程機架節點
選擇一個遠程機架節點就是隨機的選擇一個合適的不在參考點坐在的機架中的數據節點,如果沒有找到這個合適的數據節點的話,就只能從參考點所在的機架中選擇一個合適的數據節點作為此時的遠程機架節點了。
4.隨機選擇若干數據節點
這里的隨機隨機選擇若干個數據節點實際上指的是從某一個范圍內隨機的選擇若干個節點,它的實現需要利用前面提到過的NetworkTopology數據結構。隨機選擇所使用的范圍本質上指的是一個路徑,這個路徑表示的是NetworkTopology所表示的樹狀網絡拓撲圖中的一個非葉子節點,隨機選擇針對的就是這個節點的所有葉子子節點,因為所有的數據節點都被表示成了這個樹狀網絡拓撲圖中的葉子節點。
5.優化數據傳輸的路徑
以前說過,HDFS對於Block的副本copy采用的是流水線作業的方式:client把數據Block只傳給一個DataNode,這個DataNode收到Block之后,傳給下一個DataNode,依次類推,...,最后一個DataNode就不需要下傳數據Block了。所以,在為一個數據塊確定了所有的副本存放的位置之后,就需要確定這種數據節點之間流水復制的順序,這種順序應該使得數據傳輸時花費的網絡延時最小。ReplicationTargetChooser用了非常簡單的方法來考量的,大家一看便知:
- private DatanodeDescriptor[] getPipeline( DatanodeDescriptor writer, DatanodeDescriptor[] nodes) {
- if (nodes.length==0) return nodes;
- synchronized(clusterMap) {
- int index=0;
- if (writer == null || !clusterMap.contains(writer)) {
- writer = nodes[0];
- }
- for(;index<nodes.length; index++) {
- DatanodeDescriptor shortestNode = nodes[index];
- int shortestDistance = clusterMap.getDistance(writer, shortestNode);
- int shortestIndex = index;
- for(int i=index+1; i<nodes.length; i++) {
- DatanodeDescriptor currentNode = nodes[i];
- int currentDistance = clusterMap.getDistance(writer, currentNode);
- if (shortestDistance>currentDistance) {
- shortestDistance = currentDistance;
- shortestNode = currentNode;
- shortestIndex = i;
- }
- }
- //switch position index & shortestIndex
- if (index != shortestIndex) {
- nodes[shortestIndex] = nodes[index];
- nodes[index] = shortestNode;
- }
- writer = shortestNode;
- }
- }
- return nodes;
- }
可 惜的是,HDFS目前並沒有把副本存放策略的實現開放給用戶,也就是用戶無法根據自己的實際需求來指定文件的數據塊存放的具體位置。例如:我們可以將有關 系的兩個文件放到相同的數據節點上,這樣在進行map-reduce的時候,其工作效率會大大的提高。但是,又考慮到副本存放策略是與集群負載均衡休戚相 關的,所以要是真的把負載存放策略交給用戶來實現的話,對用戶來說是相當負載的
讀取
HDFS對文件的存儲是分塊來存儲的,即HDFS對於客戶端寫入的數據先按照固 定大小對這些數據進行分塊,然后把每一個數據塊的多個副本存儲在不同的DataNode節點上,同時不同的數據塊也可能存儲在不同的DataNode節點 上。那么,當客戶端要從HDFS上讀取某個文件時,它又是如何處理的呢?很明顯,客戶端也是按照分塊來讀取文件的數據的,關於客戶端如何分塊讀取文件的詳 細原理與過程我已經在前面的博文中詳細的闡述了,在這里就不多贅述了。但是,一個數據塊有多個副本,客戶端到底優先讀取那個DataNode節點上的該數 據塊的副本呢?這將是本要所要討論的重點了,即HDFS讀取副本的選擇策略,而這個工作具體是由NameNode來完成的。
當客戶端讀到一個文件的某個數據塊時,它就需要向NameNode節點詢問這個數據塊存儲在那些DataNode節點上,這個過程如下圖:
當然,客戶端至少需要向NameNode傳輸三個參數:文件路徑、讀取文件的起始位 置、讀取文件的長度,而NameNode會向該客戶端返回它所要讀取文件內容所在的位置——LocatedBlock對象,該對象包含對應的數據塊所有副 本所在的DataNode節點的位置信息,客戶端接着就會依次從這些DataNode節點上讀取該數據塊,直到成功讀取為止。這里就設計到了一個優化問題 了:客戶端應該總是選擇從距離它最近的可用DataNode節點上讀取需要的數據塊,所以此時的關鍵就是如何來計算客戶端與DataNode節點之間的距離。這個計算距離的問題本質上涉及到我前面介紹過的一種數據結構——NetworkTopology,這種數據結構被NameNode節點用來形象的表示HDFS集群中所有DataNode節點的物理位置,自然這個優化工作就交由NameNode來處理了。
NameNode對文件的讀優化的實現很簡單,基本原理就是按照客戶端與DataNode節點之間的距離進行排序,距客戶端越近的DataNode節點越被放在LocatedBlock的前面,該算法的基本思路如下:
1.如果該Block的一個副本存在於客戶端,則客戶端優先從本地讀取該數據塊;
2.如果該Block的一個副本與客戶端在同一個機架上,且沒有一個副本存放在客戶端,則客戶端優先讀取這個同機架上的副本;否則客戶端優先讀取同機器的副本,失敗的情況下然后再優先考慮這個同機架上的副本;
3.如果該Block既沒有一個副本存在客戶端,又沒有一個副本與客戶端在同一個機架上,則隨機選擇一個DataNode節點作為優先節點。
其詳細實現如下:
- /**
- * @param reader 客戶端
- * @param nodes 某個Block所在的DataNode節點
- */
- public void pseudoSortByDistance( Node reader, Node[] nodes ) {
- int tempIndex = 0;
- if (reader != null ) {
- int localRackNode = -1;
- //scan the array to find the local node & local rack node
- for(int i=0; i<nodes.length; i++) {
- if(tempIndex == 0 && reader == nodes[i]) { //local node
- //swap the local node and the node at position 0
- if( i != 0 ) {
- swap(nodes, tempIndex, i);
- }
- tempIndex=1;
- if(localRackNode != -1 ) {
- if(localRackNode == 0) {
- localRackNode = i;
- }
- break;
- }
- } else if(localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
- //local rack
- localRackNode = i;
- if(tempIndex != 0 ) break;
- }
- }
- // swap the local rack node and the node at position tempIndex
- if(localRackNode != -1 && localRackNode != tempIndex ) {
- swap(nodes, tempIndex, localRackNode);
- tempIndex++;
- }
- }
- // put a random node at position 0 if it is not a local/local-rack node
- if(tempIndex == 0 && nodes.length != 0) {
- swap(nodes, 0, r.nextInt(nodes.length));
- }
- }
從上面的具體實現可以看,這種優化策略存在一個很大的缺陷:完全沒有考慮到DataNode上的負載情況,從而致使HDFS的讀性能急速下降。這一點也正好反映了HDFS好友很大的優化空間,但遺憾的是,HDFS對這種讀Block時的副本選擇策略並沒有開放給開發者,以至於大大的增加了優化的難度。