一:SparkCore,SparkSQL和SparkStreaming的類似之處
(一)SparkCore
Spark Core主要是作為離線批處理(Batch Processing),每次處理的數據都是一個固定的數據集,而不是變化的
相關概念:
RDD:彈性分布式數據集
Spark Context:Spark的上下文,它負責與程序和spark集群進行交互,包括申請集群資源、創建RDD、accumulators及廣播變量等。
(二)Spark SQL
Spark SQL用於交互式處理(interactive Processing),同樣的,每次處理的數據都是一個固定的數據集,而不是變化的
相關概念:
DataFrame=RDD+Schema
DataFrame:相當於一個Row類型的DataSet,在Spark 2.x之后推薦使用DataSet
SQLContext:SQL的上下文
(三)Spark Streaming
Spark Streaming是一個流式數據處理(Stream Processing)的框架,要處理的數據就像流水一樣源源不斷的產生,就需要實時處理。
在Spark Streaming中,對於Spark Core進行了API的封裝和擴展,將流式的數據切分為小批次(batch,稱之為微批,按照時間間隔切分)進行處理,可以用於進行大規模、高吞吐量、容錯的實時數據流的處理。---同Storm相比:Storm是來一條數據處理一條數據,是真正意義上的實時處理
支持從很多種數據源中讀取數據,使用算子來進行數據處理,處理后的數據可以被保存到文件系統、數據庫等存儲中
相關概念:
DStream:離散流,相當於是一個數據的集合
StreamingContext:在創建StreamingContext的時候,會自動的創建SparkContext對象
對於電商來說,每時每刻都會產生數據(如訂單,網頁的瀏覽數據等),這些數據就需要實時的數據處理
將源源不斷產生的數據實時收集並實時計算,盡可能快的得到計算結果並展示。
二:Spark Streaming組成部分
(一)數據源
大多情況從Kafka中獲取數據,還可以從Flume中直接獲取,還能從TCP Socket中獲取數據(一般用於開發測試)
(二)數據處理
主要通過DStream針對不同的業務需求使用不同的方法(算子)對數據進行相關操作。
企業中最多的兩種類型統計:實時累加統計(如統計某電商銷售額)會用到DStream中的算子updateStateBykey、實時統計某段時間內的數據(如對趨勢進行統計分析,實時查看最近20分鍾內各個省份用戶點擊廣告的流量統計)會用到reduceByKeyAndWindow這個算子。
(三)存儲結果
調用RDD中的API將數據進行存儲,因為Spark Streaming是將數據分為微批處理的,所以每一批次就相當於一個RDD。
可以把結果存儲到Console(控制台打印,開發測試)、Redis(基於內存的分布式Key-Value數據庫)、HBase(分布式列式數據庫)、RDBMS(關系型數據庫,如MySQL,通過JDBC)
三:SparkStreaming的運行流程
(一)運行流程圖
(二)運行流程
1、我們在集群中的其中一台機器上提交我們的Application Jar,然后就會產生一個Application,開啟一個Driver,然后初始化SparkStreaming的程序入口StreamingContext;
2、Master(Driver是spark作業的Master)會為這個Application的運行分配資源,在集群中的一台或者多台Worker上面開啟Executer,executer會向Driver注冊;
3、Driver服務器會發送多個receiver給開啟的executer,(receiver是一個接收器,是用來接收消息的,在excuter里面運行的時候,其實就相當於一個task任務)
每個作業包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task。
4、receiver接收到數據后,每隔200ms就生成一個block塊,就是一個rdd的分區,然后這些block塊就存儲在executer里面,block塊的存儲級別是Memory_And_Disk_2;
5、receiver產生了這些block塊后會把這些block塊的信息發送給StreamingContext;
6、StreamingContext接收到這些數據后,會根據一定的規則將這些產生的block塊定義成一個rdd;
四:Spark Streaming工作原理
(一)SparkStreaming工作原理
補充:
BlockInterval:200ms 生成block塊的依據,多久內的數據生成一個block塊,默認值200ms生成一個block塊,官網最小推薦值50ms。
BatchInterval:1s 我們將每秒的數據抽象為一個RDD。那么這個RDD里面包含了多個block(1s則是50個RDD),這些block是分散的存儲在各個節點上的。
Spark Streaming內部的基本工作原理:

