當使用sparkstreaming處理流式數據的時候,它的數據源搭檔大部分都是Kafka,尤其是在互聯網公司頗為常見。
當他們集成的時候我們需要重點考慮就是如果程序發生故障,或者升級重啟,或者集群宕機,它究竟能否做到數據不丟不重呢?
也就是通常我們所說的高可靠和穩定性,通常框架里面都帶有不同層次的消息保證機制,一般來說有三種就是:
- at most once 最多一次
- at least once 最少一次
- exactly once 准確一次
在storm里面是通過ack和Trident,在sparkstreaming里面,如果是1.3版本之前是通過Receiver方式讀取kafka數據,1.3之后通過Direct Approach方式直接讀取kafka的數據,直接分配每個Batch及RDD最新的Topic partition offset,任務運行后使用kafka的Simple Consumer API去獲取那一段的offset的數據,這樣的好處是避免了原來Receiver接受數據宕機帶來的數據可靠性風險,相當於原來的數據是在內存中而現在的數據是在kafka的磁盤中,通過偏移量可隨時再次消費數據,從而實現了數據的Exactly Once處理,此外還有個不同之處在於1.3之后,使用的checkpoint保存當前消費的kafka的offset,而之前用zk保存的,這就是今天這篇文章重點吐槽的地方。
在sparkstreaming如何做到數據不丟失呢?
(1)使用checkpoint
(2)自己維護kafka偏移量
checkpoint配合kafka能夠在特定環境下保證不丟不重,注意為什么要加上特定環境呢,這里有一些坑,checkpoint是對sparkstreaming運行過程中的元數據和
每次rdds的數據狀態保存到一個持久化系統中,當然這里面也包含了offset,一般是HDFS,S3,如果程序掛了,或者集群掛了,下次啟動仍然能夠從checkpoint中恢復,從而做到生產環境的7*24高可用。
但是checkpoint的最大的弊端在於,一旦你的流式程序代碼或配置改變了,或者更新迭代新功能了,這個時候,你先停舊的sparkstreaming程序,然后新的程序打包編譯后執行運行,會發現兩種情況:
(1)啟動報錯,反序列化異常
(2)啟動正常,但是運行的代碼仍然是上一次的程序的代碼。
為什么會出現上面的兩種情況,這是因為checkpoint第一次持久化的時候會把整個相關的jar給序列化成一個二進制文件,每次重啟都會從里面恢復,但是當你新的
程序打包之后序列化加載的仍然是舊的序列化文件,這就會導致報錯或者依舊執行舊代碼。有的同學可能會說,既然如此,直接把上次的checkpoint刪除了,不就能啟動了嗎? 確實是能啟動,但是一旦你刪除了舊的checkpoint,新啟動的程序,只能從kafka的smallest或者largest的偏移量消費,默認是從最新的,如果是最新的,而不是上一次程序停止的那個偏移量
就會導致有數據丟失,如果是老的,那么就會導致數據重復。不管怎么樣搞,都有問題。
https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#upgrading-application-code
針對這種問題,spark官網給出了2種解決辦法:
(1)舊的不停機,新的程序繼續啟動,兩個程序並存一段時間消費。 評價:仍然有丟重復消費的可能
(2)停機的時候,記錄下最后一次的偏移量,然后新恢復的程序讀取這個偏移量繼續工作,從而達到不丟消息。 評價:官網沒有給出具體怎么操作,只是給了個思路
第二種思路是正確的,但還需要自己維護一個offset狀態,這樣以來checkpoint這個功能只能在程序寫好之后不允許再次變動,但可以重啟的情況保證高可靠。
但實際情況是大多數公司的代碼都會頻繁迭代和升級,與checkpoint剛好相悖,這樣以來checkpoint的作用便顯的有點沒用了,既然還是需要自己維護offset狀態,
那么不用checkpoint也罷,完全自己維護offset狀態到zk中即可。所以果斷棄用checkpoint,采用自己維護offset。其原理如下:
首次啟動,先從zk中找是否有上次存儲的偏移量,如果沒有就從最新的消費,然后保存偏移量至zk中
如果從zk中找到了偏移量,那么就從指定的偏移量處開始消費處理,每個批處理處理完畢后,都會更新新的offset到zk中,
這樣以來無論是程序故障,還是宕機,再次啟動后都會從上次的消費的偏移量處繼續開始消費,而且程序的升級或功能改動新版本的發布都能正常運行
並做到了消息不丟。
需要注意的是,雖然上游能夠做到准確一次的消費,但是下游的落地存儲輸出,比如寫入hbase,redis,mysql,es等等如果失敗了,整條消息依舊會失敗,這個完全要靠自己的設計了,要么記錄log,針對特定數據記錄,如果失敗定期 重新打入kafka走程序恢復或者手動恢復。
或者設計存儲的時候,有復合主鍵,把偏移量提前,就算重復消費,但主鍵一樣,最終只會有一條數據落地,這個要分場景和具體業務結合使用了。
回到主題,自己維護kafka的offset狀態,如何做? github上已經有大神貢獻了,我們只需要拿過來稍加改動即可,使用自己維護的offset之后,就沒有必要再使用
checkpoint,github連接如下,有興趣的朋友可以了解下:
https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala
使用zk維護offset也是比較不錯的選擇,如果將checkpoint存儲在HDFS上,每隔幾秒都會向HDFS上進行一次寫入操作而且大部分都是小文件,且不說寫入性能怎么樣,就小文件過多,對整個Hadoop集群都不太友好。因為只記錄偏移量信息,所以數據量非常小,zk作為一個分布式高可靠的的內存文件系統,非常適合這種場景。
所有參考鏈接:
http://aseigneurin.github.io/
http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
http://why-not-learn-something.blogspot.jp/2016/08/upgrading-running-spark-streaming.html
http://www.binwang.me/2015-11-03-the-proper-way-to-use-spark-checkpoint.html
https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala
https://github.com/ippontech/spark-kafka-source
參考
http://blog.csdn.net/lovehuangjiaju/article/details/50102831
http://blog.csdn.net/englishsname/article/details/50791347
http://blog.csdn.net/dengxing1234/article/details/73613484
https://www.cnblogs.com/gaoxing/p/4847119.html