該系列總覽: Hadoop3.1.1架構體系——設計原理闡述與Client源碼圖文詳解 : 總覽
流水線(PipeLine),簡單地理解就是客戶端向DataNode傳輸數據(Packet)和接收DataNode回復(ACK)[Acknowledge]的數據通路。
整條流水線由若干個DataNode串聯而成,數據由客戶端流向PipeLine,在流水線上,假如DataNode A 比 DataNode B 更接近流水線
那么稱A在B的上游(Upstream),稱B在A的下游(Downstream)。
流水線上傳輸數據步驟
1. 客戶端向整條流水線的第一個DataNode發送Packet,第一個DataNode收到Packet就向下個DataNode轉發,下游DataNode照做。
2. 接收到Packet的DataNode將Packet數據寫入磁盤
3. 流水線上最后一個DataNode接收到Packet后向前一個DataNode發送ACK響應,表示自己已經收到Packet,上游DataNode照做
4. 當客戶端收到第一個DataNode的ACK,表明此次Packet的傳輸成功
一.流水線基礎概念
流水線就像一條水管,數據(Packets)從一端流進去,依次經過流水線上的各個DataNode。
回復(ACK)則是相反,ACK從最后一個節點依次向前傳遞,流回客戶端
多么藝術的設計!
但是,有一個問題,要知道,若干個Packet才能傳輸完一個Block,並且多個Block組成一個文件
所以從文件或者Block的角度來看,即使每台機器的效率接近,也可能出現流水線不均勻的情況(接收文件數據量不均勻)
出現的情況往往是第一個節點接收的數據量最多,其后的節點遞減,所以我們可以考慮把第一個DataNode選為性能較好的節點,或者是離客戶端盡可能近的節點。但實際上,節點的選擇是由NameNode根據機架感知等技術實現的。並且客戶端的流水線節點選取是由NameNode決定的。
還有一個問題。HDFS是支持一寫多讀機制的,意味着在流水線上的DataNode(正在被寫)允許被其他客戶端讀取(Reader 以下均稱此類讀客戶端為Reader)。這樣就會產生讀的不一致性,比如說我在流水線上游的某個DataNode中讀到“武漢加油!”這條數據,但是去下游的DataNode讀,卻讀不到。這是因為下游的DataNode可能還沒收到數據。
雖然說一般客戶端只會讀取一個DataNode的信息,但如果被讀取的DataNode宕機,那么客戶端就要另選DataNode,可能造成前后數據不一致。
或者有多個客戶端需要根據對方的數據協調工作,每個客戶端讀的不是一個DataNode,那么對同一讀取目標,讀出來的數據不一致。這種水平上的不一致可能也會導致業務出錯。
那么,怎么解決呢?
二.流水線讀一致性設計
我們先來定義一下概念
首先提出問題,在流水線中的某個DataNode,怎么樣判斷自己的數據是否可以給Reader讀取。
就比如上面那張圖,不能一致性讀的原因是下游的DataNode3沒有接收到DataNode1已經接收的Packet。那么如果DataNode1確定DataNode3已經接收到Packet了,那不就能放心地把Packet的數據給Reader了嗎?就算Reader再去DataNode3讀,也會讀到同樣的數據,而不會出現數據找不到或者數據不一致的情況。
於是有了定義:對於一個數據塊,一個DataNode接收到的數據為DR(Data Received),根據下游收到的ACK,已被下游確認接收的數據為DA(Data Acknowledged)
順便定義:對於 i 節點的DA是DAi , DR是DRi , 對於客戶端,客戶端發出去的數據為CS(Clent Send) ,而客戶端確認的數據為CA(Client Acknowlege)
DA和DR其實是一個增量的概念,並且針對的是一個Block。下圖是一個DataNode中的Replica(Block在DataNode中稱為Replica,強調是Block的副本)在逐漸被寫滿的過程
我們可以分析一下,整個流水線上,各個節點的DR和DA的走勢
以及從圖形上看,DR和DA在一來一回的流水線上的分布情況
我們發現Writer發送數據(第一個DataNode的DR)最多,但是確認了的數據DA最少,原因是Packet和ACK在流水線一來一回需要路程時間
Reader直接訪問一個DataNode中Replica的數據時,需要提供四個數據<BlockId,BGS(Block Generation Stamp 可以理解成Block的版本號) , offset, len>
BlockId 和 BGS 用來識別一個Block,當DataNode中不存在指定BlockId的Replica或者Replica的BGS比Reader給出的BGS舊,那么DataNode將拒絕這次讀請求
offset 表示Reader將從哪里開始讀取數據,len表示欲讀取的數據長度,因為DA是線性增長的,所以只要保證 offset + len <= DA ,DataNode就允許這次讀請求(當然offset 和 len 都大於0)
具體怎么做才能實現呢?有兩種做法。
做法一,當其他應用請求一個Reader客戶端讀取數據的時候,Reader會向將要讀的DataNode發送請求,詢問DataNode的DA。如果應用請求的數據規模(offset + len)大於DA,那么將拋出異常
否則,Reader將獲取DataNode的Min(DR, offset + len)長度數據放到緩存Q中,並且安全地返回 off + len 數據給應用,隨后Reader監聽這個DataNode的DA的變化,直到應用放棄對文件的讀取。如果DA增加,表示Reader能從緩存Q中讀到的最大數據量增加,也就是offset + len能達到更大的值。當讀取任意一個DataNode P,假設他的DA是m,如果這個DataNode剛好宕機,1. Reader轉而訪問上游的DataNode,上游DataNode的DR比下游的DR大,隨着時間的推遲,上游DataNode會把整個DR暴露給Reader,其中包含下游DR的數據,下游的數據在上游仍然能訪問。2.Reader轉而訪問下游的DataNode,下游的DataNode的DA比P的要大,所以在P讀到的數據在下游中仍然找得到。一致性讀達成。
這種做法的缺點是客戶端的代碼和算法實現復雜,要時刻監聽DA的變化。
做法二,為了更清楚地描述,分一下步驟
1.Reader向DataNode a發出<BlockId,BGS(Block Generation Stamp 可以理解成Block的版本號) , offset, len>,DataNode a的DA必須大於等於offset + len
2.讀取的請求不是發給DataNode a,而是將請求發給另外一個DataNode b
3.如果
1.offset + len <= DAb,那么可以安全地返回數據
2.如果offset + len > DAb ,因為DAa >= offset + len > DAb。所以DAa > DAb,所以b在a的上游,所以DRb > DRa,所以在b上有a已經ACK了的數據。所以b也可以安全地返回offset + len的數據給Reader
3.如果offset + len > DRb,那么將拋出異常。
雖然上述步驟2訪問了DR,但是DR中被訪問的數據已經在下游被ACK了,只是Reader自己移動到了上游去找數據。
當前訪問的DataNode a如果宕機
1.向下游讀,下游的DA大於上游,故在上游的數據一般能在下游找得到,經過步驟1將數據返回
2.向上游讀,因為之前已經規定好,只能訪問offset + len范圍的數據,並且上游的BR總是包含DAa,所以 offset + len 長度的數據總是能在上游找到。
一致性讀解決
做法二雖然簡單但是要訪問兩個節點。網絡上的切換的開銷不小。
具體HDFS實現了哪一個,需要看版本決定,筆者暫時還沒有找到官方給定哪些版本實現哪種方案和研究源碼,日后填坑。
三.流水線的生命周期
1.流水線被建立(Setup) : 客戶端Writer通告NameNode獲得Block信息,通知信息里locations(Replica所在)包含的DataNode,告知這些DataNode將要創建一條流水線,DataNode收到后會回復。
2.數據傳輸(DataStream) : 當Writer在步驟1接收到如數的DataNode的回應后,流水線正式創建,Writer能夠在流水線上以Packet為單位傳輸數據。
3.恢復(Recovery) : 恢復分三種情況 : 1.流水線創建時失敗 2.流水線傳輸過程失敗 3.流水線關閉失敗
4.關閉(Close) : 當一個塊被寫滿,Writer將通知DataNode流水線關閉,DataNode可以將塊的狀態設置為FINALIZED並且DataNode向NameNode匯報
四.流水線的建立
流水線建立的時機:
1.客戶端請求新建一個Block,需要新建流水線,以便將新Block的數據寫入到DataNode的Replica里
2.客戶端請求打開一個文件並且對這個文件進行append操作,這個文件末尾的最后一個塊如果沒有滿,那么所有擁有這個Block的Replica的DataNode將被連起來成為一條流水線,以便對這些沒寫滿的Replica進行追加,(其實是對Block進行追加)
3.在恢復過程中需要建立流水線
流水線建立流程:
客戶端的行為:
1.客戶端首先需要詢問NameNode相關信息,比如對應Block的Replica在哪,Block的BGS和ID等信息。如果流水線的建立的是為了恢復流水線,或者文件被打開用來append,那么客戶端還會為Block向NameNode申請新的BGS。
2.根據1中獲取的信息,客戶端試圖和流水線的第一個DataNode通過Socket建立連接。
3.客戶端將1中獲得的信息發布到流水線上,告知線上的DataNode,該Block對應的Replica需要被操作。
發送的信息具體按流水線的用途分為:
DataNode行為:
.1.當DataNode從3中得知信息后,將按情況進行如下操作
最后一步:
如果建立的流水線是用來恢復或者Append的,那么將會通知NameNode,流水線完成,告知NameNode更新流水線信息(塊的位置等)。
重新架構流水線:
如果上述所有步驟不成功,則會重新建立流水線(進行流水線恢復)。
五:流水線的恢復
請見:Hadoop架構: 關於Recovery (Lease Recovery , Block Recovery, PipeLine Recovery)