Spark Streaming的容錯包括了三個地方的容錯:
1、Executor失敗容錯:Executor的失敗會重新啟動一個新的Executor,這個是Spark自身的特性。如果Receiver所在的Executor失敗了,那么Spark Streaming會在另外一個Executor上啟動這個Receiver(這個Executor上可能存在已經接收到的數據的備份)
2、Driver失敗的容錯:如果Driver失敗的話,那么整個Spark Streaming應用將會全部掛掉。所以Driver端的容錯是非常重要的,我們首先可以配置Driver端的checkpoint,用於定期的保存Driver端的狀態;然后我們可以配置Driver端失敗的自動重啟機制(每一種集群管理的配置都不一樣);最后我們需要打開Executor端的WAL機制
3、一個Task失敗的容錯:Spark中的某個Task失敗了可以重新運行,這個Task所在的Stage失敗的話呢,也可以根據RDD的依賴重新跑這個Stage的父親Stage,進而重新跑這個失敗的Stage,在實時計算的過程,肯定不能容忍某個Task的運行時間過長,Spark Streaming對於某個運行時間過長的Task會將這個Task殺掉重新在另一個資源比較充足的Executor上執行。這個就是利用了Spark的Task調度的推測機制。
Executor失敗容錯


Driver失敗容錯


checkpoint機制:定期將Driver端的信息寫到HDFS中
1、configuration (配置信息)
2、定義的DStream的操作
3、沒有完成的batches的信息
1、設置自動重啟Driver程序
standalone、yarn以及mesos都支持
2、設置hdfs的checkpoint目錄
streamingContext.setCheckpoint(hdfsDirectory)
3、在driver端使用正確的API來達到Driver的容錯,需要寫代碼
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* WordCount程序,Spark Streaming消費TCP Server發過來的實時數據的例子:
*
* 1、在master服務器上啟動一個Netcat server
* `$ nc -lk 9998` (如果nc命令無效的話,我們可以用yum install -y nc來安裝nc)
*
* 2、用下面的命令在在集群中將Spark Streaming應用跑起來
* spark-submit --class com.twq.wordcount.JavaNetworkWordCount \
* --master spark://master:7077 \
* --deploy-mode cluster \
* --driver-memory 512m \
* --executor-memory 512m \
* --total-executor-cores 4 \
* --executor-cores 2 \
* /home/hadoop-twq/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar
*/
object NetworkWordCount {
def main(args: Array[String]) {
val checkpointDirectory = "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/chechpoint"
def functionToCreateContext(): StreamingContext = {
val sparkConf = new SparkConf()
.setAppName("NetworkWordCount")
val sc = new SparkContext(sparkConf)
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sc, Seconds(1))
//創建一個接收器(ReceiverInputDStream),這個接收器接收一台機器上的某個端口通過socket發送過來的數據並處理
val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER_2)// 提高數據塊的高可用性,備份兩份,但會占用一定的內存
//處理的邏輯,就是簡單的進行word count
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
//將結果輸出到控制台
wordCounts.print()
ssc.checkpoint(checkpointDirectory)
ssc
}
// 代碼
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
//啟動Streaming處理流
ssc.start()
//等待Streaming程序終止
ssc.awaitTermination()
}
}
設置自動重啟Driver程序
standalone :
在spark-submit中增加以下兩個參數:
--deploy-mode cluster
--supervise
yarn :
在spark-submit中增加以下一個參數:
--deploy-mode cluster
在yarn配置中設置yarn.resourcemanager.am.max-attemps
mesos :
Marathon 可以重啟 Mesos應用
接收到的數據丟失的容錯

checkpoint機制:定期將Driver端的DStream DAG信息寫到HDFS中(寫內存和寫磁盤同時進行)

利用WAL恢復數據的配置
1、設置hdfs的checkpoint目錄
streamingContext.setCheckpoint(hdfsDirectory)
2、打開WAL的配置
sparkConf.set(“spark.streaming.receiver.writeAheadLog.enable”, “true”)
3、Receiver應該是reliable的
當數據寫完了WAL后,才告訴數據源數據已經消費
對於沒有告訴數據源的數據,可以從數據源中重新消費數據
4、取消掉in-memory數據備份
使用StorageLevel.MEMORY_AND_DISK_SER來存儲數據源,已經寫入磁盤,沒必要備份到其他executor上內存中,進而節省空間

接收到的數據不管是備份到其他 Executor還是保存到HDFS上,都會給數據源發送回執,假設沒有發送回執,重新消費沒有發送回執的數據,進而保證數據不會丟失,eg: Kafka
Reliable Receiver :
當數據接收到,並且已經備份存儲后,再發送回執給數據源
Unreliable Receiver :
不發送回執給數據源
當一個task很慢的容錯


