1、背景
- Flink:1.4.0+
- Kakfa:0.11+
使用場景:flink的source和sink都是kafka,這里的source和sink不限於kafka,可以使用任何一種提供了類似協調機制(2PC)的sink/source。
關鍵點:
- Kafka source支持重新消費,手動commit
- Kafka sink支持2PC(two-phase commit protocol)
- Flink的checkpoint機制
2、End-to-end Exactly Once Applications with Apache Flink
kafka從0.11開始支持事務(exactly-once語義),這為實現端到端的精確一致性語義提供了支持,結合flink,介紹下如何實現End-to-end Exactly Once的應用:
一個典型例子:
- data source從kafka消費數據
- window聚合
- data sink將處理后的數據寫入到kafka
data sink為了提供exactly-once保證,必須將一個事務中的數據都寫入到kafka,一次commit包含了2個checkpoint之間的所有的寫操作,這保證了當失敗時,也會回滾所有的寫操作。
第一步:pre-commit階段。
pre-commit是一次checkpoint的開始,flink的checkpoint barrier在operator中傳遞,當一個operator接收到barrier,觸發state snapshot。
比如Kafka source會保存消費的offset,完成后傳遞barrier。
這個過程如果僅僅只涉及internal state(internal state是由flink保存和管理的),是沒有問題的,但是如果涉及到external state,則需要外部系統提供一致性保證,外部系統必須要提供對2PC的事務支持。
當所有的operator完成了checkpoint,Pre-commit階段就算完成了。Checkpoint的snapshot包含了整個application的狀態,包括外部系統的pre-commited的external state,如果發生失敗,可以回滾到最近一次成功的snapshot。
第二步:JobManager通知所有的operator,checkpoint完成了,執行commit階段。
例子中的data source和window operator沒有external state,在commit執行階段無需額外的操作。data sink有external state,需要commit這次事務。
整個流程如下:
- 當所有的operator完成了pre-commit(checkpoint snapshot),開啟一個commit。
- 如果有一個pre-commit失敗了,其他都abort,回滾到最近一次成功的checkpoint。
- Pre-commit成功后,所有的operator和外部系統必須保證commit執行成功,如果有失敗(如網絡中斷),則整個flink application fail,flink任務按重啟策略重啟,開始一次新的commit嘗試。
3、Implementing the Two-Phase Commit Operator in Flink
基於Flink的TwoPhaseCommitSinkFunction,實現一個2PC的sink,這個sink的實現機制依賴外部系統對2PC的支持:
- beginTransaction:開始事務,做一些准備工作。
- preCommit:預提交階段,如臨時保存等。
- commit:提交操作。
- abort:回滾,撤回操作。
如果pre-commit成功了但是commit沒有到達operator失敗就發生了,flink會將operator恢復到pre-commit時的狀態,然后繼續commit,我們需要提供足夠的信息讓flink重啟后決定是commit還是abort。
原文:https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka