1. 狀態一致性
當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法,也就是說在成功處理故障並恢復之后得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多
正確?舉例來說,假設要對最近一小時登錄的用戶計數。在系統經歷故障之后,計數結果是多少?如果有偏差,是有漏掉的計數還是重復計數?
有狀態的流處理,內部每個算子任務都可以有自己的狀態;
對於流處理器內部(沒有接入sink)來說,所謂的狀態一致性,其實就是我們所說的計算結果要保證准確;一條數據不應該丟失,也不應該重復計算;
在遇到故障時可以恢復狀態,恢復以后的重新計算,結果應該也是完全正常的;
狀態一致性級別
AT_MOST_ONCE(最多一次),當任務故障時最簡單做法是什么都不干,既不恢復丟失狀態,也不重播丟失數據。At-most-once語義的含義是最多處理一次事件。
AT_LEAST_ONCE(至少一次),在大多數真實應用場景,我們希望不丟失數據。這種類型的保障稱為at-least-once,意思是所有的事件都得到了處理,而一些事件還可能被處理多次。
EXACTLY_ONCE(精確一次),恰好處理一次是最嚴格的的保證,也是最難實現的。恰好處理一次語義不僅僅意味着沒有事件丟失,還意味着針對每一個數據,內部狀態僅僅更新一次。
曾經,at-least-once非常流行。第一代流處理器(如Storm和Samza)剛問世時只保證at-least-once,原因有二。
- 保證exactly-once的系統實現起來更復雜。這在基礎架構層(決定什么代表正確,以及exactly-once的范圍是什么)和實現層都很有挑戰性。
- 流處理系統的早期用戶願意接受框架的局限性,並在應用層想辦法彌補(例如使應用程序具有冪等性,或者用批量計算層再做一遍計算)。
最先保證exactly-once的系統(Storm Trident和Spark Streaming)在性能和表現力這兩個方面付出了很大的代價。為了保證exactly-once,這些系統無法單獨地對每條記錄運用應用邏
輯,而是同時處理多條(一批)記錄,保證對每一批的處理要么全部成功,要么全部失敗。這就導致在得到結果前,必須等待一批記錄處理結束。因此,用戶經常不得不使用兩個流處理
框架(一個用來保證exactly-once,另一個用來對每個元素做低延遲處理),結果使基礎設施更加復雜。曾經,用戶不得不在保證exactly-once與獲得低延遲和效率之間權衡利弊。Flink避
免了這種權衡。Flink的一個重大價值在於,它既保證了exactly-once,也具有低延遲和高吞吐的處理能力。
從根本上說,Flink通過使自身滿足所有需求來避免權衡,它是業界的一次意義重大的技術飛躍。
端到端(end-to-end)狀態一致性
目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在Flink流處理內部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如kafka)和輸出到
持久化系統; 端到端的一致性保證,意味着結果的正確性貫穿了整個流處理應用的始終,每個組件都保證了它自己的一致性;
整個端到端的一致性級別取決於所有組件中一致性最弱的組件;具體划分如下:
- 內部保證 --- checkpoint
- source端 --- 可重設數據的讀取位置;可重新讀取偏移量
- sink端 -- 從故障恢復時,數據不會重復寫入外部系統,有兩種實現方式:冪等(Idempotent)寫入和事務性(Transactional)寫入;
① 冪等寫入(Idempotent Writes)
冪等操作即一個操作可以重復執行很多次,但只導致一次結果更改,也就是說,后面再重復執行就不起作用了;
它的原理不是不重復寫入而是重復寫完之后結果還是一樣;它的瑕疵是不能做到完全意義上exactly-once(在故障恢復時,突然把外部系統寫入操作跳到之前的某個狀態然后繼續往里
邊寫,故障之前發生的那一段的狀態變化又重演了直到最后發生故障那一刻追上就正常了;假如中間這段又被讀取了就可能會有些問題);
② 事務寫入(Transactional Writes)
- 應用程序中一系列嚴密的操作,所有操作必須成功完成,否則在每個操作中所作的所有更改都會被撤銷;
- 具有原子性,一個事務中的一系列的操作要么全部成功,要么一個都不做。
實現思想:需要構建事務來寫入外部系統,構建的事務對應着checkpoint,等到checkpoint真正完成的時候,才把所有對應的結果寫入sink系統中。
實現方式:預寫日志(WAL)和兩階段提交(2PC);
DataStream API 提供了GenericWriteAheadSink模板類和TwoPhaseCommitSinkFunction 接口,可以方便地實現這兩種方式的事務性寫入。
預寫日志(Write-Ahead-Log,WAL)
把結果數據先當成狀態保存,然后在收到checkpoint完成的通知時,一次性寫入sink系統;
簡單易於實現,由於數據提前在狀態后端中做了緩存,所以無論什么sink系統,都能用這種方式一批搞定;
DataStream API提供了一個模板類:GenericWriteAheadSink,來實現這種事務性sink;
瑕疵:A. sink系統沒說它支持事務,有可能出現一部分寫進去了一部分沒寫進去(如果算失敗,再寫一次就寫了兩次了);
B. checkpoint做完了sink才去真正寫入(但其實得等sink都寫完checkpoint才能生效,所以WAL這個機制jobmanager確定它寫完還不算真正寫完,還得有一個外部系統已經確認完成的checkpoint)
兩階段提交(Two--Phase--Commit,2PC)-- 真正能夠實現exactly-once
對於每個checkpoint,sink任務會啟動一個事務,並將接下來所有接收的數據添加到事務里;
然后將這些數據寫入外部sink系統,但不提交他們 -- 這時只是預提交;
當它收到checkpoint完成時的通知,它才正式提交事務,實現結果的真正寫入;
這種方式真正實現了exactly-once,它需要一個提供事務支持的外部sink系統,Flink提供了TwoPhaseCommitSinkFunction接口。
2PC對外部sink的要求
外部sink系統必須事務支持,或者sink任務必須能夠模擬外部系統上的事務;
在checkpoint的間隔期間里,必須能夠開啟一個事務,並接受數據寫入;
在收到checkpoint完成的通知之前,事務必須是“等待提交”的狀態,在故障恢復的情況下,這可能需要一些時間。如果這個時候sink系統關閉事務(例如超時了),那么未提交的數據就會丟失;
sink任務必須能夠在進程失敗后恢復事務;
提交事務必須是冪等操作;
不同Source和sink的一致性保證
2. 一致性檢查點(Checkpoint)
checkpoint機制
Flink使用了一種輕量級快照機制 --- 檢查點(Checkpoint)來保證exactly-one語義,在出現故障時將系統重置回正確狀態;
有狀態流應用的一致檢查點,其實就是:所有任務的狀態,在某個時間點的一份拷貝(一份快照)。而這個時間點,應該是所有任務都恰好處理完一個相同的輸入數據的時候。
應用狀態的一致性檢查點,是Flink故障恢復機制的核心。
Flink+kafka端到端的exactly-once語義
Flink和kafka天生就是一對,用kafka做為source,用kafka做完sink <===> 實現端到端的一致性
- 內部 -- 利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復,保證內部的狀態一致性;
- source -- kafka consumer作為source,可以將偏移量保存下來,如果后續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性;
- sink -- kafka producer作為sink,采用兩階段提交sink,需要實現一個TwoPhaseCommitSinkFunction
內部的checkpoint機制見上;source和 sink見下:
默認是AT_LEAST_ONCE
Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。
當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在算子間傳遞下去。
每個算子會對當前的狀態做個快照,保存到狀態后端;
對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。
每個內部的 transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里。
sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務(還不能被消費);當遇到 barrier 時,把狀態保存到狀態后端,並開啟新的預提交事務。(以barrier為界之前的數
據屬於上一個事務,之后的數據屬於下一個新的事務);
當所有算子任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成。
當sink 任務收到確認通知,就會正式提交之前的事務,kafka 中未確認的數據就改為“已確認”,數據就真正可以被消費了。
所以我們看到,執行過程實際上是一個兩段式提交,每個算子執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
具體的兩階段提交步驟總結如下:
- 第一條數據來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但標記為未提交,這就是“預提交”
- jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態存入狀態后端,並通知 jobmanager
- sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據
- jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成
- sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據
- 外部kafka關閉事務,提交的數據可以正常消費了。
所以我們也可以看到,如果宕機需要通過StateBackend進行恢復,只能恢復所有確認提交的操作。
在代碼中真正實現flink和kafak的端到端exactly-once語義:
A. 這里需要配置下,因為它默認的是AT_LEAST_ONCE;
B. 對於外部kafka讀取的消費者的隔離級別,默認是read_on_commited,如果默認是可以讀未提交的數據,就相當於整個一致性還沒得到保證(未提交的數據沒有最終確認那邊就可以讀了,相當於那邊已經消費數據了,事務就是假的了) 所以需要修改kafka的隔離級別;
C. timeout超時問題,flink和kafka 默認sink是超時1h,而kafak集群中配置的tranctraction事務的默認超時時間是15min,flink-kafak這邊的連接器的時間長,這邊還在等着做操作 ,kafak那邊等checkpoint等的時間太長直接關閉了。所以兩邊的超時時間最起碼前邊要比后邊的小。