HDFS dfsclient讀文件過程 源碼分析


HDFS讀取文件的重要概念

HDFS一個文件由多個block構成。HDFS在進行block讀寫的時候是以packet(默認每個packet為64K)為單位進行的。每一個packet由若干個chunk(默認512Byte)組成。Chunk是進行數據校驗的基本單位,對每一個chunk生成一個校驗和(默認4Byte)並將校驗和進行存儲。在讀取一個block的時候,數據傳輸的基本單位是packet,每個packet由若干個chunk組成。

 

HDFS客戶端讀文件示例代碼

FileSystem hdfs = FileSystem.get(new Configuration());
Path path = new Path("/testfile");// reading
FSDataInputStream dis = hdfs.open(path);
byte[] writeBuf = new byte[1024];
int len = dis.read(writeBuf);
System.out.println(new String(writeBuf, 0, len, "UTF-8"));
dis.close();

hdfs.close();

 

文件的打開

HDFS打開一個文件,需要在客戶端調用DistributedFileSystem.open(Path f, int bufferSize),其實現為:

public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  return new DFSClient.DFSDataInputStream(
        dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}

其中dfs為DistributedFileSystem的成員變量DFSClient,其open函數被調用,其中創建一個DFSInputStream(src, buffersize, verifyChecksum)並返回。

DFSClient.DFSDataInputStream實現了HDFS的FSDataInputStream,里面簡單包裝了DFSInputStream,實際實現是DFSInputStream完成的。

在DFSInputStream的構造函數中,openInfo函數被調用,其主要從namenode中得到要打開的文件所對應的blocks的信息,實現如下:

synchronized void openInfo() throws IOException {
LocatedBlocks newInfo
= callGetBlockLocations(namenode, src, 0, prefetchSize); this.locatedBlocks = newInfo; this.currentNode = null; } private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,String src, long start, long length) throws IOException { return namenode.getBlockLocations(src, start, length); }

LocatedBlocks主要包含一個鏈表的List<LocatedBlock> blocks,其中每個LocatedBlock包含如下信息:

  • Block b:此block的信息
  • long offset:此block在文件中的偏移量
  • DatanodeInfo[] locs:此block位於哪些DataNode上

上面namenode.getBlockLocations是一個RPC調用,最終調用NameNode類的getBlockLocations函數。

NameNode返回的是根據客戶端請求的文件名字,文件偏移量,數據長度,返回文件對應的數據塊列表,數據塊所在的DataNode節點。

 

文件的順序讀取

 hdfs文件的順序讀取是最經常使用的.

文件順序讀取的時候,客戶端利用文件打開的時候得到的FSDataInputStream.read(byte[] buffer, int offset, int length)函數進行文件讀操作。

FSDataInputStream會調用其封裝的DFSInputStream的read(byte[] buffer, int offset, int length)函數,實現如下:

public synchronized int read(byte buf[], int off, int len) throws IOException {
    ...
    if (pos < getFileLength()) {
    int retries = 2;
    while (retries > 0) {
      try {
        if (pos > blockEnd) {//首次pos=0,blockEnd=-1,必定調用方法blockSeekTo,初始化blockEnd,以后是讀完了當前塊,需要讀下一個塊,才會調用blockSeekTo
          currentNode = blockSeekTo(pos);//根據pos選擇塊和數據節點,選擇算法是遍歷塊所在的所有數據節點,選擇第一個非死亡節點
        }
        int realLen = Math.min(len, (int) (blockEnd - pos + 1));
        int result = readBuffer(buf, off, realLen);
        
        if (result >= 0) {
          pos += result;
        } else {
          throw new IOException("Unexpected EOS from the reader");
        }
        ...
        return result;
      } catch (ChecksumException ce) {
        throw ce;            
      } catch (IOException e) {
        ...
        if (currentNode != null) { addToDeadNodes(currentNode); }//遇到無法讀的DataNode,添加到死亡節點
        if (--retries == 0) {//嘗試讀三次都失敗,就拋出異常
          throw e;
        }
      }
    }
    }
    return -1;
}

 blockSeekTo函數會更新blockEnd,並創建對應的BlockReader,這里的BlockReader的初始化和上面的fetchBlockByteRange差不多,如果客戶端和塊所屬的DataNode是同個節點,則初始化一個通過本地讀取的BlockReader,否則創建一個通過Socket連接DataNode的BlockReader。

BlockReader的創建也是通過BlockReader.newBlockReader創建的,具體分析請看后面。

readBuffer方法比較簡單,直接調用BlockReader的read方法直接讀取數據。

BlockReader的read方法就根據請求的塊起始偏移量,長度,通過socket連接DataNode,獲取塊內容,BlockReader的read方法不會做緩存優化。

 

文件的隨機讀取

對於MapReduce,在提交作業時,已經確定了每個map和reduce要讀取的文件,文件的偏移量,讀取的長度,所以MapReduce使用的大部分是文件的隨機讀取。

文件隨機讀取的時候,客戶端利用文件打開的時候得到的FSDataInputStream.read(long position, byte[] buffer, int offset, int length)函數進行文件讀操作。

FSDataInputStream會調用其封裝的DFSInputStream的read(long position, byte[] buffer, int offset, int length)函數,實現如下:

public int read(long position, byte[] buffer, int offset, int length)throws IOException {  
  long filelen = getFileLength();
  int realLen = length;
  if ((position + length) > filelen) {
    realLen = (int)(filelen - position);
  }

  //首先得到包含從offset到offset + length內容的block列表
  //比如對於64M一個block的文件系統來說,欲讀取從100M開始,長度為128M的數據,則block列表包括第2,3,4塊block
  List<LocatedBlock> blockRange = getBlockRange(position, realLen);
  int remaining = realLen;
  //對每一個block,從中讀取內容
  //對於上面的例子,對於第2塊block,讀取從36M開始,讀取長度28M,對於第3塊,讀取整一塊64M,對於第4塊,讀取從0開始,長度為36M,共128M數據
  for (LocatedBlock blk : blockRange) {
    long targetStart = position - blk.getStartOffset();
    long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
    fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset);
    remaining -= bytesToRead;
    position += bytesToRead;
    offset += bytesToRead;
  }
  ...return realLen;
}

getBlockRange方法根據文件的偏移量和長度,獲取對應的數據塊信息。主要是根據NameNode類的getBlockLocations方法實現,並做了緩存和二分查找等優化。

 fetchBlockByteRange方法真正從數據塊讀取內容,實現如下:

private void fetchBlockByteRange(LocatedBlock block, long start,long end, byte[] buf, int offset) throws IOException {
  Socket dn = null;
  int numAttempts = block.getLocations().length;
  //此while循環為讀取失敗后的重試次數
  while (dn == null && numAttempts-- > 0 ) {
    //選擇一個DataNode來讀取數據
    DNAddrPair retval = chooseDataNode(block);
    DatanodeInfo chosenNode = retval.info;
    InetSocketAddress targetAddr = retval.addr;
    BlockReader reader = null;
    int len = (int) (end - start + 1);
    try {

      if (shouldTryShortCircuitRead(targetAddr)) {
        //如果要讀取的塊所屬的DataNode與客戶端是同一個節點,直接通過本地磁盤訪問,減少網絡流量
        reader = getLocalBlockReader(conf, src, block.getBlock(),accessToken, chosenNode, DFSClient.this.socketTimeout, start);
      } else {
        //創建Socket連接到DataNode
        dn = socketFactory.createSocket();
        dn.connect(targetAddr, socketTimeout);
        dn.setSoTimeout(socketTimeout);
        //利用建立的Socket鏈接,生成一個reader負責從DataNode讀取數據
        reader = BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(), accessToken,block.getBlock().getGenerationStamp(), start, len, buffersize, verifyChecksum, clientName);
      }        
      //讀取數據
      int nread = reader.readAll(buf, offset, len);
      return;
    } finally {
      IOUtils.closeStream(reader);
      IOUtils.closeSocket(dn);
      dn = null;
    }
    //如果讀取失敗,則將此DataNode標記為失敗節點
    addToDeadNodes(chosenNode);
  }
}

讀取塊內容,會嘗試該數據塊所在的所有DataNode,如果失敗,就把對應的DataNode加入到失敗節點,下次選擇節點就會忽略失敗節點(只在獨立的客戶端緩存失敗節點,不上報到namenode)。

BlockReader的創建也是通過BlockReader.newBlockReader創建的,具體分析請看后面。

最后,通過BlockReader的readAll方法讀取塊的完整內容。

 

dfsclient和datanode的通信協議

dfsclient的連接

dfsclient首次連接datanode時,通信協議實現主要是BlockReader.newBlockReader方法的實現,如下:

public static BlockReader newBlockReader( Socket sock, String file,long blockId,long genStamp,long startOffset, long len,int bufferSize, boolean verifyChecksum,String clientName) throws IOException {

  //使用Socket建立寫入流,向DataNode發送讀指令
  DataOutputStream out = new DataOutputStream(
    new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
  out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
  out.write( DataTransferProtocol.OP_READ_BLOCK );
  out.writeLong( blockId );
  out.writeLong( genStamp );
  out.writeLong( startOffset );
  out.writeLong( len );
  Text.writeString(out, clientName);
  out.flush();
  //使用Socket建立讀入流,用於從DataNode讀取數據
  DataInputStream in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(sock),bufferSize));
  short status = in.readShort();//塊讀取的狀態標記,一般是成功
  DataChecksum checksum = DataChecksum.newDataChecksum( in );
  long firstChunkOffset = in.readLong();
  //生成一個reader,主要包含讀入流,用於讀取數據
  return new BlockReader( file, blockId, in, checksum, verifyChecksum,
                          startOffset, firstChunkOffset, sock );
}

這里的startOffset是相對於塊的起始偏移量,len是要讀取的長度。

DataChecksum.newDataChecksum(in),會從DataNode獲取該塊的checksum加密方式,加密長度。

BlockReader的readAll函數就是用上面生成的DataInputStream讀取數據。

 下面是是讀數據塊時,客戶端發送的信息:

version operator blockid generationStamp startOffset length clientName  accessToken

 

 

operator:byte Client所需要的操作,讀取一個block、寫入一個block等等
version:short Client所需要的數據與Datanode所提供數據的版本是否一致
blockId:long 所要讀取block的blockId
generationStamp:long 所需要讀取block的generationStamp
startOffset:long 讀取block的的起始位置
length:long 讀取block的長度
clientName:String Client的名字
accessToken:Token Client提供的驗證信息,用戶名密碼等

 

DataNode對dfsclient的響應

DataNode負責與客戶端代碼的通信協議交互的邏輯,主要是DataXceiver的readBlock方法實現的:

private void readBlock(DataInputStream in) throws IOException {
  //讀取指令
  long blockId = in.readLong();         
  Block block = new Block( blockId, 0 , in.readLong());
  long startOffset = in.readLong();
  long length = in.readLong();
  String clientName = Text.readString(in);

  //創建一個寫入流,用於向客戶端寫數據
  OutputStream baseStream = NetUtils.getOutputStream(s,datanode.socketWriteTimeout);
  DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
  //生成BlockSender用於讀取本地的block的數據,並發送給客戶端
  //BlockSender有一個成員變量InputStream blockIn用於讀取本地block的數據
  BlockSender blockSender = new BlockSender(block, startOffset, length,true, true, false, datanode, clientTraceFmt);
  out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 發送操作成功的狀態
  //向客戶端寫入數據
  long read = blockSender.sendBlock(out, baseStream, null);
  ……
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(blockSender);
  }
}

 

DataXceiver的sendBlock用於發送數據,數據發送包括應答頭和后續的數據包。應答頭如下(包含DataXceiver中發送的成功標識):

 

DataXceiver的sendBlock的實現如下:

long sendBlock(DataOutputStream out, OutputStream baseStream, 
                 BlockTransferThrottler throttler) throws IOException {
    ...
    try {
      try {
        checksum.writeHeader(out);//寫入checksum的加密類型和加密長度
        if ( chunkOffsetOK ) {
          out.writeLong( offset );
        }
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }
      
      ...
      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
      while (endOffset > offset) {//循環寫入數據包
        long len = sendChunks(pktBuf, maxChunksPerPacket, 
                              streamForSendChunks);
        offset += len;
        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*checksumSize);
        seqno++;
      }
      try {
        out.writeInt(0); //標記結束    
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }
    }
    ...
    return totalRead;
}

DataXceiver的sendChunks盡可能在一個packet發送多個chunk,chunk的個數由maxChunks和剩余的塊內容決定,實現如下:

//默認是crc校驗,bytesPerChecksum默認是512,checksumSize默認是4,表示數據塊每512個字節,做一次checksum校驗,checksum的結果是4個字節
private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
                         throws IOException {
    int len = Math.min((int) (endOffset - offset),bytesPerChecksum * maxChunks);//len是要發送的數據長度
    if (len == 0) {
      return 0;
    }

    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;//這次要發送的chunk數量
    int packetLen = len + numChunks*checksumSize + 4;//packetLen是整個包的長度,包括包頭,校驗碼,數據
    pkt.clear();
    
    // write packet header
    pkt.putInt(packetLen);//整個packet的長度
    pkt.putLong(offset);//塊的偏移量
    pkt.putLong(seqno);//序列號
    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));//是否最后一個packet
    pkt.putInt(len);//發送的數據長度
    
    int checksumOff = pkt.position();
    int checksumLen = numChunks * checksumSize;
    byte[] buf = pkt.array();
    
    if (checksumSize > 0 && checksumIn != null) {
      try {
        checksumIn.readFully(buf, checksumOff, checksumLen);//填充chucksum的內容
      } catch (IOException e) {
        ...
      }
    }
    
    int dataOff = checksumOff + checksumLen;
    if (blockInPosition < 0) {
      IOUtils.readFully(blockIn, buf, dataOff, len);//填充塊數據的內容
      if (verifyChecksum) {//默認是false,不驗證
        //校驗處理
      }
    }
    
    try {
      //通過socket發送數據到客戶端
      
    } catch (IOException e) {
      throw ioeToSocketException(e);
    }
    ...
    return len;
}

數據組織成數據包來發送,數據包結構如下:

packetLen offset sequenceNum isLastPacket startOffset dataLen checksum   data

 

 

packetLen:int packet的長度,包括數據、數據的校驗等等
offset:long packet在block中的偏移量
sequenceNum:long 該packet在這次block讀取時的序號
isLastPacket:byte packet是否是最后一個
dataLen:int 該packet所包含block數據的長度,純數據不包括校驗和其他
checksum:該packet每一個chunk的校驗和,有多少個chunk就有多少個校驗和
data:該packet所包含的block數據

數據傳輸結束的標志,是一個packetLen長度為0的包。客戶端可以返回一個兩字節的應答OP_STATUS_CHECKSUM_OK(5)

 

dfsclient讀取塊內容

 hdfs文件的隨機和順序分析邏輯,都分析到BlockReader的readAll方法和read方法,這兩個方法完成對數據塊的內容讀取。

而readAll方法最后也是調用read方法,所以這里重點分析BlockReader的read方法,實現如下:

public synchronized int read(byte[] buf, int off, int len) throws IOException {
      //第一次read, 忽略前面的額外數據
      if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
        int toSkip = (int)(startOffset - firstChunkOffset);
        if ( skipBuf == null ) {
          skipBuf = new byte[bytesPerChecksum];
        }
        if ( super.read(skipBuf, 0, toSkip) != toSkip ) {//忽略
          // should never happen
          throw new IOException("Could not skip required number of bytes");
        }
      }
      
      boolean eosBefore = gotEOS;
      int nRead = super.read(buf, off, len);
      // if gotEOS was set in the previous read and checksum is enabled :
      if (dnSock != null && gotEOS && !eosBefore && nRead >= 0
          && needChecksum()) {
        //checksum is verified and there are no errors.
        checksumOk(dnSock);
      }
      return nRead;
}

 

super.read即是FSInputChecker的read方法,實現如下

public synchronized int read(byte[] b, int off, int len) throws IOException {
   //參數檢查
    int n = 0;
    for (;;) {
      int nread = read1(b, off + n, len - n);
      if (nread <= 0) 
        return (n == 0) ? nread : n;
      n += nread;
      if (n >= len)
        return n;
    }
}
//read1的len被忽略,只返回一個chunk的數據長度(最后一個chunk可能不足一個完整chunk的長度)
private int read1(byte b[], int off, int len)
  throws IOException {
    int avail = count-pos;
    if( avail <= 0 ) {
      if(len>=buf.length) {
        //直接讀取一個數據chunk到用戶buffer,避免多余一次復制
//很巧妙,buf初始化的大小是chunk的大小,默認是512,這里的代碼會在塊的剩余內容大於一個chunk的大小時調用
int nread = readChecksumChunk(b, off, len); return nread; } else { //讀取一個數據chunk到本地buffer,也是調用readChecksumChunk方法
//很巧妙,buf初始化大小是chunk的大小,默認是512,這里的代碼會在塊的剩余內容不足一個chunk的大小時進入調用
fill(); if( count <= 0 ) { return -1; } else { avail = count; } } } //從本地buffer拷貝數據到用戶buffer,避免最后一個chunk導致數組越界 int cnt = (avail < len) ? avail : len; System.arraycopy(buf, pos, b, off, cnt); pos += cnt; return cnt; }

 

FSInputChecker的readChecksumChunk會讀取一個數據塊的chunk,並做校驗,實現如下:

//只返回一個chunk的數據長度(默認512,最后一個chunk可能不足一個完整chunk的長度)
private int readChecksumChunk(byte b[], int off, int len)
  throws IOException {
    // invalidate buffer
    count = pos = 0;
          
    int read = 0;
    boolean retry = true;
    int retriesLeft = numOfRetries; //本案例中,numOfRetries是1,也就是說不會多次嘗試
    do {
      retriesLeft--;

      try {
        read = readChunk(chunkPos, b, off, len, checksum);
        if( read > 0 ) {
          if( needChecksum() ) {//這里會做checksum校驗
            sum.update(b, off, read);
            verifySum(chunkPos);
          }
          chunkPos += read;
        } 
        retry = false;
      } catch (ChecksumException ce) {
          ...
          if (retriesLeft == 0) {//本案例中,numOfRetries是1,也就是說不會多次嘗試,失敗了,直接拋出異常
            throw ce;
          }
          
          //如果讀取的chunk校驗失敗,以當前的chunkpos為起始偏移量,嘗試新的副本
          if (seekToNewSource(chunkPos)) {
            seek(chunkPos);
          } else {
            //找不到新的副本,拋出異常
            throw ce;
          }
        }
    } while (retry);
    return read;
}

 

readChunk方法由BlockReader實現,分析如下:

//只返回一個chunk的數據長度(默認512,最后一個chunk可能不足一個完整chunk的長度)
protected synchronized int readChunk(long pos, byte[] buf, int offset,int len, byte[] checksumBuf) throws IOException {
      //讀取一個 DATA_CHUNK.
      long chunkOffset = lastChunkOffset;
      if ( lastChunkLen > 0 ) {
        chunkOffset += lastChunkLen;
      }
      
      //如果先前的packet已經讀取完畢,就讀下一個packet。
      if (dataLeft <= 0) {
        //讀包的頭部
        int packetLen = in.readInt();
        long offsetInBlock = in.readLong();
        long seqno = in.readLong();
        boolean lastPacketInBlock = in.readBoolean();
        int dataLen = in.readInt();
        //校驗長度
        
        lastSeqNo = seqno;
        isLastPacket = lastPacketInBlock;
        dataLeft = dataLen;
        adjustChecksumBytes(dataLen);
        if (dataLen > 0) {
          IOUtils.readFully(in, checksumBytes.array(), 0,checksumBytes.limit());//讀取當前包的所有數據塊內容對應的checksum,后面的流程會講checksum和讀取的chunk內容做校驗
        }
      }

      int chunkLen = Math.min(dataLeft, bytesPerChecksum); //確定此次讀取的chunk長度,正常情況下是一個bytesPerChecksum(512字節),當文件最后不足一個bytesPerChecksum,讀取剩余的內容。
      
      if ( chunkLen > 0 ) {
        IOUtils.readFully(in, buf, offset, chunkLen);//讀取一個數據塊的chunk
        checksumBytes.get(checksumBuf, 0, checksumSize);
      }
      
      dataLeft -= chunkLen;
      lastChunkOffset = chunkOffset;
      lastChunkLen = chunkLen;
      ...
      if ( chunkLen == 0 ) {
        return -1;
      }
      return chunkLen;
}

 

總結

 本文前面概要介紹了dfsclient讀取文件的示例代碼,順序讀取文件和隨機讀取文件的概要流程,最后還基於dfsclient和datanode讀取塊的過程,做了一個詳細的分析。

 參考 http://caibinbupt.iteye.com/blog/284979

        http://www.cnblogs.com/forfuture1978/archive/2010/11/10/1874222.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM