HDFS讀取文件的重要概念
HDFS一個文件由多個block構成。HDFS在進行block讀寫的時候是以packet(默認每個packet為64K)為單位進行的。每一個packet由若干個chunk(默認512Byte)組成。Chunk是進行數據校驗的基本單位,對每一個chunk生成一個校驗和(默認4Byte)並將校驗和進行存儲。在讀取一個block的時候,數據傳輸的基本單位是packet,每個packet由若干個chunk組成。
HDFS客戶端讀文件示例代碼
FileSystem hdfs = FileSystem.get(new Configuration()); Path path = new Path("/testfile");// reading FSDataInputStream dis = hdfs.open(path); byte[] writeBuf = new byte[1024]; int len = dis.read(writeBuf); System.out.println(new String(writeBuf, 0, len, "UTF-8")); dis.close(); hdfs.close();
文件的打開
HDFS打開一個文件,需要在客戶端調用DistributedFileSystem.open(Path f, int bufferSize),其實現為:
public FSDataInputStream open(Path f, int bufferSize) throws IOException { return new DFSClient.DFSDataInputStream( dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); }
其中dfs為DistributedFileSystem的成員變量DFSClient,其open函數被調用,其中創建一個DFSInputStream(src, buffersize, verifyChecksum)並返回。
DFSClient.DFSDataInputStream實現了HDFS的FSDataInputStream,里面簡單包裝了DFSInputStream,實際實現是DFSInputStream完成的。
在DFSInputStream的構造函數中,openInfo函數被調用,其主要從namenode中得到要打開的文件所對應的blocks的信息,實現如下:
synchronized void openInfo() throws IOException {
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize); this.locatedBlocks = newInfo; this.currentNode = null; } private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,String src, long start, long length) throws IOException { return namenode.getBlockLocations(src, start, length); }
LocatedBlocks主要包含一個鏈表的List<LocatedBlock> blocks,其中每個LocatedBlock包含如下信息:
- Block b:此block的信息
- long offset:此block在文件中的偏移量
- DatanodeInfo[] locs:此block位於哪些DataNode上
上面namenode.getBlockLocations是一個RPC調用,最終調用NameNode類的getBlockLocations函數。
NameNode返回的是根據客戶端請求的文件名字,文件偏移量,數據長度,返回文件對應的數據塊列表,數據塊所在的DataNode節點。
文件的順序讀取
hdfs文件的順序讀取是最經常使用的.
文件順序讀取的時候,客戶端利用文件打開的時候得到的FSDataInputStream.read(byte[] buffer, int offset, int length)函數進行文件讀操作。
FSDataInputStream會調用其封裝的DFSInputStream的read(byte[] buffer, int offset, int length)函數,實現如下:
public synchronized int read(byte buf[], int off, int len) throws IOException { ... if (pos < getFileLength()) { int retries = 2; while (retries > 0) { try { if (pos > blockEnd) {//首次pos=0,blockEnd=-1,必定調用方法blockSeekTo,初始化blockEnd,以后是讀完了當前塊,需要讀下一個塊,才會調用blockSeekTo currentNode = blockSeekTo(pos);//根據pos選擇塊和數據節點,選擇算法是遍歷塊所在的所有數據節點,選擇第一個非死亡節點 } int realLen = Math.min(len, (int) (blockEnd - pos + 1)); int result = readBuffer(buf, off, realLen); if (result >= 0) { pos += result; } else { throw new IOException("Unexpected EOS from the reader"); } ... return result; } catch (ChecksumException ce) { throw ce; } catch (IOException e) { ... if (currentNode != null) { addToDeadNodes(currentNode); }//遇到無法讀的DataNode,添加到死亡節點 if (--retries == 0) {//嘗試讀三次都失敗,就拋出異常 throw e; } } } } return -1; }
blockSeekTo函數會更新blockEnd,並創建對應的BlockReader,這里的BlockReader的初始化和上面的fetchBlockByteRange差不多,如果客戶端和塊所屬的DataNode是同個節點,則初始化一個通過本地讀取的BlockReader,否則創建一個通過Socket連接DataNode的BlockReader。
BlockReader的創建也是通過BlockReader.newBlockReader創建的,具體分析請看后面。
readBuffer方法比較簡單,直接調用BlockReader的read方法直接讀取數據。
BlockReader的read方法就根據請求的塊起始偏移量,長度,通過socket連接DataNode,獲取塊內容,BlockReader的read方法不會做緩存優化。
文件的隨機讀取
對於MapReduce,在提交作業時,已經確定了每個map和reduce要讀取的文件,文件的偏移量,讀取的長度,所以MapReduce使用的大部分是文件的隨機讀取。
文件隨機讀取的時候,客戶端利用文件打開的時候得到的FSDataInputStream.read(long position, byte[] buffer, int offset, int length)函數進行文件讀操作。
FSDataInputStream會調用其封裝的DFSInputStream的read(long position, byte[] buffer, int offset, int length)函數,實現如下:
public int read(long position, byte[] buffer, int offset, int length)throws IOException { long filelen = getFileLength(); int realLen = length; if ((position + length) > filelen) { realLen = (int)(filelen - position); } //首先得到包含從offset到offset + length內容的block列表 //比如對於64M一個block的文件系統來說,欲讀取從100M開始,長度為128M的數據,則block列表包括第2,3,4塊block List<LocatedBlock> blockRange = getBlockRange(position, realLen); int remaining = realLen; //對每一個block,從中讀取內容 //對於上面的例子,對於第2塊block,讀取從36M開始,讀取長度28M,對於第3塊,讀取整一塊64M,對於第4塊,讀取從0開始,長度為36M,共128M數據 for (LocatedBlock blk : blockRange) { long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset); remaining -= bytesToRead; position += bytesToRead; offset += bytesToRead; } ...return realLen; }
getBlockRange方法根據文件的偏移量和長度,獲取對應的數據塊信息。主要是根據NameNode類的getBlockLocations方法實現,並做了緩存和二分查找等優化。
fetchBlockByteRange方法真正從數據塊讀取內容,實現如下:
private void fetchBlockByteRange(LocatedBlock block, long start,long end, byte[] buf, int offset) throws IOException { Socket dn = null; int numAttempts = block.getLocations().length; //此while循環為讀取失敗后的重試次數 while (dn == null && numAttempts-- > 0 ) { //選擇一個DataNode來讀取數據 DNAddrPair retval = chooseDataNode(block); DatanodeInfo chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; BlockReader reader = null; int len = (int) (end - start + 1); try { if (shouldTryShortCircuitRead(targetAddr)) { //如果要讀取的塊所屬的DataNode與客戶端是同一個節點,直接通過本地磁盤訪問,減少網絡流量 reader = getLocalBlockReader(conf, src, block.getBlock(),accessToken, chosenNode, DFSClient.this.socketTimeout, start); } else { //創建Socket連接到DataNode dn = socketFactory.createSocket(); dn.connect(targetAddr, socketTimeout); dn.setSoTimeout(socketTimeout); //利用建立的Socket鏈接,生成一個reader負責從DataNode讀取數據 reader = BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(), accessToken,block.getBlock().getGenerationStamp(), start, len, buffersize, verifyChecksum, clientName); } //讀取數據 int nread = reader.readAll(buf, offset, len); return; } finally { IOUtils.closeStream(reader); IOUtils.closeSocket(dn); dn = null; } //如果讀取失敗,則將此DataNode標記為失敗節點 addToDeadNodes(chosenNode); } }
讀取塊內容,會嘗試該數據塊所在的所有DataNode,如果失敗,就把對應的DataNode加入到失敗節點,下次選擇節點就會忽略失敗節點(只在獨立的客戶端緩存失敗節點,不上報到namenode)。
BlockReader的創建也是通過BlockReader.newBlockReader創建的,具體分析請看后面。
最后,通過BlockReader的readAll方法讀取塊的完整內容。
dfsclient和datanode的通信協議
dfsclient的連接
dfsclient首次連接datanode時,通信協議實現主要是BlockReader.newBlockReader方法的實現,如下:
public static BlockReader newBlockReader( Socket sock, String file,long blockId,long genStamp,long startOffset, long len,int bufferSize, boolean verifyChecksum,String clientName) throws IOException { //使用Socket建立寫入流,向DataNode發送讀指令 DataOutputStream out = new DataOutputStream( new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))); out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); out.write( DataTransferProtocol.OP_READ_BLOCK ); out.writeLong( blockId ); out.writeLong( genStamp ); out.writeLong( startOffset ); out.writeLong( len ); Text.writeString(out, clientName); out.flush(); //使用Socket建立讀入流,用於從DataNode讀取數據 DataInputStream in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(sock),bufferSize)); short status = in.readShort();//塊讀取的狀態標記,一般是成功 DataChecksum checksum = DataChecksum.newDataChecksum( in ); long firstChunkOffset = in.readLong(); //生成一個reader,主要包含讀入流,用於讀取數據 return new BlockReader( file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock ); }
這里的startOffset是相對於塊的起始偏移量,len是要讀取的長度。
DataChecksum.newDataChecksum(in),會從DataNode獲取該塊的checksum加密方式,加密長度。
BlockReader的readAll函數就是用上面生成的DataInputStream讀取數據。
下面是是讀數據塊時,客戶端發送的信息:
version | operator | blockid | generationStamp | startOffset | length | clientName | accessToken |
operator:byte Client所需要的操作,讀取一個block、寫入一個block等等
version:short Client所需要的數據與Datanode所提供數據的版本是否一致
blockId:long 所要讀取block的blockId
generationStamp:long 所需要讀取block的generationStamp
startOffset:long 讀取block的的起始位置
length:long 讀取block的長度
clientName:String Client的名字
accessToken:Token Client提供的驗證信息,用戶名密碼等
DataNode對dfsclient的響應
DataNode負責與客戶端代碼的通信協議交互的邏輯,主要是DataXceiver的readBlock方法實現的:
private void readBlock(DataInputStream in) throws IOException { //讀取指令 long blockId = in.readLong(); Block block = new Block( blockId, 0 , in.readLong()); long startOffset = in.readLong(); long length = in.readLong(); String clientName = Text.readString(in); //創建一個寫入流,用於向客戶端寫數據 OutputStream baseStream = NetUtils.getOutputStream(s,datanode.socketWriteTimeout); DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE)); //生成BlockSender用於讀取本地的block的數據,並發送給客戶端 //BlockSender有一個成員變量InputStream blockIn用於讀取本地block的數據 BlockSender blockSender = new BlockSender(block, startOffset, length,true, true, false, datanode, clientTraceFmt); out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 發送操作成功的狀態 //向客戶端寫入數據 long read = blockSender.sendBlock(out, baseStream, null); …… } finally { IOUtils.closeStream(out); IOUtils.closeStream(blockSender); } }
DataXceiver的sendBlock用於發送數據,數據發送包括應答頭和后續的數據包。應答頭如下(包含DataXceiver中發送的成功標識):
DataXceiver的sendBlock的實現如下:
long sendBlock(DataOutputStream out, OutputStream baseStream, BlockTransferThrottler throttler) throws IOException { ... try { try { checksum.writeHeader(out);//寫入checksum的加密類型和加密長度 if ( chunkOffsetOK ) { out.writeLong( offset ); } out.flush(); } catch (IOException e) { //socket error throw ioeToSocketException(e); } ... ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); while (endOffset > offset) {//循環寫入數據包 long len = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks); offset += len; totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*checksumSize); seqno++; } try { out.writeInt(0); //標記結束 out.flush(); } catch (IOException e) { //socket error throw ioeToSocketException(e); } } ... return totalRead; }
DataXceiver的sendChunks盡可能在一個packet發送多個chunk,chunk的個數由maxChunks和剩余的塊內容決定,實現如下:
//默認是crc校驗,bytesPerChecksum默認是512,checksumSize默認是4,表示數據塊每512個字節,做一次checksum校驗,checksum的結果是4個字節 private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) throws IOException { int len = Math.min((int) (endOffset - offset),bytesPerChecksum * maxChunks);//len是要發送的數據長度 if (len == 0) { return 0; } int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;//這次要發送的chunk數量 int packetLen = len + numChunks*checksumSize + 4;//packetLen是整個包的長度,包括包頭,校驗碼,數據 pkt.clear(); // write packet header pkt.putInt(packetLen);//整個packet的長度 pkt.putLong(offset);//塊的偏移量 pkt.putLong(seqno);//序列號 pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));//是否最后一個packet pkt.putInt(len);//發送的數據長度 int checksumOff = pkt.position(); int checksumLen = numChunks * checksumSize; byte[] buf = pkt.array(); if (checksumSize > 0 && checksumIn != null) { try { checksumIn.readFully(buf, checksumOff, checksumLen);//填充chucksum的內容 } catch (IOException e) { ... } } int dataOff = checksumOff + checksumLen; if (blockInPosition < 0) { IOUtils.readFully(blockIn, buf, dataOff, len);//填充塊數據的內容 if (verifyChecksum) {//默認是false,不驗證 //校驗處理 } } try { //通過socket發送數據到客戶端 } catch (IOException e) { throw ioeToSocketException(e); } ... return len; }
數據組織成數據包來發送,數據包結構如下:
packetLen | offset | sequenceNum | isLastPacket | startOffset | dataLen | checksum | data |
packetLen:int packet的長度,包括數據、數據的校驗等等
offset:long packet在block中的偏移量
sequenceNum:long 該packet在這次block讀取時的序號
isLastPacket:byte packet是否是最后一個
dataLen:int 該packet所包含block數據的長度,純數據不包括校驗和其他
checksum:該packet每一個chunk的校驗和,有多少個chunk就有多少個校驗和
data:該packet所包含的block數據
數據傳輸結束的標志,是一個packetLen長度為0的包。客戶端可以返回一個兩字節的應答OP_STATUS_CHECKSUM_OK(5)
dfsclient讀取塊內容
hdfs文件的隨機和順序分析邏輯,都分析到BlockReader的readAll方法和read方法,這兩個方法完成對數據塊的內容讀取。
而readAll方法最后也是調用read方法,所以這里重點分析BlockReader的read方法,實現如下:
public synchronized int read(byte[] buf, int off, int len) throws IOException { //第一次read, 忽略前面的額外數據 if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { int toSkip = (int)(startOffset - firstChunkOffset); if ( skipBuf == null ) { skipBuf = new byte[bytesPerChecksum]; } if ( super.read(skipBuf, 0, toSkip) != toSkip ) {//忽略 // should never happen throw new IOException("Could not skip required number of bytes"); } } boolean eosBefore = gotEOS; int nRead = super.read(buf, off, len); // if gotEOS was set in the previous read and checksum is enabled : if (dnSock != null && gotEOS && !eosBefore && nRead >= 0 && needChecksum()) { //checksum is verified and there are no errors. checksumOk(dnSock); } return nRead; }
super.read即是FSInputChecker的read方法,實現如下
public synchronized int read(byte[] b, int off, int len) throws IOException { //參數檢查 int n = 0; for (;;) { int nread = read1(b, off + n, len - n); if (nread <= 0) return (n == 0) ? nread : n; n += nread; if (n >= len) return n; } } //read1的len被忽略,只返回一個chunk的數據長度(最后一個chunk可能不足一個完整chunk的長度) private int read1(byte b[], int off, int len) throws IOException { int avail = count-pos; if( avail <= 0 ) { if(len>=buf.length) { //直接讀取一個數據chunk到用戶buffer,避免多余一次復制
//很巧妙,buf初始化的大小是chunk的大小,默認是512,這里的代碼會在塊的剩余內容大於一個chunk的大小時調用
int nread = readChecksumChunk(b, off, len); return nread; } else { //讀取一個數據chunk到本地buffer,也是調用readChecksumChunk方法
//很巧妙,buf初始化大小是chunk的大小,默認是512,這里的代碼會在塊的剩余內容不足一個chunk的大小時進入調用
fill(); if( count <= 0 ) { return -1; } else { avail = count; } } } //從本地buffer拷貝數據到用戶buffer,避免最后一個chunk導致數組越界 int cnt = (avail < len) ? avail : len; System.arraycopy(buf, pos, b, off, cnt); pos += cnt; return cnt; }
FSInputChecker的readChecksumChunk會讀取一個數據塊的chunk,並做校驗,實現如下:
//只返回一個chunk的數據長度(默認512,最后一個chunk可能不足一個完整chunk的長度) private int readChecksumChunk(byte b[], int off, int len) throws IOException { // invalidate buffer count = pos = 0; int read = 0; boolean retry = true; int retriesLeft = numOfRetries; //本案例中,numOfRetries是1,也就是說不會多次嘗試 do { retriesLeft--; try { read = readChunk(chunkPos, b, off, len, checksum); if( read > 0 ) { if( needChecksum() ) {//這里會做checksum校驗 sum.update(b, off, read); verifySum(chunkPos); } chunkPos += read; } retry = false; } catch (ChecksumException ce) { ... if (retriesLeft == 0) {//本案例中,numOfRetries是1,也就是說不會多次嘗試,失敗了,直接拋出異常 throw ce; } //如果讀取的chunk校驗失敗,以當前的chunkpos為起始偏移量,嘗試新的副本 if (seekToNewSource(chunkPos)) { seek(chunkPos); } else { //找不到新的副本,拋出異常 throw ce; } } } while (retry); return read; }
readChunk方法由BlockReader實現,分析如下:
//只返回一個chunk的數據長度(默認512,最后一個chunk可能不足一個完整chunk的長度) protected synchronized int readChunk(long pos, byte[] buf, int offset,int len, byte[] checksumBuf) throws IOException { //讀取一個 DATA_CHUNK. long chunkOffset = lastChunkOffset; if ( lastChunkLen > 0 ) { chunkOffset += lastChunkLen; } //如果先前的packet已經讀取完畢,就讀下一個packet。 if (dataLeft <= 0) { //讀包的頭部 int packetLen = in.readInt(); long offsetInBlock = in.readLong(); long seqno = in.readLong(); boolean lastPacketInBlock = in.readBoolean(); int dataLen = in.readInt(); //校驗長度 lastSeqNo = seqno; isLastPacket = lastPacketInBlock; dataLeft = dataLen; adjustChecksumBytes(dataLen); if (dataLen > 0) { IOUtils.readFully(in, checksumBytes.array(), 0,checksumBytes.limit());//讀取當前包的所有數據塊內容對應的checksum,后面的流程會講checksum和讀取的chunk內容做校驗 } } int chunkLen = Math.min(dataLeft, bytesPerChecksum); //確定此次讀取的chunk長度,正常情況下是一個bytesPerChecksum(512字節),當文件最后不足一個bytesPerChecksum,讀取剩余的內容。 if ( chunkLen > 0 ) { IOUtils.readFully(in, buf, offset, chunkLen);//讀取一個數據塊的chunk checksumBytes.get(checksumBuf, 0, checksumSize); } dataLeft -= chunkLen; lastChunkOffset = chunkOffset; lastChunkLen = chunkLen; ... if ( chunkLen == 0 ) { return -1; } return chunkLen; }
總結
本文前面概要介紹了dfsclient讀取文件的示例代碼,順序讀取文件和隨機讀取文件的概要流程,最后還基於dfsclient和datanode讀取塊的過程,做了一個詳細的分析。
參考 http://caibinbupt.iteye.com/blog/284979
http://www.cnblogs.com/forfuture1978/archive/2010/11/10/1874222.html