容錯(Fault-tolerance)


Spark Streaming的容錯包括了三個地方的容錯:
1、Executor失敗容錯:Executor的失敗會重新啟動一個新的Executor,這個是Spark自身的特性。如果Receiver所在的Executor失敗了,那么Spark Streaming會在另外一個Executor上啟動這個Receiver(這個Executor上可能存在已經接收到的數據的備份)
2、Driver失敗的容錯:如果Driver失敗的話,那么整個Spark Streaming應用將會全部掛掉。所以Driver端的容錯是非常重要的,我們首先可以配置Driver端的checkpoint,用於定期的保存Driver端的狀態;然后我們可以配置Driver端失敗的自動重啟機制(每一種集群管理的配置都不一樣);最后我們需要打開Executor端的WAL機制
3、一個Task失敗的容錯:Spark中的某個Task失敗了可以重新運行,這個Task所在的Stage失敗的話呢,也可以根據RDD的依賴重新跑這個Stage的父親Stage,進而重新跑這個失敗的Stage,在實時計算的過程,肯定不能容忍某個Task的運行時間過長,Spark Streaming對於某個運行時間過長的Task會將這個Task殺掉重新在另一個資源比較充足的Executor上執行。這個就是利用了Spark的Task調度的推測機制。

Executor失敗容錯

 

 

 

 Driver失敗容錯

 

 

 

 

checkpoint機制:定期將Driver端的信息寫到HDFS中
1、configuration (配置信息)
2、定義的DStream的操作
3、沒有完成的batches的信息
 
1、設置自動重啟Driver程序
standalone、yarn以及mesos都支持
 
2、設置hdfs的checkpoint目錄
streamingContext.setCheckpoint(hdfsDirectory)
3、在driver端使用正確的API來達到Driver的容錯,需要寫代碼

 

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * WordCount程序,Spark Streaming消費TCP Server發過來的實時數據的例子:
  *
  * 1、在master服務器上啟動一個Netcat server
  * `$ nc -lk 9998` (如果nc命令無效的話,我們可以用yum install -y nc來安裝nc)
  *
  * 2、用下面的命令在在集群中將Spark Streaming應用跑起來
  * spark-submit --class com.twq.wordcount.JavaNetworkWordCount \
  * --master spark://master:7077 \
  * --deploy-mode cluster \
  * --driver-memory 512m \
  * --executor-memory 512m \
  * --total-executor-cores 4 \
  * --executor-cores 2 \
  * /home/hadoop-twq/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar
  */
object NetworkWordCount {
  def main(args: Array[String]) {

    val checkpointDirectory = "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/chechpoint"

    def functionToCreateContext(): StreamingContext = {
      val sparkConf = new SparkConf()
      .setAppName("NetworkWordCount")
      val sc = new SparkContext(sparkConf)

      // Create the context with a 1 second batch size
      val ssc = new StreamingContext(sc, Seconds(1))    

      //創建一個接收器(ReceiverInputDStream),這個接收器接收一台機器上的某個端口通過socket發送過來的數據並處理
      val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER_2)//  提高數據塊的高可用性,備份兩份,但會占用一定的內存

      //處理的邏輯,就是簡單的進行word count
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

      //將結果輸出到控制台
      wordCounts.print()
      ssc.checkpoint(checkpointDirectory)
      ssc
    }
    //  代碼
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    //啟動Streaming處理流
    ssc.start()

    //等待Streaming程序終止
    ssc.awaitTermination()
  }
}

  

設置自動重啟Driver程序
standalone :
在spark-submit中增加以下兩個參數:
--deploy-mode cluster
--supervise
 
 
yarn :
在spark-submit中增加以下一個參數:
--deploy-mode cluster
在yarn配置中設置yarn.resourcemanager.am.max-attemps
 
 
mesos :
Marathon 可以重啟 Mesos應用

 

接收到的數據丟失的容錯

 

 

checkpoint機制:定期將Driver端的DStream DAG信息寫到HDFS中(寫內存和寫磁盤同時進行)

 

 

利用WAL恢復數據的配置
1、設置hdfs的checkpoint目錄
streamingContext.setCheckpoint(hdfsDirectory)
2、打開WAL的配置
sparkConf.set(“spark.streaming.receiver.writeAheadLog.enable”, “true”)
3、Receiver應該是reliable的
當數據寫完了WAL后,才告訴數據源數據已經消費
對於沒有告訴數據源的數據,可以從數據源中重新消費數據
4、取消掉in-memory數據備份
使用StorageLevel.MEMORY_AND_DISK_SER來存儲數據源,已經寫入磁盤,沒必要備份到其他executor上內存中,進而節省空間

 

 

 

接收到的數據不管是備份到其他 Executor還是保存到HDFS上,都會給數據源發送回執,假設沒有發送回執,重新消費沒有發送回執的數據,進而保證數據不會丟失,eg: Kafka
Reliable Receiver :
當數據接收到,並且已經備份存儲后,再發送回執給數據源
Unreliable Receiver :
不發送回執給數據源

當一個task很慢的容錯

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM