In the previous post, Hadoop HDFS Principles and File Upload/Download Source Code Analysis (Part 1), I covered the HDFS fundamentals, the FileSystem initialization source code, and how the client establishes RPC communication with the NameNode. This post continues with the source-level walkthrough of HDFS file upload and download.
File Upload
Let's start with the sequence diagram of the method calls involved in file upload:

The main execution steps:
- FileSystem is initialized; the client obtains the NameNodeRpcServer proxy object and establishes RPC communication with the NameNode (covered in the previous post)
- FileSystem's create() method is called; since the concrete implementation is DistributedFileSystem, the create() of that class is what actually runs
- DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's create() method
- DFSOutputStream's static newStreamForCreate() method calls create() on the NameNodeRpcServer side, constructs the DFSOutputStream output stream object, and returns it
- Hadoop's IOUtils utility class is then used to copy the local file's bytes into that output stream
Now let's look at the source code.
First, the file system is initialized and RPC communication with the server is established.
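For context, a minimal initialization sketch might look like the following; the NameNode URI is an assumption for illustration only, and the full walkthrough is in the previous post (Configuration and FileSystem come from org.apache.hadoop.conf and org.apache.hadoop.fs):

```java
// Hypothetical client setup: FileSystem.get() resolves the hdfs:// scheme to
// DistributedFileSystem, which builds the DFSClient and its NameNode RPC proxy.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), conf);
```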
```java
// HDFSDemo.java
OutputStream os = fs.create(new Path("/test.log"));
```
FileSystem's create() method is called; since FileSystem is an abstract class, what actually runs here is the create() of one of its subclasses:
```java
// FileSystem.java
public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;
```
As mentioned earlier, FileSystem.get() returns a DistributedFileSystem object, so we go straight into DistributedFileSystem:

```java
// DistributedFileSystem.java
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
  statistics.incrementWriteOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      // dfs is the DFSClient object held by DistributedFileSystem;
      // this delegates to DFSClient's create() method
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}
```
DFSClient's create() returns a DFSOutputStream object:
```java
// DFSClient.java
public DFSOutputStream create(String src,
    FsPermission permission,
    EnumSet<CreateFlag> flag,
    boolean createParent,
    short replication,
    long blockSize,
    Progressable progress,
    int buffersize,
    ChecksumOpt checksumOpt,
    InetSocketAddress[] favoredNodes) throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getFileDefault();
  }
  FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
  if (LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }
  // Call DFSOutputStream's static newStreamForCreate() method and return the output stream
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      buffersize, dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes));
  beginFileLease(result.getFileId(), result);
  return result;
}
```
Let's continue with the logic inside newStreamForCreate():
```java
// DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  TraceScope scope =
      dfsClient.getPathTraceScope("newStreamForCreate", src);
  try {
    HdfsFileStatus stat = null;
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        // Through the DFSClient's NameNode proxy object, call the create()
        // method implemented in NameNodeRpcServer
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    // Construct the output stream object
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    // start() launches the inner DataStreamer; DataStreamer extends Thread, so this is
    // a thread that requests new blocks from the NameNode. The write pipeline mentioned
    // in the earlier HDFS overview is also built here; interested readers can dig into it,
    // but I won't walk through it in this post.
    out.start();
    return out;
  } finally {
    scope.close();
  }
}
```
At this point the client has obtained the output stream object from the server side. The rest is easy: just simple input/output stream operations using Hadoop's IOUtils.
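As a rough sketch of that final step (the local path and buffer size are assumptions, not part of the original demo; IOUtils here is org.apache.hadoop.io.IOUtils), the upload can be finished like this:

```java
// Copy a local file into the HDFS output stream obtained above;
// the last flag tells IOUtils.copyBytes to close both streams when done.
InputStream in = new BufferedInputStream(new FileInputStream("/tmp/test.log"));
IOUtils.copyBytes(in, os, 4096, true);
```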
File Download
The overall flow of file download is similar to that of upload. As before, let's start with the sequence diagram of the method calls:

The main execution steps:
- FileSystem is initialized; the client obtains the NameNodeRpcServer proxy object and establishes RPC communication with the NameNode (same as before)
- FileSystem's open() method is called; since the concrete implementation is DistributedFileSystem, the open() of that class is what actually runs
- DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's open() method
- A DFSInputStream input stream is instantiated
- openInfo() is called
- fetchLocatedBlocksAndGetLastBlockLength() is called to fetch the block information and obtain the length of the last block
- DFSClient's getLocatedBlocks() method is called to get the block information
- Inside callGetBlockLocations(), the NameNode proxy object calls NameNodeRpcServer's getBlockLocations() method
- The block information is loaded into the input stream
- IOUtils then takes over and downloads the file to the local machine
Next, let's look at the source code.
FileSystem initialization is still the first step; it was shown earlier, so I won't repeat it here. We start directly from DistributedFileSystem's open():
```java
// DistributedFileSystem.java
@Override
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      // dfs is the DFSClient object; its open() returns the input stream
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}
```
DFSClient does not use the NameNode proxy object directly here; instead it passes itself to DFSInputStream:
```java
// DFSClient.java
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException, UnresolvedLinkException {
  checkOpen();
  TraceScope scope = getPathTraceScope("newDFSInputStream", src);
  try {
    // No server-side call is made through the NameNode proxy here; a new input
    // stream is constructed directly, with the current DFSClient passed in as a parameter
    return new DFSInputStream(this, src, verifyChecksum);
  } finally {
    scope.close();
  }
}
```
So DFSInputStream must hold a reference to the DFSClient:
```java
// DFSInputStream.java constructor
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
    ) throws IOException, UnresolvedLinkException {
  this.dfsClient = dfsClient;  // only holds the DFSClient reference
  this.verifyChecksum = verifyChecksum;
  this.src = src;
  synchronized (infoLock) {
    this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
  }
  openInfo();  // call openInfo()
}
```
openInfo() is used to fetch the block information:
```java
// DFSInputStream.java
void openInfo() throws IOException, UnresolvedLinkException {
  synchronized(infoLock) {
    // Fetch the block information
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    // Number of fetch retries read from the configuration; I recall this was
    // hard-coded as 3 before 2.6, and the current default value is still 3
    int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
    while (retriesForLastBlockLength > 0) {
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
```
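As a side note, this retry behaviour can be tuned from the client configuration. A hedged sketch follows; the key names correspond to the DFSConfigKeys entries in Hadoop 2.x, and the values shown are only examples:

```java
// Hypothetical tuning of the last-block-length retry shown above.
Configuration conf = new Configuration();
conf.setInt("dfs.client.retry.times.get-last-block-length", 3);          // retry count, default 3
conf.setInt("dfs.client.retry.interval-ms.get-last-block-length", 4000); // wait between retries (ms), default 4000
```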
Fetching the block information:
```java
// DFSInputStream.java
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
  // Go back into DFSClient to get the current block information
  final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
  if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("newInfo = " + newInfo);
  }
  if (newInfo == null) {
    throw new IOException("Cannot open filename " + src);
  }

  if (locatedBlocks != null) {
    Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  locatedBlocks = newInfo;
  long lastBlockBeingWrittenLength = 0;
  if (!locatedBlocks.isLastBlockComplete()) {
    final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
    if (last != null) {
      if (last.getLocations().length == 0) {
        if (last.getBlockSize() == 0) {
          return 0;
        }
        return -1;
      }
      final long len = readBlockLength(last);
      last.getBlock().setNumBytes(len);
      lastBlockBeingWrittenLength = len;
    }
  }

  fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
  // Return the length of the last block still being written
  return lastBlockBeingWrittenLength;
}
```
Back in DFSClient:
```java
// DFSClient.java
@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
    throws IOException {
  TraceScope scope = getPathTraceScope("getBlockLocations", src);
  try {
    // The NameNode proxy is passed as a parameter into callGetBlockLocations()
    return callGetBlockLocations(namenode, src, start, length);
  } finally {
    scope.close();
  }
}
```
Calling the server-side method and returning the block information:
```java
// DFSClient.java
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length)
    throws IOException {
  try {
    // The NameNode proxy invokes the server-side getBlockLocations();
    // no further explanation needed at this point
    return namenode.getBlockLocations(src, start, length);
  } catch(RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        UnresolvedPathException.class);
  }
}
```
Finally, the file's block information is loaded into the input stream, and the IOUtils utility class writes the data out to a local file.
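Putting the client side together, a minimal download sketch under the same assumptions (the paths and buffer size are hypothetical) would be:

```java
// Open the HDFS file; this builds the DFSInputStream described above
FSDataInputStream in = fs.open(new Path("/test.log"));
// Write the bytes to a local file; IOUtils.copyBytes closes both streams
OutputStream out = new FileOutputStream("/tmp/test-download.log");
IOUtils.copyBytes(in, out, 4096, true);
```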
That concludes this series on HDFS principles and file upload/download source code analysis. In the next series of posts I will share some articles on MapReduce or Hive.
Sample code: https://github.com/LJunChina/hadoop
