簡介:
SparkStreaming是一套框架。
SparkStreaming是Spark核心API的一個擴展,可以實現高吞吐量的,具備容錯機制的實時流數據處理。
支持多種數據源獲取數據:
Spark Streaming接收Kafka、Flume、HDFS等各種來源的實時輸入數據,進行處理后,處理結構保存在HDFS、DataBase等各種地方。
Dashboards:圖形監控界面,Spark Streaming可以輸出到前端的監控頁面上。
*使用的最多的是kafka+Spark Streaming
Spark Streaming和SparkCore的關系:
Spark處理的是批量的數據(離線數據),Spark Streaming實際上處理並不是像Strom一樣來一條處理一條數據,而是對接的外部數據流之后按照時間切分,批處理一個個切分后的文件,和Spark處理邏輯是相同的。
Spark Streaming將接收到的實時流數據,按照一定時間間隔,對數據進行拆分,交給Spark Engine引擎,最終得到一批批的結果。
Dstream:Spark Streaming提供了表示連續數據流的、高度抽象的被稱為離散流的DStream
假如外部數據不斷涌入,按照一分鍾切片,每個一分鍾內部的數據是連續的(連續數據流),而一分鍾與一分鍾的切片卻是相互獨立的(離散流)。
DStream是Spark Streaming特有的數據類型。
Dstream可以看做一組RDDs,即RDD的一個序列:
Spark的RDD可以理解為空間維度,Dstream的RDD理解為在空間維度上又加了個時間維度。
例如上圖,數據流進切分為四個分片,內部處理邏輯都是相同的,只是時間維度不同。
Spark與Spark Streaming區別:
Spark -> RDD:transformation action + RDD DAG
Spark Streaming -> Dstream:transformation output(它不能讓數據在中間激活,必須保證數據有輸入有輸出) + DStreamGraph
任何對DStream的操作都會轉變為對底層RDD的操作(通過算子):
總結:將連續的數據持久化,離散化,然后進行批量處理。
持久化:接收到的數據暫存。
為什么持久化:
做容錯的,當數據流出錯了,因為沒有得到計算,需要把數據從源頭進行回溯,暫存的數據可以進行恢復。
離散化:按時間分片,形成處理單元。
分片處理:分批處理。
transformation 轉換算子:
reduce,count算子不會直接觸發Dstreami計算。
output 執行算子(輸出算子):
· saveAsObjectFile、saveAsTextFile、saveAsHadoopFiles:將一批數據輸出到Hadoop文件系統中,用批量數據的開始時間
戳來命名
· forEachRDD:允許用戶對DStream的每一批量數據對應的RDD本身做任意操作
Dstream Graph:
一系列transformation操作的抽象
Dstream之間的轉換所形成的的依賴關系全部保存在DStreamGraph中, DStreamGraph對於
后期生成RDD Graph至關重要
SparkStreaming是一套框架,實際是寫代碼就是寫框架。
框架:先把整個數據計算的流程做一個統一的分析,直到output。
傳統Spark開發中涉及到的RDD具有數據不變性,但是SparkStreaming卻與其相違背。
所以有了Dstream和Dstream Graph。
框架變成任務執行,其實執行的是spark job,而spark任務只認識RDD。
所以可以把Dstream當成RDD的一個模板,DStream Graph當成RDD DAG的一個模板。
所以寫代碼就是寫Dstream和DStream Graph模板。
SparkStreaming架構:
Master:記錄Dstream之間的依賴關系或者血緣關系,並負責任務調度以生成新的RDD
Worker:從網絡接收數據,存儲並執行RDD計算
Client:負責向Spark Streaming中灌入數據
調度:按照時間觸發。
Master:維護了DStream Graph這張圖。(不是節點級別的,是任務級別的)
Worker:按照圖去執行。
Worker里面有個重要的角色:receiver,接收外部數據流,然后數據流通過receiver傳入整個Spark Streaming 內部(receiver最終把數據流包裝成Spark Streaming能處理的格式)
receiver:接收器,接收不同的數據源,進行針對性的獲取,Spark Streaming 也提供了不同的接收器分布在不同的節點上,每個接收器都是一個特定的進程,每個節點接收一部分作為輸入。,receiver接受完不馬上做計算,先存儲到它的內部緩存區。因為Streaming 是按照時間不斷的分片,所以需要等待,一旦定時器到時間了,緩沖區就會把數據轉換成數據塊block(緩沖區的作用:按照用戶定義的時間間隔切割),然后把數據塊放到一個隊列里面去,然后Block manager從隊列中把數據塊拿出來,把數據塊轉換成一個spark能處理的數據塊。
為什么是一個進程?
container -> Executor 所以是一個進程
Spark Streaming 作業提交:
• Network Input Tracker:跟蹤每一個網絡received數據,並且將其映射到相應的input Dstream上
• Job Scheduler:周期性的訪問DStream Graph並生成Spark Job,將其交給Job Manager執行
• Job Manager:獲取任務隊列,並執行Spark任務
Spark Streaming 窗口操作:
• Spark提供了一組窗口操作,通過滑動窗口技術對大規模數據的增量更新進行統計分析
• Window Operation:定時進行一定時間段內的數據處理
• 任何基於窗口操作需要指定兩個參數:
– 窗口總長度(window length):你想計算多長時間的數據
– 滑動時間間隔(slide interval):你每多長時間去更新一次
Spark Streaming的容錯:
•實時的流式處理系統必須是7*24運行的,同時可以從各種各樣的系統錯誤中恢復,在設計之初,Spark Streaing就支持driver和worker節點的錯誤恢復(Spark Streaing只有兩個節點:driver->AM,worker->NM)
• Worker容錯:spark和rdd的保證worker節點的容錯性。spark streaming構建在spark之上,
所以它的worker節點也是同樣的容錯機制
• Driver容錯:依賴WAL持久化日志 --------------------------------------------------- Hbase也有WAL
– 啟動WAL需要做如下的配置
– 1:給streamingContext設置checkpoint的目錄,該目錄必須是HADOOP支持的文件系統,用來保存WAL和做
Streaming的checkpoint
– 2:spark.streaming.receiver.writeAheadLog.enable 設置為true
引入了WAL:保證了任何可靠的數據源接收到的數據在失敗中都不會丟失。
例如:接收的數據源不支持事物,那么依靠數據源重新發送數據不可靠,所以WAL能盡量避免丟失。
Spark Streaming中 WAL 工作原理:
(藍色的線是數據)input stream進來的時候是連續的,receiver接收數據,然后切分成block,左邊的寫到了自己的內存空間,右邊的寫到了log(WAL)里面,(綠色的線是元數據信息,不是真正的數據)StreamingContext把數據對接過來(原信息包含了內存中block的id號,日志中文件里block偏移量,偏移信息等),右邊綠色也是把元數據信息寫入log,紅色表明StreamingContext轉換成SparkContext
黃色的線:例如 wordcount
Streaming對10分鍾一個的處理窗口 -> wordcount
0-10min ------>wordcount1
10-20min------>wordcount2
.........
單看wordcount1和wordcount2是沒有關系的,但是假設我們的目的是統計一天的wordcount,如果這么看,那么就需要把時間窗口改為1天,那就完全可以用mapreduce和spark去做了,失去了Spark Streaming的意義。
所以Spark Streaming是帶狀態的流計算。
帶狀態:假設要做0-20min的wordcount,是把wordcount1的值直接拿過來使用的。(把前面的結果拿過來)
所以黃色線所連的log為wordcount1,wordcount2.....
實時流計算:主要就是為了低延時。
• 當一個Driver失敗重啟后,恢復流程:
一旦driver失敗重啟了,首先黃色恢復計算,point信息用來重啟driver的,構造上下文,重啟receiver(因為receiver是由driver來啟動的,driver掛了,沒人管receiver了),然后receiver從外部的WAL讀取進來,然后再重啟所有的receiver,依賴綠色的元數據信息,沒有計算完的任務,重新生成啟動,正在數據從藍色讀取到內存中。紫色是外部組件做一些確認的操作(數據有沒有傳輸過來)