Spark streaming 和kafka 處理確保消息不丟失的總結
接入kafka
我們前面的1到4 都在說 spark streaming 接入 kafka 消息的事情。講了兩種接入方式,以及spark streaming 如何和kafka協作接收數據,處理數據生成rdd的
主要有如下兩種方式
基於分布式receiver
基於receiver的方法采用Kafka的高級消費者API,每個executor進程都不斷拉取消息,並同時保存在executor內存與HDFS上的預寫日志(write-ahead log/WAL)。當消息寫入WAL后,自動更新ZooKeeper中的offset。
它可以保證at least once語義,但無法保證exactly once語義。原因是雖然引入了WAL來確保消息不會丟失,但有可能會出現消息已寫入WAL,但更新comsuer 的offset到zk時失敗的情況,此時consumer就會按上一次的offset重新發送消息到kafka重新獲取一次已保存到WAL的數據。這種方式還會造成數據冗余(WAL中一份,blockmanager中一份,其中blockmanager可能會做StorageLevel.MEMORY_AND_DISK_SER_2,即內存中一份,磁盤上兩份),大大降低了吞吐量和內存磁盤的利用率。現在基本都使用下面基於direct stream的方法了。
基於direct stream的方法
基於direct stream的方法采用Kafka的簡單消費者API,大大簡化了獲取message 的流程。executor不再從Kafka中連續讀取消息,也消除了receiver和WAL。還有一個改進就是Kafka分區與RDD分區是一一對應的,允許用戶控制topic-partition 的offset,程序變得更加可控。
driver進程只需要每次從Kafka獲得批次消息的offset range,然后executor進程根據offset range去讀取該批次對應的消息即可。由於offset在Kafka中能唯一確定一條消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,保證了exactly once語義。不過,由於它采用了簡單消費者API,我們就需要自己來管理offset。否則一旦程序崩潰,整個流只能從earliest或者latest點恢復,這肯定是不穩妥的。
如何保證處理結果不丟失呢?
主要有兩種方案:
2.1. 主要是 通過設計冪等性操作,在 at least once 的語義之上,確保數據不丟失
2.2. 在一些shuffle或者是集合計算的結果集中, 在 exactly-once 的基礎上,同時更新 處理結果和 offset,這種情況下,一般都是使用事務來做。
現有的支持事務的,也就是傳統的數據庫了,對於一些緩存系統為了更簡單更高效的訪問,即使有事務機制,也設計的非常簡單,或是只實現了部分功能,例如 redis 的事務是不能支持回滾的。需要我們在代碼中做相應的設計,來確保事務的正確執行。
分布式 RDD 計算過程如何確保准確性和一致性?
即分布式RDD計算是如何和確保計算恰好計算一次的呢?后續會出一系列源碼分析,分析 spark 是如何做分布式計算的。