HDFS pipeline寫 -- datanode


站在DataNode的視角,看看pipeline寫的流程,本文不分析客戶端部分,從客戶端寫數據之前拿到了3個可寫的block位置說起。

每個datanode會創建一個線程DataXceiverServer,接收上游過來的TCP連接,對於每個新建的TCP連接,都會創建一個叫做DataXceiver的線程處理這個連接. 這個線程不斷的從TCP連接中讀op,然后調用processOp(op)處理這個op,這里以write block 這個op為例.

對於datanode來說,write block操作由DataXceiver的writeBlock函數實現.

大體步驟如下:

  1. new 一個BlockReceiver對象,隨后用於接收上游(client或者datanode)的block數據.

  2. 根據傳進來的DatanodeInfo數組,向數組的第一個元素代表的datanode建立TCP連接,targets參數是從上游的TCP連接中解析出來的,邏輯在Receiver的opWriteBlock方法中,Receiver是DataXceiver的基類.然后調用Sender的writeBlock方法給下游datanode發送write block相關元信息,包括DatanodeInfo數組(刨去第一個元素),clientname,block的當前gs,minBytesRcvd,maxBytesRcvd(對於append,recovery操作有用)等。然后讀取下游的回復封裝在BlockOpResponseProto對象中,可以通過內部成員firstBadLink知道建pipeline中第一個失敗的datanode節點。接着將BlockOpResponseProto回復給上游
    (datanode或者client),最后調用第一步new的BlockReceiver的receiveBlock方法用於接收一個完整的block.如下:

    receiveBlock內部根據clientname發現是一個客戶端在寫block,創建一個PacketResponder線程用於處理下游datanode對packet的ack.PacketResponder后面分析。接着,不斷的調用receivePacket()方法從上游(datanode或者client)接收一個個的packet,接收一個完整的packet的邏輯是由內部的PacketReceiver來處理的.
    對於一個接收到的packet,寫入block file文件,同時checksum信息寫meta文件,然后放入PacketResponder的ack queue隊列,然后將packet寫給下游的datanode。最后調用PacketResponder的 close方法,這個方法會等到ack queue為空,即所有packet都已經從下游收到,並且已經給上游ack.

  3. receiveBlock()結束后,關掉和上下游的連接.

清空ack queue的邏輯由專門處理下游ack包的PacketResponder線程處理,邏輯如下:

  1. 如果datanode是pipeline的中間node(通過PacketResponder的type屬性來決定,LAST_IN_PIPELINE和HAS_DOWNSTREAM_IN_PIPELINE),
    那么從下游讀一個PipelineAck,從ack中拿到seqno,然后從ack queue中get(不刪除)第一個packet,拿出seqno,記作expected_seq_no,然后比較是否相等,如果不相等,說明寫出錯. 如果seqno相同,往下.

  2. 如果從ack queue中get的packet是block的最后一個packet,說明一個block接收完成.那么調用finalizeBlock方法.finalizeBlock方法邏輯如下:

    關閉block file和meta file文件,調用FsDatasetImpl的finalizeBlock(block)將block文件以及對應的meta文件移動到對應的block pool下的finalized目錄下,然后生成一個FinalizedReplica對象,將bpid->FinalizedReplica的映射關系記錄在內存中的volumnMap中,對象位於FsDatasetImpl下的ReplicaMap volumnMap(從ReplicaMap中定位一個ReplicaInfo,需要拿着bpid和block id去找)最后調用datanode的closeBlock()方法,將block回報給namenode,該方法邏輯如下:

    拿着block的bpid從BlockPoolManager中拿到相應的BPOfferService,通知namenode這個block。在data node這邊,data node和每個namenode的接口由一
    個BPServiceActor來承擔,這是一個線程, 這個線程會向namenode匯報received block或者指示namenode去刪除block.最后調用DatanodeProtocolClientSideTranslatorPB bpNamenode的blockReceivedAndDeleted()將block信息匯報上去.

  3. 給從下游接收的ack回復給上游。

  4. 將packet從ack queue的頭部刪除。

可以看出,一個block的寫操作對於每個data node來說,由兩個線程參與,一個是DataXceiver,用於接收上游的數據,一個是PacketResponder,用於處理下游回來的ack。還沒有接收到下游的ack並且沒有給上游回復ack的packet都存在在ack queue中。

參考資料

hadoop-hdfs-2.4.1.jar


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM