深入理解Flink ---- 系統內部消息傳遞的exactly once語義


At Most once,At Least once和Exactly once

在分布式系統中,組成系統的各個計算機是獨立的。這些計算機有可能fail。

一個sender發送一條message到receiver。根據receiver出現fail時sender如何處理fail,可以將message delivery分為三種語義:

 

At Most once: 對於一條message,receiver最多收到一次(0次或1次).

可以達成At Most Once的策略:

sender把message發送給receiver.無論receiver是否收到message,sender都不再重發message.

 

At Least once: 對於一條message,receiver最少收到一次(1次及以上).

可以達成At Least Once的策略:

sender把message發送給receiver.當receiver在規定時間內沒有回復ACK或回復了error信息,那么sender重發這條message給receiver,直到sender收到receiver的ACK.

 

Exactly once: 對於一條message,receiver確保只收到一次

 

Flink的Exactly once模式

Flink實現Exactly once的策略: Flink會持續地對整個系統做snapshot,然后把global state(根據config文件設定)儲存到master node或HDFS.當系統出現failure,Flink會停止數據處理,然后把系統恢復到最近的一次checkpoint.

什么是分布式系統的global state?

分布式系統由空間上分立的process和連接這些process的channel組成.

空間上分立的含義是,這些process不共享memory,而是通過在communication channel上進行的message pass來異步交流.

分布式系統的global state就是所有process,channel的local state的集合.

process的local state取決於the state of local memory and the history of its activity.

channel的local state是上游process發送進channel的message集減去下游process從channel接收的message的差集.

什么是一致性global state?

假設有兩個銀行賬戶A,B.A中初始有600美元,B中初始有200美元. SA, SB, CAB, CBA由A和B分別記錄,組成了global state.

在t0時刻,A向B轉賬50美元;在t1時刻,B向A轉賬80美元.

如果SA, SB記錄於(t0, t1), CAB, CBA記錄於(t1, t2),那么global state = 550+200+50+80 = 880,比真實值多了$80. 這就是不一致性global state.

如果 SA, SB, CAB, CBA同屬於一個時間區間,那么得到的global state就是一致性的.

Snapshot算法獲得一致性global state的難點是什么?

分布式系統沒有共享內存(globally shared memory)和全局時鍾(global clock).

如果分布式系統有共享內存,那么可以從共享內存中直接獲取整個分布式系統的snapshot,無需分別獲得各個process,channel的local state再組合成global state.

如果分布式系統有global clock,那么所有的process能在同一時刻各自記錄local state,這樣就保證了state的一致性.

獲得一致性global state的算法 ---- Chandy-Lamport算法

精髓:該算法在普通message中插入了control message – marker

前提:

1)       message的傳輸可能有delay,但一定會到達

2)       每兩個process之間都有一條communication path(可能由多條channel組成)

3)       Channel是單向的FIFO

描述:

Marker sending rule for process Pi

(1)     Process Pi 記錄自身state

(2)     Pi在記錄自身state后,發送下一條message前,Pi向自己所有的outgoing channel發送marker

Marker receiving rule for process Pj on receiving a marker along channel C

如果Pj第一次接收到marker,那么

         把channel C的state記為空集

         執行marker sending rule

否則(並非第一次接收到marker)

         把記錄自身state(或最近一次記錄另一個channel的state)后,收到這個marker前的message集記為C的state

 

每個process會記錄自身的state和它的incoming channel的state

圖解:

A,B,C,D代表4個process.有向線段代表FIFO的channel.綠色圓形代表普通message,橙色矩形代表marker.藍色的節點和線段代表已經記錄state的process和channel

Process A啟動snapshot算法,A執行marker sending rule(記錄自身state,然后發送marker):

Process B接收到marker,執行marker receiving rule:將channel AB的state記為空集,然后記錄自身state並向下發送marker:

 

Process C接收到marker, 執行marker receiving rule:將channel AC的state記為空集,然后記錄自身state並向下發送marker:

 

Process D接收到來自於process B的marker, 執行marker receiving rule:將channel BD的state記為空集,然后記錄自身state並向下發送marker:

 

 

Process D接收到來自於process C的marker, 執行marker receiving rule:這是process D第二次接收到marker,將channel CD的state記為{5},不會向下發送marker:

自此process A,B,C,D的local state和所有Channel的state都記錄完畢. 將這些local state組合,所得到的就是global state

Flink的snapshot算法 ---- Asynchronous Barrier Snapshotting(ABS)

為了消去記錄channel state這一步驟,process在接收到第一個barrier后不會馬上做snapshot,

而是等待接受其他上游channel的barrier.

在等待期間,process會把barrier已到的channel的record放入input buffer.

當所有上游channel的barrier到齊后,process才記錄自身state,之后向所有下游channel發送barrier.

因為先到的barrier會等待后到的barrier,所有所有barrier相當於同時到達process,

因此,該process的上游channel的state都是空集.這就避免了去記錄channel的state

圖解:

A是JobManager, B C是source,D是普通task.

JobManager發起一次snapshot:向所有source發送barrier.

每個Barrier先后到達各自的source.Source在收到barrier后記錄自身state,然后向下游節點發送barrier

Barrier (from)B 到達process D,但不會進行snapshot

Barrier (from)B已經到達process D,

所以當來自於channel BD的record 6 7到達后,process D不會處理它們,而是將它們放入input buffer.

而Barrier (from)C尚未到達process D,所以當來自於channel CD的record 4到達后,process D會處理它.

Barrier C也到達process D.

這樣,process D已經接收到了所有上游process的barrier.process D記錄自身state,然后向下游節點發送barrier

ABS的at least once模式

當process接收到barrier后,會立刻做snapshot. Process會繼續處理所有channel的record.后來的snapshot會覆蓋之前的snapshot.

Record 6本不屬於這次checkpoint,卻包含在process D的local state中.

在recovery時,source認為record 6還沒有被處理過,所以重發record 6. 這就導致stream中出現了兩個record 6,造成了at least once.

 

這里的問題在於,當第二個barrier到達時,節點D再次對自身做了snapshot.

而在Chandy-Lamport的算法中,第二個barrier到達時,節點D應該對barrier來源的channel做snapshot.

 

對單一input channel的算子來說,沒有Alignment這個概念.這些算子在at least once模式下也是呈現exactly once的行為.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM