I recently needed to look at how HDFS reads and writes data blocks for work. Unlike most posts on this topic, this article focuses on the ${dfs.data.dir} selection strategy, i.e. how a block is placed on a DataNode. I read this part of the code mainly from the angle our work required.
hdfs-site.xml:

```xml
<property>
  <name>dfs.data.dir</name>
  <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value>
</property>
```
The ${dfs.data.dir} selection strategy decides which directory should hold a block when the DataNode is configured with multiple ${dfs.data.dir} directories (as above). Typically each ${dfs.data.dir} is a mount point on a different disk, so storing a block means deciding which disk it goes to.
Creating a file takes two steps:
1. Before any block is written, the client talks to the NameNode to create the file (INodeFile / INodeFileUnderConstruction). The write request starts in the DFSClient's create() method and travels via RPC to the NameNode, which ultimately calls FSNamesystem's startFileInternal() method to create the file.

```java
private void startFileInternal(String src,
                               PermissionStatus permissions,
                               String holder,
                               String clientMachine,
                               boolean overwrite,
                               boolean append,
                               boolean createParent,
                               short replication,
                               long blockSize
                               ) throws IOException {
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* startFile: src=" + src
        + ", holder=" + holder
        + ", clientMachine=" + clientMachine
        + ", createParent=" + createParent
        + ", replication=" + replication
        + ", overwrite=" + overwrite
        + ", append=" + append);
  }

  FSPermissionChecker pc = getPermissionChecker();
  synchronized (this) {
    if (isInSafeMode())
      throw new SafeModeException("Cannot create " + src, safeMode);
    if (!DFSUtil.isValidName(src)) {
      throw new IOException("Invalid name: " + src);
    }

    // Verify that the destination does not exist as a directory already.
    boolean pathExists = dir.exists(src);
    if (pathExists && dir.isDir(src)) {
      throw new IOException("Cannot create " + src + "; already exists as a directory");
    }

    if (isPermissionEnabled) {
      if (append || (overwrite && pathExists)) {
        checkPathAccess(pc, src, FsAction.WRITE);
      } else {
        checkAncestorAccess(pc, src, FsAction.WRITE);
      }
    }

    if (!createParent) {
      verifyParentDir(src);
    }

    try {
      INode myFile = dir.getFileINode(src); // look up the file INode by path
      recoverLeaseInternal(myFile, src, holder, clientMachine, false);

      try {
        verifyReplication(src, replication, clientMachine);
      } catch (IOException e) {
        throw new IOException("failed to create " + e.getMessage());
      }
      if (append) { // append to an existing file
        if (myFile == null) {
          throw new FileNotFoundException("failed to append to non-existent "
              + src + " on client " + clientMachine);
        } else if (myFile.isDirectory()) {
          throw new IOException("failed to append to directory " + src
              + " on client " + clientMachine);
        }
      } else if (!dir.isValidToCreate(src)) {
        if (overwrite) { // overwriting the existing file is allowed
          delete(src, true);
        } else {
          throw new IOException("failed to create file " + src
              + " on client " + clientMachine
              + " either because the filename is invalid or the file exists");
        }
      }

      DatanodeDescriptor clientNode = host2DataNodeMap
          .getDatanodeByHost(clientMachine);

      if (append) {
        //
        // Replace current node with a INodeUnderConstruction.
        // Recreate in-memory lease record.
        //
        INodeFile node = (INodeFile) myFile;
        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
            node.getLocalNameBytes(), node.getReplication(),
            node.getModificationTime(), node.getPreferredBlockSize(),
            node.getBlocks(), node.getPermissionStatus(), holder,
            clientMachine, clientNode);
        dir.replaceNode(src, node, cons);
        leaseManager.addLease(cons.clientName, src);

      } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
        //
        checkFsObjectLimit();

        // increment global generation stamp
        long genstamp = nextGenerationStamp();
        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
            replication, blockSize, holder, clientMachine, clientNode,
            genstamp);
        if (newNode == null) {
          throw new IOException("DIR* startFile: Unable to add to namespace");
        }
        leaseManager.addLease(newNode.clientName, src);
        if (NameNode.stateChangeLog.isDebugEnabled()) {
          NameNode.stateChangeLog.debug("DIR* startFile: "
              + "add " + src + " to namespace for " + holder);
        }
      }
    } catch (IOException ie) {
      NameNode.stateChangeLog.warn("DIR* startFile: "
          + ie.getMessage());
      throw ie;
    }
  }
}
```
The main steps of this method:
1) Perform some checks (safe mode, permissions, whether the path already exists as a directory, etc.)
2) If this is not an append:
generate a generation stamp (one per file); construct an INodeFileUnderConstruction (carrying the preferredBlockSize); add the file to the filesystem; add a lease (a time-limited write lock).
If it is an append:
replace the INodeFile at src with an INodeFileUnderConstruction; add a lease.
2. After the file has been created on the NameNode, the client asks the NameNode for blocks and writes them to DataNodes. Once the work above completes, the DataStreamer thread is started to write blocks to DataNodes. The overall flow:
1) Some preliminary checks
2) Ask the NameNode for a block (one round-trip to the NameNode)
a. Choose N DataNodes as the block's locations according to the replica placement policy;
b. Randomly generate a blockID that does not collide with an existing one;
c. Add the block to the corresponding file;
3) Organize the target DataNodes into a pipeline and send Packets to the first one
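Step 2b above (drawing random IDs until one is unused) can be sketched as a small standalone class. `BlockIdAllocator` and its field names are hypothetical stand-ins for the check FSNamesystem performs against its block map; this is an illustration of the retry-until-unique idea, not Hadoop's actual code:

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical sketch: draw random block IDs until one is not already
// present in the set of known blocks, then record it as allocated.
public class BlockIdAllocator {
    private final Set<Long> existingBlocks = new HashSet<>();
    private final Random random = new Random();

    public long allocateBlockId() {
        long id = random.nextLong();
        while (existingBlocks.contains(id)) { // collision: try again
            id = random.nextLong();
        }
        existingBlocks.add(id);
        return id;
    }

    public boolean isAllocated(long id) {
        return existingBlocks.contains(id);
    }
}
```

Since IDs are 64-bit, collisions are rare in practice, so the loop almost always exits on the first draw.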
Let's look at a few of the more important methods:

```java
/**
 * The client would like to obtain an additional block for the indicated
 * filename (which is being written-to). Return an array that consists
 * of the block, plus a set of machines. The first on this list should
 * be where the client writes data. Subsequent items in the list must
 * be provided in the connection to the first datanode.
 *
 * Make sure the previous blocks have been reported by datanodes and
 * are replicated. Will return an empty 2-elt array if we want the
 * client to "try again later".
 */
// ask the NameNode for a new block
public LocatedBlock getAdditionalBlock(String src,
                                       String clientName,
                                       HashMap<Node, Node> excludedNodes
                                       ) throws IOException {
  long fileLength, blockSize;
  int replication;
  DatanodeDescriptor clientNode = null;
  Block newBlock = null;

  NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
      + src + " for " + clientName);

  synchronized (this) {
    if (isInSafeMode()) { // check safemode first for failing-fast
      throw new SafeModeException("Cannot add block to " + src, safeMode);
    }
    // have we exceeded the configured limit of fs objects.
    checkFsObjectLimit();

    INodeFileUnderConstruction pendingFile = checkLease(src, clientName);

    //
    // If we fail this, bad things happen!
    //
    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }
    fileLength = pendingFile.computeContentSummary().getLength();
    blockSize = pendingFile.getPreferredBlockSize();
    clientNode = pendingFile.getClientNode();
    replication = (int) pendingFile.getReplication();
  }

  // choose targets for the new block to be allocated.
  // pick the locations for the replicas
  DatanodeDescriptor targets[] = replicator.chooseTarget(src,
                                                         replication,
                                                         clientNode,
                                                         excludedNodes,
                                                         blockSize);
  if (targets.length < this.minReplication) {
    throw new IOException("File " + src + " could only be replicated to " +
                          targets.length + " nodes, instead of " +
                          minReplication);
  }

  // Allocate a new block and record it in the INode.
  synchronized (this) {
    if (isInSafeMode()) { // make sure it is not in safemode again.
      throw new SafeModeException("Cannot add block to " + src, safeMode);
    }
    INode[] pathINodes = dir.getExistingPathINodes(src);
    int inodesLen = pathINodes.length;
    checkLease(src, clientName, pathINodes[inodesLen - 1]);
    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
        pathINodes[inodesLen - 1];

    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }

    // allocate new block record block locations in INode.
    // allocate the block: randomly generate a unique blockID,
    // then record the block in the INode
    newBlock = allocateBlock(src, pathINodes);
    pendingFile.setTargets(targets);

    for (DatanodeDescriptor dn : targets) {
      dn.incBlocksScheduled();
    }
    dir.persistBlocks(src, pendingFile);
  }
  if (persistBlocks) {
    getEditLog().logSync();
  }

  // Create next block
  LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
  if (isAccessTokenEnabled) {
    b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
        EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
  }
  return b;
}
```
This method also touches the replica placement policy, which I will leave for the next post. The figure below summarizes the call hierarchy of the method above:
Finally, the part I want to focus on: how a block is stored on the DataNode. The call hierarchy is as follows:
First, the data structures involved:
```java
class FSVolume {                   // one volume, i.e. one ${dfs.data.dir}
  private File currentDir;         // holds finished blocks: ${dfs.data.dir}/current
  private FSDir dataDir;           // the block files under currentDir
  private File tmpDir;             // temporary files: ${dfs.data.dir}/tmp
  private File blocksBeingWritten; // blocks currently being written: ${dfs.data.dir}/blocksBeingWritten
  private File detachDir;          // copy-on-write detach area: ${dfs.data.dir}/detach
  private DF usage;
  private DU dfsUsage;
  private long reserved;
```
```java
static class FSVolumeSet {   // the set of volumes, i.e. all ${dfs.data.dir} entries
  FSVolume[] volumes = null; // the FSVolumes, organized as an array
  int curVolume = 0;         // index of the volume currently in use
```
FSVolumeSet represents the set of ${dfs.data.dir} directories: it organizes them into the array volumes and uses curVolume to indicate which ${dfs.data.dir} is currently in use. The selection strategy is:
when there are multiple ${dfs.data.dir} directories, the DataNode picks an FSVolume from volumes in order and stores the block there (the block first lands in the blocksBeingWritten directory and is moved to current once the write finishes);
after each block is written, curVolume is advanced by one, so the ${dfs.data.dir} directories are written in round-robin fashion. The policy is implemented in FSDataset.FSVolumeSet's getNextVolume() method.
```java
synchronized FSVolume getNextVolume(long blockSize) throws IOException {

  if (volumes.length < 1) {
    throw new DiskOutOfSpaceException("No more available volumes");
  }

  // since volumes could've been removed because of the failure
  // make sure we are not out of bounds
  if (curVolume >= volumes.length) {
    curVolume = 0;
  }

  int startVolume = curVolume;

  while (true) {
    FSVolume volume = volumes[curVolume];
    curVolume = (curVolume + 1) % volumes.length; // advance the round-robin index
    if (volume.getAvailable() > blockSize) { return volume; }
    if (curVolume == startVolume) {
      throw new DiskOutOfSpaceException("Insufficient space for an additional block");
    }
  }
}
```
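The round-robin behavior of getNextVolume() can be reproduced in a tiny standalone sketch. The `VolumeSet` class below is illustrative, not Hadoop's actual type: each volume is modeled only by its free space, and full volumes are skipped exactly as in the loop above:

```java
import java.util.Arrays;

// Illustrative stand-in for FSVolumeSet: volumes are represented only by
// remaining capacity, and selection rotates through them round-robin.
public class VolumeSet {
    private final long[] available; // free bytes per simulated volume
    private int curVolume = 0;

    public VolumeSet(long[] available) {
        this.available = Arrays.copyOf(available, available.length);
    }

    // Return the index of the next volume with room for blockSize bytes,
    // advancing curVolume the same way getNextVolume() does.
    public int getNextVolume(long blockSize) {
        if (available.length < 1) {
            throw new IllegalStateException("No more available volumes");
        }
        if (curVolume >= available.length) {
            curVolume = 0;
        }
        int startVolume = curVolume;
        while (true) {
            int candidate = curVolume;
            curVolume = (curVolume + 1) % available.length;
            if (available[candidate] > blockSize) {
                return candidate;
            }
            if (curVolume == startVolume) { // full circle: every volume is full
                throw new IllegalStateException("Insufficient space for an additional block");
            }
        }
    }
}
```

With three volumes where the third is full, successive calls yield 0, 1, then wrap back to 0, skipping the full volume, which is exactly the rotation the DataNode performs across its ${dfs.data.dir} directories.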
Next, the read path. When a Map Task runs, its nextKeyValue() method reads data from a block. The main steps:
1. Based on the file offset and length given when the Map Task was created, determine which block to read and fetch its details (one round-trip to the NameNode).
2. Among the DataNodes holding the block, pick the best one and open a socket connection to it (local reads are disabled by default).
The call hierarchy is as follows:
A Map Task reads its data through a RecordReader, which underneath goes through HDFS's BlockReader interface. BlockReader has two implementations:
BlockReaderLocal: reads a local block directly (bypassing the DataNode)
RemoteBlockReader: reads a block through a DataNode
By default a Map Task reads even local data through RemoteBlockReader, i.e. over a socket; local reads are not enabled out of the box. Short-circuit local reads can be enabled as described at http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html, which makes BlockReaderLocal read the block file directly without going through the DataNode. The analysis below is based on BlockReaderLocal.
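For reference, in Hadoop 1.x the settings look roughly like the following (key names as I recall them from the linked documentation; the user name is a placeholder — check the docs for your version before relying on these):

```xml
<!-- Client side: let the DFS client bypass the DataNode for local blocks -->
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<!-- DataNode side: which user is allowed to request local block paths -->
<property>
  <name>dfs.block.local-path-access.user</name>
  <value>mapred</value>
</property>
```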
First, the data structures involved:
```java
public class BlockLocalPathInfo implements Writable { // describes where a block lives locally

  private Block block;                // the block in question
  private String localBlockPath = ""; // local path of the block file
  private String localMetaPath = "";  // local path of the block's checksum file
```
```java
// Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo { // one DataNode on this machine (a machine may run several)
  private final Map<Block, BlockLocalPathInfo> cache; // block --> local path info
```
```java
// Multiple datanodes could be running on the local machine. Store proxies in
// a map keyed by the ipc port of the datanode.
// The map maintained by BlockReaderLocal:
//   Integer           -- the DataNode's ipc port
//   LocalDatanodeInfo -- the DataNode itself
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap =
    new HashMap<Integer, LocalDatanodeInfo>();
```
```java
/**
 * This class is used by the datanode to maintain the map from a block
 * to its metadata.
 */
class DatanodeBlockInfo { // per-block metadata kept by the DataNode

  private FSVolume volume;  // the FSVolume the block lives on
  private File file;        // block file
  private boolean detached; // whether copy-on-write detach has completed
```
```java
// block --> block metadata
HashMap<Block, DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();
```
When reading a block, the reader first uses localDatanodeInfoMap to find the DataNode to talk to; then looks up the block's DatanodeBlockInfo in volumeMap (which includes the FSVolume the block was assigned to at write time, as described earlier in this article); then builds a BlockLocalPathInfo object from the DatanodeBlockInfo, recording the block's local paths; finally BlockReaderLocal reads the block according to that BlockLocalPathInfo. The details are in the BlockReaderLocal.newBlockReader() method.
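That two-level lookup can be sketched with plain maps. All class and field names below are simplified stand-ins for localDatanodeInfoMap, the per-DataNode cache, and BlockLocalPathInfo; the point is just the port-then-block resolution order:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the local-read lookup: ipc port -> datanode cache,
// then block id -> local block/metadata paths within that cache.
public class LocalReadLookup {
    // Stand-in for BlockLocalPathInfo: just the two local file paths.
    public static class PathInfo {
        public final String blockPath;
        public final String metaPath;
        public PathInfo(String blockPath, String metaPath) {
            this.blockPath = blockPath;
            this.metaPath = metaPath;
        }
    }

    // Stand-in for localDatanodeInfoMap: one cache per datanode ipc port.
    private final Map<Integer, Map<Long, PathInfo>> datanodes = new HashMap<>();

    public void addBlock(int ipcPort, long blockId, PathInfo info) {
        datanodes.computeIfAbsent(ipcPort, p -> new HashMap<>()).put(blockId, info);
    }

    // Resolve a block to its local paths, as BlockReaderLocal does before
    // opening the block file directly; null if the block is unknown.
    public PathInfo resolve(int ipcPort, long blockId) {
        Map<Long, PathInfo> cache = datanodes.get(ipcPort);
        return cache == null ? null : cache.get(blockId);
    }
}
```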
This article is based on Hadoop 1.2.1.
Corrections are welcome.