(二)SparkStreaming和Storm對比
1.對比優勢
2.應用場景

1、建議在那種需要純實時,不能忍受1s以上延遲的場景下使用,比如金融系統,要求純實時進行金融交易和分析; 2、如果對於實時計算的功能中,要求可靠的事務機制和可靠性機制,即數據的處理完全精准,一條也不能多,一條也不能少,也可以考慮使用Strom; 3、如果需要針對高峰低峰時間段,動態調整實時計算程序的並行度,以最大限度利用集群資源,也可以考慮用Storm; 4、如果一個大數據應用系統,它就是純粹的實時計算,不需要在中間執行SQL交互式查詢、復雜的transformation算子等,那么使用Storm是比較好的選擇

1、如果對上述適用於Storm的三點,一條都不滿足的實時場景,即,不要求純實時,不要求強大可靠的事務機制,不要求動態調整並行度,那么可以考慮使用Spark Streaming; 2、考慮使用Spark Streaming最主要的一個因素,應該是針對整個項目進行宏觀的考慮,即,如果一個項目除了實時計算之外,還包括了離線批處理、交互式查詢等業務功能,而且實時計算中,可能還會牽扯到高延遲批處理、交互式查詢等功能,那么就應該首選Spark生態,用Spark Core開發離線批處理,用Spark SQL開發交互式查詢,用Spark Streaming開發實時計算,三者可以無縫整合,給系統提供非常高的可擴展性。
五:DStream離散流
DStream可以通過輸入數據源來創建,比如Kafka、Flume,也可以通過對其他DStream應用高階函數來創建,比如map、reduce、join、window;
(一)DStream原理(與四中補充信息有關聯)
DStream的內部,其實是一系列持續不斷產生的RDD,RDD是Spark Core的核心抽象,即,不可變的,分布式的數據集;
DStream中的每個RDD都包含了一個時間段內的數據;
(二)DStream算子工作
對DStream應用的算子,其實在底層會被翻譯為對DStream中每個RDD的操作。

(三)DStream算子
六:StreamingContext的使用
(一)StreamingContext的創建
1.直接使用sparkconf配置創建
val conf = new SparkConf().setAppName(appName).setMaster("local"); val ssc = new StreamingContext(conf, Seconds(1));
2.使用已創建的sparkcontext創建
val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)); //batch interval可以根據你的應用程序的延遲要求以及可用的集群資源情況來設置
(二)StreamingContext的使用
1.通過創建輸入DStream來創建輸入數據源。
2.通過對DStream定義transformation和output算子操作,來定義實時計算邏輯。
3.調用StreamingContext的start()方法,來開始實時處理數據。
4.調用StreamingContext的awaitTermination()方法,來等待應用程序的終止。可以使用CTRL+C手動停止,或者就是讓它持續不斷的運行進行計算。
5.也可以通過調用StreamingContext的stop()方法,來停止應用程序。
(三)注意事項
1.只要一個StreamingContext啟動之后,就不能再往其中添加任何計算邏輯了。比如執行start()方法之后,還給某個DStream執行一個算子。
2.一個StreamingContext停止之后,是肯定不能夠重啟的,調用stop()之后,不能再調用start()
3.一個JVM同時只能有一個StreamingContext啟動,在你的應用程序中,不能創建兩個StreamingContext。
4.調用stop()方法時,會同時停止內部的SparkContext,如果不希望如此,還希望后面繼續使用SparkContext創建其他類型的Context,比如SQLContext,那么就用stop(false)。
5.一個SparkContext可以創建多個StreamingContext,只要上一個先用stop(false)停止,再創建下一個即可。
七:代碼編寫
(一)從Socket獲取數據---用於測試
def main(args:Array[String]):Unit={ val conf = new SparkConf().setAppName("WordCount").setMaster("local[5]") val sc = new SparkContext(conf) //設置streamingcontext val scc = new StreamingContext(sc,Seconds(2)) //數據輸入 val inDStream:ReceiverInputDStream[String] = scc.socketTextStream("localhost", 9090) inDStream.print() //數據打印 //數據處理 val resultDStream:DStream[(String,Int)]=inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_) //數據輸出 resultDStream.print() //啟動應用程序 scc.start() scc.awaitTermination() scc.stop() }
nc下載地址:https://eternallybored.org/misc/netcat/,在cmd中使用:
(二)設置hdfs獲取數據
scc.textFileStream("hdfs://ns:9000/streaming")
八:緩存與持久化機制
與RDD類似,Spark Streaming也可以讓開發人員手動控制,將數據流中的數據持久化到內存中,對DStream調用persist()方法,就可以讓Spark Streaming自動將該數據流中的所有產生的RDD都持久化到內存中。如果要對一個DStream多次執行操作,那么,對DSteram持久化是非常有用的。因為多次操作,可以共享使用內存中的一份緩存數據。
對於基礎窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操作,比如updateStateByKey,默認就隱式開啟了持久化機制,即Spark Streaming默認就會將上述操作產生的DStream中的數據,緩存到內存中,不需要開發人員手動調用persist()方法。
對於通過網絡接收數據的輸入流,比如Socket、Kafka、Flume等,默認的持久化級別是將數據復制一份,以便於容錯,相當於用的是MEMORY_ONLY_SER_2。
與Spark Core中的RDD不同的是,默認的持久化級別,統一都是要序列化的。
九:Checkpoint機制
每一個Spark Streaming應用,正常來說都是要7x24小時運轉的,這就是實時計算程序的特點。要持續不斷的對數據進行計算,必須要能夠對於應用程序邏輯無關的失敗進行容錯。
對於一些將多個batch的數據進行聚合的,有狀態的transformation操作,這是非常有用的。在這種transformation操作中,生成的RDD是依賴之前的batch中的RDD的,這樣就會隨着時間的推移,依賴鏈條越來越長,從而導致失敗恢復時間也變得越來越差。有狀態的transformation操作執行過程當中產生的RDD要定期的被checkpoint到可靠的存儲上,這樣做可以消減RDD的依賴鏈條,從而縮短恢復時間。
當使用了有狀態的transformation操作時,必須要開啟checkpoint機制,提供checkpoint目錄。
注意,並不是所有的Spark Streaming應用程序都要啟用checkpoint機制
如何啟用Checkpoint機制:
1.配置一個文件系統(比如HDFS)的目錄,作為checkpoint目錄
2.使用StreamingContext的checkpoint方法,填入配置好的目錄作為參數即
十: 注意事項
如果要在實時計算應用中並行接收多條數據流,可以創建多個輸入DStream,這樣就會創建多個Receiver,從而並行地接收多個數據流。這里有一個問題,一個Spark Streaming應用程序的executor是一個長期運行的任務,所以它會獨占分配給Spark Streaming應用程序的CPU core,所以只要Spark Streaming運行起來之后,這個節點上的CPU core數就沒有辦法給其他的應用所使用了,因為會被Receiver所獨占。
使用本地模式運行程序時,必須使用local[n],n>=2絕對不能用local和local[1],因為就會給執行輸入DStream的executor分配一個線程,Spark Streaming底層的原理需要至少有兩個線程,一個線程分配給Receiver接收數據,另一個線程用來處理接收到的數據。如果線程小於2的話,那么程序只會接收數據,不會處理數據。
如果直接將Spark Streaming應用提交到集群上運行,需要保證有足夠資源。