Spark Streaming和Kafka整合保證數據零丟失


當我們正確地部署好Spark Streaming,我們就可以使用Spark Streaming提供的零數據丟失機制。為了體驗這個關鍵的特性,你需要滿足以下幾個先決條件:

  1、輸入的數據來自可靠的數據源和可靠的接收器;
  2、應用程序的metadata被application的driver持久化了(checkpointed );
  3、啟用了WAL特性(Write ahead log)。

  下面我將簡單地介紹這些先決條件。

可靠的數據源和可靠的接收器

對於一些輸入數據源(比如Kafka),Spark Streaming可以對已經接收的數據進行確認。輸入的數據首先被接收器(receivers )所接收,然后存儲到Spark中(默認情況下,數據保存到2個執行器中以便進行容錯)。數據一旦存儲到Spark中,接收器可以對它進行確認(比如,如果消費Kafka里面的數據時可以更新Zookeeper里面的偏移量)。這種機制保證了在接收器突然掛掉的情況下也不會丟失數據:因為數據雖然被接收,但是沒有被持久化的情況下是不會發送確認消息的。所以在接收器恢復的時候,數據可以被原端重新發送。

元數據持久化(Metadata checkpointing)

可靠的數據源和接收器可以讓我們從接收器掛掉的情況下恢復(或者是接收器運行的Exectuor和服務器掛掉都可以)。但是更棘手的問題是,如果Driver掛掉如何恢復?對此開發者們引入了很多技術來讓Driver從失敗中恢復。其中一個就是對應用程序的元數據進行Checkpint。利用這個特性,Driver可以將應用程序的重要元數據持久化到可靠的存儲中,比如HDFS、S3;然后Driver可以利用這些持久化的數據進行恢復。元數據包括:
  1、配置;
  2、代碼;
  3、那些在隊列中還沒有處理的batch(僅僅保存元數據,而不是這些batch中的數據)
由於有了元數據的Checkpint,所以Driver可以利用他們重構應用程序,而且可以計算出Driver掛掉的時候應用程序執行到什么位置。

可能存在數據丟失的場景

令人驚訝的是,即使是可靠的數據源、可靠的接收器和對元數據進行Checkpint,仍然不足以阻止潛在的數據丟失。我們可以想象出以下的糟糕場景:

  1、兩個Exectuor已經從接收器中接收到輸入數據,並將它緩存到Exectuor的內存中;
  2、接收器通知輸入源數據已經接收;
  3、Exectuor根據應用程序的代碼開始處理已經緩存的數據;
  4、這時候Driver突然掛掉了;
  5、從設計的角度看,一旦Driver掛掉之后,它維護的Exectuor也將全部被kill;
  6、既然所有的Exectuor被kill了,所以緩存到它們內存中的數據也將被丟失。結果,這些已經通知數據源但是還沒有處理的緩存數據就丟失了;
  7、緩存的時候不可能恢復,因為它們是緩存在Exectuor的內存中,所以數據被丟失了。

這對於很多關鍵型的應用程序來說非常的糟糕。

WAL(Write ahead log)

為了解決上面提到的糟糕場景,Spark Streaming 1.2開始引入了WAL機制。

  啟用了WAL機制,所以已經接收的數據被接收器寫入到容錯存儲中,比如HDFS或者S3。由於采用了WAl機制,Driver可以從失敗的點重新讀取數據,即使Exectuor中內存的數據已經丟失了。在這個簡單的方法下,Spark Streaming提供了一種即使是Driver掛掉也可以避免數據丟失的機制。

At-least-once語義

雖然WAL可以確保數據不丟失,它並不能對所有的數據源保證exactly-once語義。想象一下可能發生在Spark Streaming整合Kafka的糟糕場景。

  1、接收器接收到輸入數據,並把它存儲到WAL中;
  2、接收器在更新Zookeeper中Kafka的偏移量之前突然掛掉了;
        3、Spark Streaming假設輸入數據已成功收到(因為它已經寫入到WAL中),然而Kafka認為數據被沒有被消費,因為相應的偏移量並沒有在Zookeeper中更新;
  4、過了一會,接收器從失敗中恢復;
  5、那些被保存到WAL中但未被處理的數據被重新讀取;
  6、一旦從WAL中讀取所有的數據之后,接收器開始從Kafka中消費數據。因為接收器是采用Kafka的High-Level Consumer API實現的,它開始從Zookeeper當前記錄的偏移量開始讀取數據,但是因為接收器掛掉的時候偏移量並沒有更新到Zookeeper中,所有有一些數據被處理了2次。

WAL的缺點

除了上面描述的場景,WAL還有其他兩個不可忽略的缺點:

  1、WAL減少了接收器的吞吐量,因為接受到的數據必須保存到可靠的分布式文件系統中。
  2、對於一些輸入源來說,它會重復相同的數據。比如當從Kafka中讀取數據,你需要在Kafka的brokers中保存一份數據,而且你還得在Spark Streaming中保存一份。

Kafka direct API

為了解決由WAL引入的性能損失,並且保證 exactly-once 語義,Spark Streaming 1.3中引入了名為Kafka direct API。
  這個想法對於這個特性是非常明智的。Spark driver只需要簡單地計算下一個batch需要處理Kafka中偏移量的范圍,然后命令Spark Exectuor直接從Kafka相應Topic的分區中消費數據。換句話說,這種方法把Kafka當作成一個文件系統,然后像讀文件一樣來消費Topic中的數據。
在這個簡單但強大的設計中:
  1、不再需要Kafka接收器,Exectuor直接采用Simple Consumer API從Kafka中消費數據。
  2、不再需要WAL機制,我們仍然可以從失敗恢復之后從Kafka中重新消費數據;
  3、exactly-once語義得以保存,我們不再從WAL中讀取重復的數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM