漫談流式計算的一致性


參考,

http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/

http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/

 

image

對於batch分析,fault-tolerant很容易做,失敗只需要replay,就可以完美做到容錯。

對於streaming分析, 數據流本身是動態,沒有所謂的開始或結束,雖然可以replay buffer的部分數據,但fault-tolerant做起來會復雜的多

當前主流的一些streaming分析平台,都有一些各自特有的fault-tolerant的機制,在此分析和總結一下,

無狀態流數據處理,

這是種比較簡單的流式數據的場景,典型的應用是數據ETL,數據存儲,數據流過是沒有狀態的

保證at least once語義,
分鍾級別,Storm的acker機制,就可以很好的保證, http://storm.apache.org/documentation/Guaranteeing-message-processing.html
message沒有被正確處理,收到ack時,可以選擇重發,這樣每條message對可以保證被處理到,但可能會被重復處理

小時,天級別,利用kafka的replay,一般達到天級別的cache

保證exactly once語義,
對於無狀態數據流,其實只要依賴最終存儲的去重性(deduplication), 就可以達到exactly once
比如對於數據庫,通過unique key和insert ignore就可以解決這個問題,無論你之前重復處理多少次,最終我只存儲一次。

如果最終存儲不支持去重,或者場景比較復雜不僅僅是存儲,比如做疊加計數 或 update
做疊加計數,當前的機制,你無法知道這個message是否加過
做update的時候,更新的時序性很重要,這個是ack機制無法保證的

Storm 0.7就提供transactional topology特性,http://storm.apache.org/documentation/Transactional-topologies.html

首先給message加上transaction id,這樣有兩個好處,可以保證時序性,在寫入存儲的時候,可以按transaction id順序寫入
並且在可以外部存儲上記錄當前最新的transaction id,保證相同的transaction,不會被重復寫入
這個是transactional topology的核心思路,這樣確實是可以保證強一致性,exactly once語義
但這個方案只適用於無狀態,或是依賴外部存儲的,狀態必須要存儲在外部存儲上

至於使用batch,或將topology分為processing和commit階段,都是對性能的優化,並不會提升一致性的保障
但由於使用micro-batch是必須的,所以也稱這類方案是micro-batch方案,除了transactional topology,還有Apache Spark Streaming
micro-batch的壞處,
1. 改變編程模型,偽流式
2. windows based聚合的限制,只能是micro-batch的倍數,比如micro-batch是3分鍾,你想做個5分鍾聚合,沒法做
2. 延遲變大,如果本身秒級別,但如果micro-batch是1分鍾,那延遲就至少1分鍾

有狀態流數據處理,

典型的場景,就是windows-based的聚合或計算,比如計算1分鍾內的計數或平均值,這樣會有部分數據需要cache在內存中
這樣當fail-over時,如何可以恢復cache,並保證exactly once語義

最直接的想法,

局部的snapshot

每個component對cache定期做snapshot,然后在fail-over后,各自恢復自己的cache,
這樣做的問題,
1. snapshot很難增量做,如果cache比較大,成本會比較高
2. snapshot只能定期做,會有部分丟失
3. 最關鍵的,對於分布式系統,各個compoent獨立的進行snapshot,很難達到同一個狀態,每個component的處理速度都是不一樣的,有的處理到n做了snapshot,而有的可能做到n+1才做,
缺乏一個統一的參照系。

 

change-log
每個 component,當接收到一個 message 的時候,產生一條 change log 記錄該 message 和更新的狀態,存入 transactional log 和數據庫
當做 fail-over 的時候,只需要每個 component 將數據庫中的 log,拿出來 replay 即可
這種方式使用的平台如 Google Cloud Dataflow,Apache Samza

對於 Apache Samza,會將 change log 放入kafka中,

image

當fail-over后,每個task從相應的kafka topic里面讀出change-log,完成local state的replay

這樣做的好處,是不用直接去snapshot local cache,如果cache比較大的話,這樣是比較划算的
但是如果數據流很big的話,這樣做也不合適了,因為change-log會非常大

 

Distributed Snapshots (Apache Flink),全局的 snapshot

針對前面提到的局部 snapshot 最關鍵的問題,提出全局 snapshot 的方法,
其實最大的問題仍然是分布式系統的根本問題,統一參照系的問題,如何讓每個 component 在同一的狀態下,進行 snapshot

這個原理來自 Chandy and Lamport, 1985,的paper “Distributed Snapshots: Determining Global States of Distributed Systems”

http://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

局部的snapshot會有的問題,

狀態丟失,如下圖,但狀態中傳輸的時候,對P和Q進行snapshot,會導致隊列中的綠藍橙狀態丟失

image

狀態重復,brown狀態中P和Q的snapshot里面同時出現

image

怎么解這樣的問題?分布式系統中缺乏統一參照系的情況下,只有通過通信才能確定偏序的問題
所以這里使用marker來做組件間的同步,並防止丟失狀態,會同時對組件,以及隊列同時做snapshot, 如下圖

image

P做snapshot,然后發送marker到Q
Q收到marker的時候,知道P做了snapshot,那么我也要做snapshot
同時還要對PQ channel做snapshot,此時channel中有個green,但是由於green是在marker后面的,說明它在P的snapshot里面已經做過,不需要再做,所以此時PQ的snapshot為空
Q在做完snapshot后,還需要把marker返回給P,因為在過程中orange從Q被發送到P
當P收到Q返回的marker時,由於P的snapshot已經做過,無法改變
所以把orange放在QP channel的snapshot中

最終做出的全局的snapshot為,

P(red, green, blue)
channel PQ ()
Q(brown, pink)
channel QP (orange)
這樣就解決了狀態丟或重復的問題

 

Flink’s distributed snapshotting實現基於stream barriers

image

可見,barrier可以將流拆分成一段段的數據,每個barrier都是一個snapshot點,但是這種拆分不同於micro-batch,並不會影響到正常的流式處理
在DAG,即有向無環圖的case下,是不需要對channel做snapshot的,場景會比較簡單
只是每個組件收到barrier的時候去做snapshot就好,該算法的幾個前提:
1. 網絡可靠,消息FIFO;
2. channel可以block,unblock,支持對所有output channel進行廣播
3. 可自動識別注入的barrier

完成過程如圖,這是個有兩條入邊的case,相對復雜些
當收到一條channel的barrier時,需要先block該channel,然后等待另一個channel中的barrier
當兩條channel的barrier都到達時,說明達到統一狀態,進行checkpoint
然后unblock之前block的channel,並對所有的output channel廣播該barrier

image

當DAG上的所有組件都完成snapshot時,那么一個全局的snapshot就完成了,以barrier為唯一標識

比較抽象,下圖以kafka為例子解釋一下,https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

image

對於kafka而言,不同的partition需要不同的線程讀,
圖中,4個source thread分別從4個partition讀取數據
其中由唯一的master來發起checkpoint流程,
過程是,
1. Master給所有的source thread發checkpoint請求
2. source thread接收到cp請求后,會記錄當前的offset,比如5791,並做該offset的message前發出streaming barrier
    並將offset返回給master

3. 這樣master收到所有source的ack offset,就相當於對source做了snapshot,恢復時只需要將相應的source置到該offset即可
4. 中間每個組件,當收到所有input channel的barrier時,將cp存入數據庫,並通知Master
5. 層層下去,直到所有Sink節點,最終節點,完成snapshot

6. master接收到所有節點的做完cp的ack,知道這次checkpoint全部完成

這個方案的最大的問題是,當多個input channel時,需要等所有的barrier到齊,這個明顯會增加latency
Flink的優化是,不等,看到barrier就打snapshot,這樣的問題就是無法保證exactly once,會重復,
因為后來的barrier打checkpoint時會覆蓋先前的cp,
此時barrier先到的channel已經處理了一些barrier之后的數據,這部分結果也會存在cp中

但當fail-over的時候,因為replay是根據你發送barrier的offset來重發的,所以這部分會重復


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM