HDFS DFSClient File Write Process: Source Code Analysis


Key Concepts for HDFS File Writes

An HDFS file is made up of multiple blocks. HDFS reads and writes block data in units of packets (64 KB each by default), and each packet consists of several chunks (512 bytes by default). The chunk is the basic unit of data verification: a checksum (4 bytes by default) is computed for every chunk and stored alongside the data.

When writing a block, the basic unit of data transfer is therefore the packet, and each packet carries several chunks together with their checksums.
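
To make these numbers concrete, here is a minimal sketch of the packet/chunk arithmetic (it mirrors what DFSOutputStream.computePacketChunkSize does, but ignores the packet header bytes that the real code also subtracts):

public class PacketSizing {
    static final int WRITE_PACKET_SIZE = 64 * 1024; // default packet size, 64 KB
    static final int BYTES_PER_CHECKSUM = 512;      // io.bytes.per.checksum, the chunk size
    static final int CHECKSUM_SIZE = 4;             // one CRC checksum per chunk

    public static void main(String[] args) {
        // Each chunk travels with its checksum, so the per-chunk footprint
        // inside a packet is chunk data plus checksum.
        int chunkWithChecksum = BYTES_PER_CHECKSUM + CHECKSUM_SIZE;
        int chunksPerPacket = Math.max(WRITE_PACKET_SIZE / chunkWithChecksum, 1);
        System.out.println("chunks per packet : " + chunksPerPacket);                      // 127
        System.out.println("payload per packet: " + chunksPerPacket * BYTES_PER_CHECKSUM); // 65024 bytes
    }
}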

 

Sample Client Code for Writing a File to HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystem hdfs = FileSystem.get(new Configuration());
Path path = new Path("/testfile");

// writing
FSDataOutputStream dos = hdfs.create(path);
byte[] writeBuf = "Hello World".getBytes("UTF-8");
dos.write(writeBuf, 0, writeBuf.length);
dos.close();

hdfs.close();
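
FileSystem.get returns the FileSystem implementation registered for the configured URI scheme; for hdfs:// URIs this is DistributedFileSystem, whose create method is where the analysis below starts.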

 

Opening the File

Uploading a file to HDFS normally goes through DistributedFileSystem.create, which is implemented as follows:

public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  return new FSDataOutputStream
      (dfs.create(getPathName(f), permission, overwrite, replication,
                  blockSize, progress, bufferSize),
       statistics);
}

It produces an FSDataOutputStream that will be used to write data into the newly created file. Its member variable dfs is of type DFSClient, whose create method looks like this:

public OutputStream create(String src, FsPermission permission,
    boolean overwrite, short replication, long blockSize,
    Progressable progress, int buffersize) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getDefault();
    }
    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
    OutputStream result = new DFSOutputStream(src, masked,overwrite, replication, blockSize, progress, buffersize,
        conf.getInt("io.bytes.per.checksum", 512));
    leasechecker.put(src, result);
    return result;
}
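
Note the call leasechecker.put(src, result): it registers the new file with the client's lease checker, a background thread that periodically renews the write lease so the NameNode keeps treating this client as the file's exclusive writer.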

This constructs a DFSOutputStream, and in its constructor an RPC call to the NameNode's create method creates the file.
The constructor also does one other important thing: it calls streamer.start(), starting the thread that drives the write pipeline; we will analyze it in detail when we walk through the write path.

DFSOutputStream(String src, FsPermission masked, boolean overwrite,
    short replication, long blockSize, Progressable progress,
    int buffersize, int bytesPerChecksum) throws IOException {
  this(src, blockSize, progress, bytesPerChecksum);
  computePacketChunkSize(writePacketSize, bytesPerChecksum);
  try {
    namenode.create(src, masked, clientName, overwrite, replication, blockSize);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
                                   QuotaExceededException.class);
  }
  streamer.start();
}

The RPC call reaches the NameNode's create function, which calls namesystem.startFile, which in turn calls startFileInternal. That method creates a new file in the under-construction state, with no data blocks associated with it yet.

 

Writing the File from the DFSClient

Now the client can write data into the newly created file, normally through FSDataOutputStream's write method.

By design, HDFS writes block data in a pipeline: the data is split into packets, and if it must be replicated three times, to DataNode 1, 2, and 3, the process runs as follows (a toy sketch follows the list):

  • First, packet 1 is written to DataNode 1.
  • Then DataNode 1 forwards packet 1 to DataNode 2, while the client can already write packet 2 to DataNode 1.
  • Then DataNode 2 forwards packet 1 to DataNode 3, while the client writes packet 3 to DataNode 1 and DataNode 1 forwards packet 2 to DataNode 2.
  • Packets keep streaming down the pipeline this way until all of the data has been written and replicated.
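
The overlap can be illustrated with a toy model (purely conceptual, no Hadoop APIs: the real pipeline uses TCP streams and acknowledgements, and the queues and stage structure here are invented for illustration):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PipelineSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> toDn1 = new ArrayBlockingQueue<>(1);
        BlockingQueue<Integer> toDn2 = new ArrayBlockingQueue<>(1);
        BlockingQueue<Integer> toDn3 = new ArrayBlockingQueue<>(1);
        Thread dn1 = stage("DataNode 1", toDn1, toDn2);
        Thread dn2 = stage("DataNode 2", toDn2, toDn3);
        Thread dn3 = stage("DataNode 3", toDn3, null);

        for (int pkt = 1; pkt <= 3; pkt++) {
            toDn1.put(pkt);            // the client only ever talks to DataNode 1
        }
        toDn1.put(-1);                 // end-of-block marker
        dn1.join(); dn2.join(); dn3.join();
    }

    // Each stage receives a packet and forwards it downstream; because the
    // queues are bounded, forwarding overlaps with the client's next send.
    static Thread stage(String name, BlockingQueue<Integer> in, BlockingQueue<Integer> out) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    int pkt = in.take();
                    if (pkt != -1) System.out.println(name + " received packet " + pkt);
                    if (out != null) out.put(pkt);
                    if (pkt == -1) return;
                }
            } catch (InterruptedException ignored) { }
        });
        t.start();
        return t;
    }
}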

FSDataOutputStream's write method delegates to DFSOutputStream's write method. Since DFSOutputStream extends FSOutputSummer, the call actually lands in FSOutputSummer.write:

public synchronized void write(byte b[], int off, int len)
  throws IOException {
    // argument checks omitted
    for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
    }
  }

FSOutputSummer's write1 method is shown below:

private int write1(byte b[], int off, int len) throws IOException {
    if(count==0 && len>=buf.length) {
      // buf is sized to one chunk (512 bytes by default). This branch is taken when
      // the remaining input is at least one full chunk, and it avoids an extra copy
      // into the local buffer.
      final int length = buf.length;
      sum.update(b, off, length); // compute the checksum over one full chunk
      writeChecksumChunk(b, off, length, false);
      return length;
    }
    
    // This branch is taken when the remaining input is smaller than one chunk;
    // copying at most buf.length-count bytes also avoids running past the array end.
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    sum.update(b, off, bytesToCopy); // bytesToCopy is less than a chunk: the input's trailing bytes
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // a partial chunk stays buffered locally; later writes keep filling it, and only
      // a full chunk is flushed (flushBuffer ends up in writeChecksumChunk and resets count)
      // local buffer is full
      flushBuffer();
    } 
    return bytesToCopy;
  }
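
As a concrete example of the two branches: a single write of 1300 bytes calls write1 three times. The first two calls take the zero-copy branch and each emit one full 512-byte chunk; the third call buffers the remaining 276 bytes in buf, where they wait for later writes (or the flush on close) to complete the chunk.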

writeChecksumChunk is implemented as follows:

// writes one chunk of data (512 bytes by default); len is at most one chunk
private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
  throws IOException {
    int tempChecksum = (int)sum.getValue();
    if (!keep) {
      sum.reset();
    }
    int2byte(tempChecksum, checksum); // serialize the current chunk's checksum from int to bytes
    writeChunk(b, off, len, checksum);
}
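
The per-chunk checksumming can be sketched with plain JDK classes (Hadoop wraps this in its own DataChecksum type, and the big-endian byte order of int2byte is an assumption here; CRC32 matches the default of that era):

import java.util.zip.CRC32;

public class ChunkChecksum {
    public static void main(String[] args) {
        byte[] chunk = new byte[512];            // one chunk, io.bytes.per.checksum bytes
        CRC32 sum = new CRC32();
        sum.update(chunk, 0, chunk.length);      // same shape as sum.update(b, off, length) in write1
        int tempChecksum = (int) sum.getValue(); // a CRC32 value fits in 4 bytes
        sum.reset();                             // ready for the next chunk, as in writeChecksumChunk

        byte[] checksum = new byte[4];           // what int2byte produces (big-endian assumed)
        checksum[0] = (byte) (tempChecksum >>> 24);
        checksum[1] = (byte) (tempChecksum >>> 16);
        checksum[2] = (byte) (tempChecksum >>> 8);
        checksum[3] = (byte) tempChecksum;
    }
}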

writeChunk is implemented by the subclass DFSOutputStream:

 protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
     throws IOException {

      // create a packet and write the chunk into it
      currentPacket = new Packet(packetSize, chunksPerPacket, bytesCurBlock);
      currentPacket.writeChecksum(checksum, 0, cklen);
      currentPacket.writeData(b, offset, len);
      currentPacket.numChunks++;
      bytesCurBlock += len;

      // if the packet is full, queue it for sending
      if (currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize) {
          ......
          dataQueue.addLast(currentPacket);
          // wake up the thread waiting on dataQueue, i.e. the DataStreamer
          dataQueue.notifyAll();
          currentPacket = null;
          ......
      }
 }

writeChunk is fairly simple: it fills the current packet with chunk data, and when the packet is full it appends it to dataQueue and wakes up the DataStreamer.
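
For reference, here is a simplified sketch of the Packet structure used above (field and method names follow the calls in writeChunk; the layout, with all checksums in front of the data region, follows the 0.20-era source, but the packet header bookkeeping is omitted):

class Packet {
    final byte[] buf;            // one buffer: checksums first, then chunk data
    int checksumPos, dataPos;
    int numChunks = 0;
    final int maxChunks;
    long offsetInBlock;          // where this packet's data lands inside the block
    boolean lastPacketInBlock = false;

    Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
        this.buf = new byte[pktSize];
        this.maxChunks = chunksPerPkt;
        this.offsetInBlock = offsetInBlock;
        this.checksumPos = 0;                // checksum region at the front (header omitted)
        this.dataPos = chunksPerPkt * 4;     // data region starts after all 4-byte checksums
    }

    void writeChecksum(byte[] inarray, int off, int len) {
        System.arraycopy(inarray, off, buf, checksumPos, len);
        checksumPos += len;
    }

    void writeData(byte[] inarray, int off, int len) {
        System.arraycopy(inarray, off, buf, dataPos, len);
        dataPos += len;
    }
}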

The DataStreamer performs the actual data transfer. Its run method:

  public void run() {
    while (!closed && clientRunning) {
      Packet one = null;
      synchronized (dataQueue) {
      boolean doSleep = processDatanodeError(hasError, false); // handle the IO error if an ack reported one
        // wait while there are no packets in the queue
        while ((!closed && !hasError && clientRunning && dataQueue.size() == 0) || doSleep) {
          try {
            dataQueue.wait(1000);
          } catch (InterruptedException  e) {
          }
          doSleep = false;
        }
        try {
          // take the first packet from the queue
          one = dataQueue.getFirst();
          long offsetInBlock = one.offsetInBlock;
          // have the NameNode allocate a block and open a write stream to it
          if (blockStream == null) {
            nodes = nextBlockOutputStream(src);
            response = new ResponseProcessor(nodes);
            response.start();
          }

          ByteBuffer buf = one.getBuffer();
          // move the packet from dataQueue to ackQueue to await acknowledgement
          dataQueue.removeFirst();
          dataQueue.notifyAll();
          synchronized (ackQueue) {
            ackQueue.addLast(one);
            ackQueue.notifyAll();
          }

          // use the write stream to push the data into the block on the DataNode
          blockStream.write(buf.array(), buf.position(), buf.remaining());
          if (one.lastPacketInBlock) {
            blockStream.writeInt(0); // marks this block as completely written
          }
          blockStream.flush();
        } catch (Throwable e) {
        }
        
        if (one.lastPacketInBlock) {
            // the block is full: clean up; a new block will be requested next time
            response.close();        // ignore all errors in Response
            
            synchronized (dataQueue) {
              IOUtils.cleanup(LOG, blockStream, blockReplyStream);
              nodes = null;
              response = null;
              blockStream = null; // next iteration sees blockStream == null and requests a new block
              blockReplyStream = null;
            }
        }
    }
      ......
  }

The DataStreamer thread writes the prepared packets to the DataNodes in order; packets whose writes have not yet been acknowledged are moved to ackQueue to await confirmation.
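
This two-queue design (dataQueue for packets not yet sent, ackQueue for packets sent but not yet acknowledged) is what makes recovery possible: until every DataNode in the pipeline has acknowledged a packet, the client still holds a copy that can be resent.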

To transfer data to the DataNodes, the DataStreamer must first ask the NameNode for a block. It does so in nextBlockOutputStream, which calls locateFollowingBlock, which makes the RPC call namenode.addBlock(src, clientName). Once the NameNode has allocated a block on a set of DataNodes, createBlockOutputStream opens the stream for writing.

After run has created the write stream, the client calls blockStream.write to push each packet to the first DataNode. The allocation path is condensed in the sketch below.
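
Condensed sketch of that call chain (signatures simplified from the 0.20-era source; retries, timeouts, and the bookkeeping of excluded nodes are omitted):

private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
  // ask the NameNode for the next block of src and its target DataNodes
  LocatedBlock lb = locateFollowingBlock(System.currentTimeMillis());
  block = lb.getBlock();
  DatanodeInfo[] nodes = lb.getLocations();

  // connect to the first DataNode in the pipeline and send the write-block
  // header; on success, blockStream is ready to carry packets
  boolean success = createBlockOutputStream(nodes, clientName, false);
  if (!success) {
    namenode.abandonBlock(block, src, clientName); // give the block back and retry
  }
  return nodes;
}

private LocatedBlock locateFollowingBlock(long start) throws IOException {
  // RPC to the NameNode: allocate a new block for src
  return namenode.addBlock(src, clientName);
}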

 

The DataStreamer also starts a ResponseProcessor thread, which receives the acks from the DataNodes. Once every DataNode in the pipeline has acknowledged a packet, the ResponseProcessor removes that packet from ackQueue. On error, the packets are moved from ackQueue back to dataQueue, the failed DataNode is removed, the block is recovered, and a new pipeline is established. The implementation:

public void run() {
...
PipelineAck ack = new PipelineAck();
while (!closed && clientRunning && !lastPacketInBlock) {
  try {
    // read an ack from the pipeline
    ack.readFields(blockReplyStream);
    ...
    // check the status replies from every DataNode in the pipeline
    for (int i = ack.getNumOfReplies()-1; i >= 0 && clientRunning; i--) {
      short reply = ack.getReply(i);
      if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) { // a DataNode failed to write the packet
        errorIndex = i; // record the bad DataNode; processDatanodeError will remove it
        throw new IOException("Bad response " + reply + " for block " + block + " from datanode " + targets[i].getName());
      }
    }

    long seqno = ack.getSeqno();
    if (seqno == Packet.HEART_BEAT_SEQNO) {  // heartbeat ack, ignore it
      continue;
    }
    Packet one = null;
    synchronized (ackQueue) {
      one = ackQueue.getFirst();
    }
    ...
    synchronized (ackQueue) {
      assert ack.getSeqno() == lastAckedSeqno + 1; // validate the ack sequence number
      lastAckedSeqno = ack.getSeqno();
      ackQueue.removeFirst(); // remove the packet whose write is now confirmed
      ackQueue.notifyAll();
    }
  } catch (Exception e) {
    if (!closed) {
      hasError = true; // flag the ack error so DataStreamer's processDatanodeError handles it
      ...
      closed = true;
    }
  }
}
}
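
Note the assertion ack.getSeqno() == lastAckedSeqno + 1: DataNodes forward packets and their acks strictly in order along the pipeline, so acks must arrive with consecutive sequence numbers; any gap indicates a pipeline fault.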

When the ResponseProcessor fails to confirm a packet, processDatanodeError handles the DataNode error; it returns true when the caller needs to sleep for a while afterwards. Its rough flow is:

1. Close blockStream and blockReplyStream.
2. Move the packets from ackQueue back to dataQueue.
3. Remove the bad DataNode.
4. Recover the block via an RPC call to the DataNode's recoverBlock; on error, return true.
5. If no usable DataNode remains, close the DFSOutputStream and the streamer and return false.
6. Create the block output stream; if that fails, go back to step 3.

The implementation:

private boolean processDatanodeError(boolean hasError, boolean isAppend) {
  if (!hasError) { // no DataNode error occurred, return immediately
    return false;
  }
  
  // move the unacknowledged packets from the ack queue to the front of the data queue
  synchronized (ackQueue) {
    dataQueue.addAll(0, ackQueue);
    ackQueue.clear();
  }

  boolean success = false;
  while (!success && clientRunning) {
    DatanodeInfo[] newnodes = null;
    
    // use errorIndex to find the failed DataNode, remove it from nodes, and copy the rest into newnodes

    // ask the primary DataNode to recover the block and update the generation timestamp appropriately
    LocatedBlock newBlock = null;
    ClientDatanodeProtocol primary =  null;
    DatanodeInfo primaryNode = null;
    try {
      // Pick the "least" datanode as the primary datanode to avoid deadlock.
      primaryNode = Collections.min(Arrays.asList(newnodes));
      primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken, socketTimeout);
      newBlock = primary.recoverBlock(block, isAppend, newnodes); // recover the block
    } catch (IOException e) {
        // on failure the loop retries, removing failed DataNodes before recreating the output stream
        return true;          // the caller needs to sleep before retrying
    } finally {
      RPC.stopProxy(primary);
    }
    recoveryErrorCount = 0; // block recovery succeeded
    block = newBlock.getBlock();
    accessToken = newBlock.getBlockToken();
    nodes = newBlock.getLocations();

    this.hasError = false;
    lastException = null;
    errorIndex = 0;
    success = createBlockOutputStream(nodes, clientName, true);
  }

  response = new ResponseProcessor(nodes);
  response.start(); // start the ResponseProcessor to handle acks
  return false; // no sleep needed, continue processing
}
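
Re-queuing with dataQueue.addAll(0, ackQueue) puts the unacknowledged packets at the head of the data queue, ahead of anything not yet sent, so retransmission preserves the byte order of the block.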

 

Summary

Writing a file to HDFS is fairly involved, so this article focused on the client-side (DFSClient) logic; the corresponding handling on the NameNode and the DataNodes is not analyzed in detail here.

Further Reading

      http://www.cnblogs.com/forfuture1978/archive/2010/11/10/1874222.html (analysis of the HDFS read/write process)

      http://blog.jeoygin.org/2012/07/hdfs-source-analysis-hdfs-input-output-stream.html (the responsibilities of the key DFSClient classes)

      http://caibinbupt.iteye.com/blog/286259 (how the DataNode handles block writes)

