Hadoop HDFS Internals and File Upload/Download Source Code Analysis (Part 2)


  In the previous post, Hadoop HDFS Internals and File Upload/Download Source Code Analysis (Part 1), I covered the HDFS internals, the source code behind FileSystem initialization, and how the Client establishes RPC communication with the NameNode. This post continues with the source code analysis of HDFS file upload and download.

File Upload

  First, here is the sequence diagram of the method calls involved in file upload:

  

  

   The main execution steps are:

  1. FileSystem is initialized; the Client obtains the NameNodeRpcServer proxy object and establishes RPC communication with the NameNode (covered in the previous post).
  2. FileSystem's create() method is called; since the concrete implementation is DistributedFileSystem, the create() method of that class is invoked.
  3. DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's create() method.
  4. DFSOutputStream's static newStreamForCreate() method calls the create() method on the NameNodeRpcServer side, constructs a DFSOutputStream object, and returns it.
  5. Hadoop's IOUtils utility class is used to copy the local file's data into the output stream.

  Now let's look at the source code:

  First, the file system is initialized to establish RPC communication with the server side, and then create() is called:

  

//HDFSDemo.java
OutputStream os = fs.create(new Path("/test.log"));
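
  For context, here is a minimal self-contained sketch of what HDFSDemo.java might look like around this call. The NameNode URI and the target path are placeholder assumptions; the FileSystem.get() part was covered in the previous post.

//HDFSDemo.java (illustrative sketch -- URI and paths are placeholders)
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // FileSystem.get() returns a DistributedFileSystem instance and sets up
    // the RPC proxy to the NameNode (see the previous post)
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
    // create() goes through DistributedFileSystem/DFSClient and returns an
    // FSDataOutputStream wrapping a DFSOutputStream
    OutputStream os = fs.create(new Path("/test.log"));
    os.close();
    fs.close();
  }
}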

 

  FileSystem's create() method is called; since FileSystem is an abstract class, what actually runs here is the create() method of its concrete subclass:

  

//FileSystem.java
public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;

   As mentioned earlier, FileSystem.get() returns a DistributedFileSystem object, so we go straight into DistributedFileSystem:

 

  

//DistributedFileSystem.java
@Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        // dfs is the DFSClient object held by DistributedFileSystem;
        // the call is delegated to DFSClient's create() method here
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                cflags, replication, blockSize, progress, bufferSize,
                checksumOpt);
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }

  DFSClient's create() returns a DFSOutputStream object:

  

//DFSClient.java
public DFSOutputStream create(String src,
                             FsPermission permission,
                             EnumSet<CreateFlag> flag,
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt,
                             InetSocketAddress[] favoredNodes) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if(LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
    // call DFSOutputStream's static newStreamForCreate() method, which returns the output stream
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes));
    beginFileLease(result.getFileId(), result);
    return result;
  }

  Let's continue with the logic inside newStreamForCreate():

  

//DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    TraceScope scope =
        dfsClient.getPathTraceScope("newStreamForCreate", src);
    try {
      HdfsFileStatus stat = null;
      boolean shouldRetry = true;
      int retryCount = CREATE_RETRY_COUNT;
      while (shouldRetry) {
        shouldRetry = false;
        try {
          // here dfsClient's NameNode proxy object calls the create() method implemented in NameNodeRpcServer
          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<CreateFlag>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS);
          break;
        } catch (RemoteException re) {
          IOException e = re.unwrapRemoteException(
              AccessControlException.class,
              DSQuotaExceededException.class,
              FileAlreadyExistsException.class,
              FileNotFoundException.class,
              ParentNotDirectoryException.class,
              NSQuotaExceededException.class,
              RetryStartFileException.class,
              SafeModeException.class,
              UnresolvedPathException.class,
              SnapshotAccessControlException.class,
              UnknownCryptoProtocolVersionException.class);
          if (e instanceof RetryStartFileException) {
            if (retryCount > 0) {
              shouldRetry = true;
              retryCount--;
            } else {
              throw new IOException("Too many retries because of encryption" +
                  " zone operations", e);
            }
          } else {
            throw e;
          }
        }
      }
      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
      // construct the output stream object
      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
      // start() runs the inner class DataStreamer; DataStreamer extends Thread, so this
      // is a background thread that requests new blocks from the NameNode. The pipeline
      // mentioned in the earlier HDFS internals post is also implemented there; interested
      // readers can dig into it, but we won't walk through it here.
      out.start();
      return out;
    } finally {
      scope.close();
    }
  }
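
  For readers who want the general idea without opening DataStreamer itself, here is a much-simplified, purely illustrative sketch of the producer/consumer pattern it follows. None of the class or method names below come from the HDFS source; they are invented for illustration only.

// ToyStreamer is NOT the real DataStreamer; it only illustrates the pattern:
// the writing thread enqueues packets, and a background thread drains the queue,
// which in real HDFS is where packets are forwarded along the DataNode pipeline.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ToyStreamer extends Thread {
  private final BlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<>();
  private volatile boolean closed = false;

  // called by the client writing thread
  void enqueue(byte[] packet) throws InterruptedException {
    dataQueue.put(packet);
  }

  // signal the streamer to stop once the queue is drained
  void finishStreaming() {
    closed = true;
    this.interrupt();
  }

  @Override
  public void run() {
    while (!closed || !dataQueue.isEmpty()) {
      try {
        byte[] packet = dataQueue.take();
        // the real DataStreamer would send the packet to the first DataNode of the
        // pipeline and wait for acks; here we only simulate the hand-off
        System.out.println("sending packet of " + packet.length + " bytes");
      } catch (InterruptedException e) {
        // woken up by finishStreaming(); the loop condition decides whether to exit
      }
    }
  }
}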

    

  At this point, the Client has obtained the output stream object from the server side. What follows is easy: just simple file input/output stream operations (using Hadoop's IOUtils).
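
  As an example, here is a minimal sketch of that copy step, assuming a local source file /tmp/test.log and the fs handle from above (the paths and buffer size are arbitrary placeholders):

//UploadDemo.java (illustrative sketch -- paths are placeholders)
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class UploadDemo {
  public static void upload(FileSystem fs) throws Exception {
    // local file to be uploaded
    InputStream in = new FileInputStream("/tmp/test.log");
    // HDFS output stream, obtained exactly as shown above
    OutputStream out = fs.create(new Path("/test.log"));
    // copy the bytes with a 4 KB buffer and close both streams when done
    IOUtils.copyBytes(in, out, 4096, true);
  }
}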

File Download

  The overall flow of file download is similar to file upload. As before, here is the sequence diagram of the method calls first:

  

  The main execution steps are:

  1. FileSystem is initialized; the Client obtains the NameNodeRpcServer proxy object and establishes RPC communication with the NameNode (same as before).
  2. FileSystem's open() method is called; since the concrete implementation is DistributedFileSystem, the open() method of that class is invoked.
  3. DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's open() method.
  4. A DFSInputStream input stream is instantiated.
  5. openInfo() is called.
  6. fetchLocatedBlocksAndGetLastBlockLength() is called to fetch the block information and obtain the length of the last block.
  7. DFSClient's getLocatedBlocks() method is called to obtain the block information.
  8. In callGetBlockLocations(), the NameNode proxy object calls NameNodeRpcServer's getBlockLocations() method.
  9. The block information is written into the input stream.
  10. IOUtils takes over and the file is downloaded to the local machine.

  Next, let's look at the source code:

  The FileSystem initialization is still the same as before and was shown earlier, so it is not repeated here; we start directly from DistributedFileSystem's open().

  

//DistributedFileSystem.java
@Override
  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        // dfs is the DFSClient object; calling its open() returns the input stream
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

  DFSClient does not use the NameNode proxy object directly here; instead it passes itself to DFSInputStream:

  

//DFSClient.java
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    checkOpen();
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
      // instead of calling the server through the NameNode proxy here, a new input stream
      // is constructed with the current DFSClient object passed in as a parameter
      return new DFSInputStream(this, src, verifyChecksum);
    } finally {
      scope.close();
    }
  }

  So DFSInputStream must hold a reference to DFSClient:

  

//DFSInputStream.java constructor
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient; // only a reference to DFSClient is kept
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    openInfo(); // call openInfo()
  }

  openInfo() fetches the block information:

//DFSInputStream.java
void openInfo() throws IOException, UnresolvedLinkException {
    synchronized(infoLock) {
      // fetch the block information
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      // read the configured number of retries for fetching the last block length;
      // as far as I recall this was hard-coded to 3 before 2.6, and the default is still 3
      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
      while (retriesForLastBlockLength > 0) {
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }

 

  Fetching the block information:

//DFSInputStream.java
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    // go back into DFSClient to obtain the current block information
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len;
      }
    }

    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
    // return the length of the last block still being written
    return lastBlockBeingWrittenLength;
  }

  Back in DFSClient:

  

//DFSClient.java
@VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    TraceScope scope = getPathTraceScope("getBlockLocations", src);
    try {
      // the NameNode proxy is passed as a parameter into callGetBlockLocations()
      return callGetBlockLocations(namenode, src, start, length);
    } finally {
      scope.close();
    }
  }

  Calling the server-side method and returning the block information:

//DFSClient.java
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length)
      throws IOException {
    try {
      // at this point the NameNode proxy simply invokes getBlockLocations() on the server side --
      // no further explanation needed
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }

  Finally, the file's block information is written into the input stream, and the IOUtils utility class copies the data out to a local file.
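
  To close the loop, here is a minimal sketch of the client-side download, assuming the HDFS file /test.log and a local destination path (both are placeholders):

//DownloadDemo.java (illustrative sketch -- paths are placeholders)
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class DownloadDemo {
  public static void download(FileSystem fs) throws Exception {
    // open() goes through DistributedFileSystem/DFSClient and returns an
    // FSDataInputStream wrapping a DFSInputStream
    InputStream in = fs.open(new Path("/test.log"));
    // local destination file
    OutputStream out = new FileOutputStream("/tmp/test.log.copy");
    // copy the bytes with a 4 KB buffer and close both streams when done
    IOUtils.copyBytes(in, out, 4096, true);
  }
}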

  That wraps up the analysis of Hadoop HDFS internals and the file upload/download source code. In the next series of posts, I will share some articles on MapReduce or Hive.

  Sample code: https://github.com/LJunChina/hadoop

  

  

