SparkStreaming工作原理


一:SparkCore,SparkSQL和SparkStreaming的類似之處

(一)SparkCore

Spark Core主要是作為離線批處理(Batch Processing),每次處理的數據都是一個固定的數據集,而不是變化的

相關概念:

RDD:彈性分布式數據集
Spark Context:Spark的上下文,它負責與程序和spark集群進行交互,包括申請集群資源、創建RDD、accumulators及廣播變量等。

(二)Spark SQL

Spark SQL用於交互式處理(interactive Processing),同樣的,每次處理的數據都是一個固定的數據集,而不是變化的

相關概念:

DataFrame=RDD+Schema
DataFrame:相當於一個Row類型的DataSet,在Spark 2.x之后推薦使用DataSet
SQLContext:SQL的上下文

(三)Spark Streaming

Spark Streaming是一個流式數據處理(Stream Processing)的框架,要處理的數據就像流水一樣源源不斷的產生,就需要實時處理

在Spark Streaming中,對於Spark Core進行了API的封裝和擴展,將流式的數據切分為小批次(batch,稱之為微批,按照時間間隔切分)進行處理,可以用於進行大規模、高吞吐量、容錯的實時數據流的處理。---同Storm相比:Storm是來一條數據處理一條數據,是真正意義上的實時處理
支持從很多種數據源中讀取數據,使用算子來進行數據處理,處理后的數據可以被保存到文件系統、數據庫等存儲中

相關概念:

DStream:離散流,相當於是一個數據的集合
StreamingContext:在創建StreamingContext的時候,會自動的創建SparkContext對象

對於電商來說,每時每刻都會產生數據(如訂單,網頁的瀏覽數據等),這些數據就需要實時的數據處理

將源源不斷產生的數據實時收集並實時計算,盡可能快的得到計算結果並展示。 

二:Spark Streaming組成部分

(一)數據源

大多情況從Kafka中獲取數據,還可以從Flume中直接獲取,還能從TCP Socket中獲取數據(一般用於開發測試)

(二)數據處理

主要通過DStream針對不同的業務需求使用不同的方法(算子)對數據進行相關操作。

企業中最多的兩種類型統計:實時累加統計(如統計某電商銷售額)會用到DStream中的算子updateStateBykey、實時統計某段時間內的數據(如對趨勢進行統計分析,實時查看最近20分鍾內各個省份用戶點擊廣告的流量統計)會用到reduceByKeyAndWindow這個算子。

(三)存儲結果

調用RDD中的API將數據進行存儲,因為Spark Streaming是將數據分為微批處理的,所以每一批次就相當於一個RDD。

可以把結果存儲到Console(控制台打印,開發測試)、Redis(基於內存的分布式Key-Value數據庫)、HBase(分布式列式數據庫)、RDBMS(關系型數據庫,如MySQL,通過JDBC)

三:SparkStreaming的運行流程

(一)運行流程圖

(二)運行流程

1、我們在集群中的其中一台機器上提交我們的Application Jar,然后就會產生一個Application,開啟一個Driver,然后初始化SparkStreaming的程序入口StreamingContext;

2、Master(Driver是spark作業的Master)會為這個Application的運行分配資源,在集群中的一台或者多台Worker上面開啟Executer,executer會向Driver注冊;

3、Driver服務器會發送多個receiver給開啟的executer,(receiver是一個接收器,是用來接收消息的,在excuter里面運行的時候,其實就相當於一個task任務)

每個作業包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task。

4、receiver接收到數據后,每隔200ms就生成一個block塊,就是一個rdd的分區,然后這些block塊就存儲在executer里面,block塊的存儲級別是Memory_And_Disk_2;

5、receiver產生了這些block塊后會把這些block塊的信息發送給StreamingContext;

6、StreamingContext接收到這些數據后,會根據一定的規則將這些產生的block塊定義成一個rdd;

四:Spark Streaming工作原理

(一)SparkStreaming工作原理

補充:

BlockInterval:200ms  生成block塊的依據,多久內的數據生成一個block塊,默認值200ms生成一個block塊,官網最小推薦值50ms。

BatchInterval:1s  我們將每秒的數據抽象為一個RDD。那么這個RDD里面包含了多個block(1s則是50個RDD),這些block是分散的存儲在各個節點上的。

Spark Streaming內部的基本工作原理:

接收實時輸入數據流,然后將數據拆分成多個batch,比如每收集1s的數據封裝為一個batch, 然后將每個batch交給Spark的計算引擎進行處理,最后會生產出一個結果數據流,其中的數據,也是一個個的batch所組成的。
其中,一個batchInterval累加讀取到的數據對應一個RDD的數據

(二)SparkStreaming和Storm對比

1.對比優勢

Storm在實時延遲度上,比Spark Streaming就好多了,Storm是純實時,Spark Streaming是准實時;而且Storm的事務機制,健壯性/容錯性、動態調整並行度等特性,都要比Spark Streaming更加優秀。
Spark Streaming的真正優勢(Storm絕對比不上的),是它屬於Spark生態技術棧中,因此Spark Streaming可以和Spark Core、Spark SQL無縫整合,而這也就意味着,我們可以對實時處理出來的中間數據,立即在程序中無縫進行延遲批處理、交互式查詢等操作,這個特點大大增強了Spark Streaming的優勢和功能。

2.應用場景

1、建議在那種需要純實時,不能忍受1s以上延遲的場景下使用,比如金融系統,要求純實時進行金融交易和分析;
2、如果對於實時計算的功能中,要求可靠的事務機制和可靠性機制,即數據的處理完全精准,一條也不能多,一條也不能少,也可以考慮使用Strom;
3、如果需要針對高峰低峰時間段,動態調整實時計算程序的並行度,以最大限度利用集群資源,也可以考慮用Storm;
4、如果一個大數據應用系統,它就是純粹的實時計算,不需要在中間執行SQL交互式查詢、復雜的transformation算子等,那么使用Storm是比較好的選擇
storm
1、如果對上述適用於Storm的三點,一條都不滿足的實時場景,即,不要求純實時,不要求強大可靠的事務機制,不要求動態調整並行度,那么可以考慮使用Spark Streaming;
2、考慮使用Spark Streaming最主要的一個因素,應該是針對整個項目進行宏觀的考慮,即,如果一個項目除了實時計算之外,還包括了離線批處理、交互式查詢等業務功能,而且實時計算中,可能還會牽扯到高延遲批處理、交互式查詢等功能,那么就應該首選Spark生態,用Spark Core開發離線批處理,用Spark SQL開發交互式查詢,用Spark Streaming開發實時計算,三者可以無縫整合,給系統提供非常高的可擴展性。
sparkstreaming

五:DStream離散流

DStream離散流是Spark Streaming提供的一種高級抽象,DStream代表了一個持續不斷的數據流;
DStream可以通過輸入數據源來創建,比如Kafka、Flume,也可以通過對其他DStream應用高階函數來創建,比如map、reduce、join、window;

(一)DStream原理(與四中補充信息有關聯)

DStream的內部,其實是一系列持續不斷產生的RDD,RDD是Spark Core的核心抽象,即,不可變的,分布式的數據集;

DStream中的每個RDD都包含了一個時間段內的數據;
以下圖為例,0-1這段時間的數據累積構成了RDD@time1,1-2這段時間的數據累積構成了RDD@time2...

(二)DStream算子工作

對DStream應用的算子,其實在底層會被翻譯為對DStream中每個RDD的操作。

比如對一個DStream執行一個map操作,會產生一個新的DStream,其底層原理為,對輸入DStream中的每個時間段的RDD,都應用一遍map操作,然后生成的RDD,即作為新的DStream中的那個時間段的一個RDD;
底層的RDD的transformation操作,還是由Spark Core的計算引擎來實現的,Spark Streaming對Spark core進行了一層封裝,隱藏了細節,然后對開發人員提供了方便易用的高層次API。

(三)DStream算子

六:StreamingContext的使用

(一)StreamingContext的創建

1.直接使用sparkconf配置創建

val conf = new SparkConf().setAppName(appName).setMaster("local");
val ssc = new StreamingContext(conf, Seconds(1));

2.使用已創建的sparkcontext創建

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1));  //batch interval可以根據你的應用程序的延遲要求以及可用的集群資源情況來設置

(二)StreamingContext的使用

1.通過創建輸入DStream來創建輸入數據源。
2.通過對DStream定義transformation和output算子操作,來定義實時計算邏輯。
3.調用StreamingContext的start()方法,來開始實時處理數據。
4.調用StreamingContext的awaitTermination()方法,來等待應用程序的終止。可以使用CTRL+C手動停止,或者就是讓它持續不斷的運行進行計算。
5.也可以通過調用StreamingContext的stop()方法,來停止應用程序。

(三)注意事項

1.只要一個StreamingContext啟動之后,就不能再往其中添加任何計算邏輯了。比如執行start()方法之后,還給某個DStream執行一個算子。
2.一個StreamingContext停止之后,是肯定不能夠重啟的,調用stop()之后,不能再調用start()
3.一個JVM同時只能有一個StreamingContext啟動,在你的應用程序中,不能創建兩個StreamingContext。
4.調用stop()方法時,會同時停止內部的SparkContext,如果不希望如此,還希望后面繼續使用SparkContext創建其他類型的Context,比如SQLContext,那么就用stop(false)。
5.一個SparkContext可以創建多個StreamingContext,只要上一個先用stop(false)停止,再創建下一個即可。

七:代碼編寫

(一)從Socket獲取數據---用於測試

  def main(args:Array[String]):Unit={
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[5]")
    val sc = new SparkContext(conf)
    
    //設置streamingcontext
    val scc = new StreamingContext(sc,Seconds(2))
    
    //數據輸入
    val inDStream:ReceiverInputDStream[String] = scc.socketTextStream("localhost", 9090)
    inDStream.print()  //數據打印
    
    //數據處理
    val resultDStream:DStream[(String,Int)]=inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
    
    //數據輸出
    resultDStream.print()
    
    //啟動應用程序
    scc.start()
    scc.awaitTermination()
    scc.stop()
  }

 nc下載地址:https://eternallybored.org/misc/netcat/,在cmd中使用:

 

(二)設置hdfs獲取數據

scc.textFileStream("hdfs://ns:9000/streaming")

八:緩存與持久化機制

與RDD類似,Spark Streaming也可以讓開發人員手動控制,將數據流中的數據持久化到內存中,對DStream調用persist()方法,就可以讓Spark Streaming自動將該數據流中的所有產生的RDD都持久化到內存中如果要對一個DStream多次執行操作,那么,對DSteram持久化是非常有用的。因為多次操作,可以共享使用內存中的一份緩存數據

對於基礎窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操作,比如updateStateByKey,默認就隱式開啟了持久化機制,即Spark Streaming默認就會將上述操作產生的DStream中的數據,緩存到內存中,不需要開發人員手動調用persist()方法。

對於通過網絡接收數據的輸入流,比如Socket、Kafka、Flume等,默認的持久化級別是將數據復制一份,以便於容錯,相當於用的是MEMORY_ONLY_SER_2。

與Spark Core中的RDD不同的是,默認的持久化級別,統一都是要序列化的。

九:Checkpoint機制

每一個Spark Streaming應用,正常來說都是要7x24小時運轉的,這就是實時計算程序的特點。要持續不斷的對數據進行計算,必須要能夠對於應用程序邏輯無關的失敗進行容錯。

對於一些將多個batch的數據進行聚合的,有狀態的transformation操作,這是非常有用的。在這種transformation操作中,生成的RDD是依賴之前的batch中的RDD的,這樣就會隨着時間的推移,依賴鏈條越來越長,從而導致失敗恢復時間也變得越來越差有狀態的transformation操作執行過程當中產生的RDD要定期的被checkpoint到可靠的存儲上,這樣做可以消減RDD的依賴鏈條,從而縮短恢復時間。

當使用了有狀態的transformation操作時,必須要開啟checkpoint機制,提供checkpoint目錄。

注意,並不是所有的Spark Streaming應用程序都要啟用checkpoint機制

如何啟用Checkpoint機制:

1.配置一個文件系統(比如HDFS)的目錄,作為checkpoint目錄
2.使用StreamingContext的checkpoint方法,填入配置好的目錄作為參數即

十: 注意事項

如果要在實時計算應用中並行接收多條數據流,可以創建多個輸入DStream,這樣就會創建多個Receiver,從而並行地接收多個數據流。這里有一個問題,一個Spark Streaming應用程序的executor是一個長期運行的任務,所以它會獨占分配給Spark Streaming應用程序的CPU core,所以只要Spark Streaming運行起來之后,這個節點上的CPU core數就沒有辦法給其他的應用所使用了,因為會被Receiver所獨占
使用本地模式運行程序時,必須使用local[n],n>=2絕對不能用local和local[1],因為就會給執行輸入DStream的executor分配一個線程Spark Streaming底層的原理需要至少有兩個線程,一個線程分配給Receiver接收數據,另一個線程用來處理接收到的數據。如果線程小於2的話,那么程序只會接收數據,不會處理數據
如果直接將Spark Streaming應用提交到集群上運行,需要保證有足夠資源。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM