站在DataNode的視角,看看pipeline寫的流程,本文不分析客戶端部分,從客戶端寫數據之前拿到了3個可寫的block位置說起。
每個datanode會創建一個線程DataXceiverServer,接收上游過來的TCP連接,對於每個新建的TCP連接,都會創建一個叫做DataXceiver的線程處理這個連接. 這個線程不斷的從TCP連接中讀op,然后調用processOp(op)處理這個op,這里以write block 這個op為例.
對於datanode來說,write block操作由DataXceiver的writeBlock函數實現.
大體步驟如下:
-
new 一個BlockReceiver對象,隨后用於接收上游(client或者datanode)的block數據.
-
根據傳進來的DatanodeInfo數組,向數組的第一個元素代表的datanode建立TCP連接,targets參數是從上游的TCP連接中解析出來的,邏輯在Receiver的opWriteBlock方法中,Receiver是DataXceiver的基類.然后調用Sender的writeBlock方法給下游datanode發送write block相關元信息,包括DatanodeInfo數組(刨去第一個元素),clientname,block的當前gs,minBytesRcvd,maxBytesRcvd(對於append,recovery操作有用)等。然后讀取下游的回復封裝在BlockOpResponseProto對象中,可以通過內部成員firstBadLink知道建pipeline中第一個失敗的datanode節點。接着將BlockOpResponseProto回復給上游
(datanode或者client),最后調用第一步new的BlockReceiver的receiveBlock方法用於接收一個完整的block.如下:receiveBlock內部根據clientname發現是一個客戶端在寫block,創建一個PacketResponder線程用於處理下游datanode對packet的ack.PacketResponder后面分析。接着,不斷的調用receivePacket()方法從上游(datanode或者client)接收一個個的packet,接收一個完整的packet的邏輯是由內部的PacketReceiver來處理的.
對於一個接收到的packet,寫入block file文件,同時checksum信息寫meta文件,然后放入PacketResponder的ack queue隊列,然后將packet寫給下游的datanode。最后調用PacketResponder的 close方法,這個方法會等到ack queue為空,即所有packet都已經從下游收到,並且已經給上游ack. -
receiveBlock()結束后,關掉和上下游的連接.
清空ack queue的邏輯由專門處理下游ack包的PacketResponder線程處理,邏輯如下:
-
如果datanode是pipeline的中間node(通過PacketResponder的type屬性來決定,LAST_IN_PIPELINE和HAS_DOWNSTREAM_IN_PIPELINE),
那么從下游讀一個PipelineAck,從ack中拿到seqno,然后從ack queue中get(不刪除)第一個packet,拿出seqno,記作expected_seq_no,然后比較是否相等,如果不相等,說明寫出錯. 如果seqno相同,往下. -
如果從ack queue中get的packet是block的最后一個packet,說明一個block接收完成.那么調用finalizeBlock方法.finalizeBlock方法邏輯如下:
關閉block file和meta file文件,調用FsDatasetImpl的finalizeBlock(block)將block文件以及對應的meta文件移動到對應的block pool下的finalized目錄下,然后生成一個FinalizedReplica對象,將bpid->FinalizedReplica的映射關系記錄在內存中的volumnMap中,對象位於FsDatasetImpl下的ReplicaMap volumnMap(從ReplicaMap中定位一個ReplicaInfo,需要拿着bpid和block id去找)最后調用datanode的closeBlock()方法,將block回報給namenode,該方法邏輯如下:
拿着block的bpid從BlockPoolManager中拿到相應的BPOfferService,通知namenode這個block。在data node這邊,data node和每個namenode的接口由一
個BPServiceActor來承擔,這是一個線程, 這個線程會向namenode匯報received block或者指示namenode去刪除block.最后調用DatanodeProtocolClientSideTranslatorPB bpNamenode的blockReceivedAndDeleted()將block信息匯報上去. -
給從下游接收的ack回復給上游。
-
將packet從ack queue的頭部刪除。
可以看出,一個block的寫操作對於每個data node來說,由兩個線程參與,一個是DataXceiver,用於接收上游的數據,一個是PacketResponder,用於處理下游回來的ack。還沒有接收到下游的ack並且沒有給上游回復ack的packet都存在在ack queue中。
參考資料
hadoop-hdfs-2.4.1.jar