Flink是如何實現exactly-once語義的


轉自:https://blog.csdn.net/xianpanjia4616/article/details/86375224

 

最少一次:斷了之后 重新執行 再去重

嚴格一次:根據檢查點,再執行一次

-------------------------------------------------------------------------------------------

Flink跟其他的流計算引擎相比,最突出或者做的最好的就是狀態的管理.什么是狀態呢?比如我們在平時的開發中,需要對數據進行count,sum,max等操作,這些中間的結果(即是狀態)是需要保存的,因為要不斷的更新,這些值或者變量就可以理解為是一種狀態,拿讀取kafka為例,我們需要記錄數據讀取的位置(即是偏移量),並保存offest,這時offest也可以理解為是一種狀態.

Flink是怎么保證容錯恢復的時候保證數據沒有丟失也沒有數據的冗余呢?checkpoint是使Flink 能從故障恢復的一種內部機制。檢查點是 Flink 應用狀態的一個一致性副本,包括了輸入的讀取位點。在發生故障時,Flink 通過從檢查點加載應用程序狀態來恢復,並從恢復的讀取位點繼續處理,就好像什么事情都沒發生一樣。Flink的狀態存儲在Flink的內部,這樣做的好處就是不再依賴外部系統,降低了對外部系統的依賴,在Flink的內部,通過自身的進程去訪問狀態變量.同時會定期的做checkpoint持久化,把checkpoint存儲在一個分布式的持久化系統中,如果發生故障,就會從最近的一次checkpoint中將整個流的狀態進行恢復.

下面就來介紹一下Flink從Kafka中獲取數據,怎么管理offest實現exactly-once的.

Apache Flink 中實現的 Kafka 消費者是一個有狀態的算子(operator),它集成了 Flink 的檢查點機制,它的狀態是所有 Kafka 分區的讀取偏移量。當一個檢查點被觸發時,每一個分區的偏移量都被存到了這個檢查點中。Flink 的檢查點機制保證了所有 operator task 的存儲狀態都是一致的。這里的“一致的”是什么意思呢?意思是它們存儲的狀態都是基於相同的輸入數據。當所有的 operator task 成功存儲了它們的狀態,一個檢查點才算完成。因此,當從潛在的系統故障中恢復時,系統提供了 excatly-once 的狀態更新語義。

下面我們將一步步地介紹 Apache Flink 中的 Kafka 消費位點是如何做檢查點的。在本文的例子中,數據被存在了 Flink 的 JobMaster 中。值得注意的是,在 POC 或生產用例下,這些數據最好是能存到一個外部文件系統(如HDFS或S3)中。

第一步:
如下所示,一個 Kafka topic,有兩個partition,每個partition都含有 “A”, “B”, “C”, ”D”, “E” 5條消息。我們將兩個partition的偏移量(offset)都設置為0.

 

 

 第二步:
Kafka comsumer(消費者)開始從 partition 0 讀取消息。消息“A”正在被處理,第一個 consumer 的 offset 變成了1。

 

 

 第三步:
消息“A”到達了 Flink Map Task。兩個 consumer 都開始讀取他們下一條消息(partition 0 讀取“B”,partition 1 讀取“A”)。各自將 offset 更新成 2 和 1 。同時,Flink 的 JobMaster 開始在 source 觸發了一個檢查點。

 

 

 第四步:
接下來,由於 source 觸發了檢查點,Kafka consumer 創建了它們狀態的第一個快照(”offset = 2, 1”),並將快照存到了 Flink 的 JobMaster 中。Source 在消息“B”和“A”從partition 0 和 1 發出后,發了一個 checkpoint barrier。Checkopint barrier 用於各個 operator task 之間對齊檢查點,保證了整個檢查點的一致性。消息“A”到達了 Flink Map Task,而上面的 consumer 繼續讀取下一條消息(消息“C”)。

 

 

 第五步:
Flink Map Task 收齊了同一版本的全部 checkpoint barrier 后,那么就會將它自己的狀態也存儲到 JobMaster。同時,consumer 會繼續從 Kafka 讀取消息。

 

 

 第六步:
Flink Map Task 完成了它自己狀態的快照流程后,會向 Flink JobMaster 匯報它已經完成了這個 checkpoint。當所有的 task 都報告完成了它們的狀態 checkpoint 后,JobMaster 就會將這個 checkpoint 標記為成功。從此刻開始,這個 checkpoint 就可以用於故障恢復了。值得一提的是,Flink 並不依賴 Kafka offset 從系統故障中恢復。

 

 

 故障恢復
在發生故障時(比如,某個 worker 掛了),所有的 operator task 會被重啟,而他們的狀態會被重置到最近一次成功的 checkpoint。Kafka source 分別從 offset 2 和 1 重新開始讀取消息(因為這是完成的 checkpoint 中存的 offset)。當作業重啟后,我們可以期待正常的系統操作,就好像之前沒有發生故障一樣。如下圖所示:

 

 

Flink的checkpoint是基於Chandy-Lamport算法的分布式一致性快照,如果想更加深入的了解Flink的checkpoint可以去了解一下這個算法.

原文地址:https://www.da-platform.com/blog/how-apache-flink-manages-kafka-consumer-offsets


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM