總結《SparkStreaming實時流式大數據處理實戰》
一、初始spark
1. 初始sparkstreaming
1.1 大數據處理模式
1. 一種是原生流處理(Native)的方式,即所有輸入記錄會一條接一條地被處理,storm 和 flink
2. 另一種是微批處理(Batch)的方式,將輸入的數據以某一時間間隔,切分成多個微批量數據,然后對每個批量進行處理,sparkStreaming
1.2 消息傳輸保障
at most once 至多一次,可能會丟
at least once 至少一次,可能重復
exactly once 精確傳遞一次
1.3 容錯機制
流式處理發生中斷出錯的現象是常有的情況,可能是發生在網絡部分、某個節點宕機或程序異常等。,當發生錯誤導致任務中斷后,應該能夠恢復到之前成功的狀態重新消費。
Storm是利用記錄確認機制(Record ACKs)來提供容錯功能。
Spark Streaming則采用了基於RDD Checkpoint的方式進行容錯。
1.4 性能
延時時間:storm > spark
吞吐量 :spark > storm
1.5. Structed Streaming(結構化)簡述
不同點:
Spark Streaming是以RDD構成的DStream為處理結構.
Structed Streaming是一種基於Spark SQL引擎的可擴展且容錯的流處理引擎
Structed 提供了更加低延遲的處理模式
相同點:
都是微批的實時處理,內部並不是逐條處理數據記錄,而是按照一個個小batch來處理,從而實現低延遲的端到端延遲和一次性容錯保證。
二、共享變量
節點之間會給每個節點傳遞一個map、reduce等操作函數的獨立副本,這些變量也會被復制到每台機器上,而節點之間的運算是相互獨立的。
這些變量會被復制到每一台機器上,並且當變量發生改變時,並不會同步傳播回Driver程序。
如果進行通用支持,任務間的讀寫共享變量需要大量的同步操作,這會導致低效。
Spark提供了兩種受限類型的共享變量用於兩種常見的使用模式:廣播變量和累加器。
2.1 累加器(Accumulator)
累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效地應用於並行操作中。累加器能夠用來實現對數據的統計和求和操作。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型。

節點上的任務可以利用add方法進行累加操作,但是它們並不能讀取累加器的值。只有Driver程序能夠通過value方法讀取累加器的值。
2.2 廣播變量(Broadcast)
廣播變量允許程序員在每台機器上緩存一個只讀的變量,而不是每個任務保存一份拷貝。
利用廣播變量,我們有效的將一個大數據量輸入集合的副本分配給每個節點。Spark也有效的利用廣播算法去分配廣播變量,以減少通信的成本。

可以利用廣播變量將一些經常訪問的大變量進行廣播,而不是每個任務保存一份,這樣可以減少資源上的浪費。
三、DStream
Spark Streaming對微批處理方式做了一個更高層的抽象,將原始的連續的數據流抽象后得到的多個批處理數據(batches)抽象為離散數據流(discretized stream),DStream本質是RDD數據結構的序列
3.1 DStream的創建方式
一是從Kafka、Flume等輸入數據流上直接創建。
二是對其他DStream采用高階API操作之后得到(如map、flatMap等)。
3.2 DStream的Transform操作
與RDD的Transformation類似,DStream的轉移操作也不會觸發真正的計算,只會記錄整個計算流程。
- map
- flatMap
- fliter
- repartition
- union
- reduce
- join
3.2.1 UpdateStateByKey操作
流式處理本身是無狀態的,如何記錄更新一種狀態呢?
我們可以利用外部存儲介質或者利用累加器來實現,而UpdateStateByKey就是專門用於這項工作,可以利用這個操作根據新的信息流持續地更新任意狀態。
(1)定義狀態(state):該狀態可以是任意數據類型。
(2)定義狀態更新函數(update function):我們需要制定一個函數,根據先前的狀態和數據流中新的數據值來更新狀態值。
特別注意:使用updateStateByKey需要配置checkpoint,這個將在后面詳細介紹。
3.2.2 Transform操作

傳入的函數會在每個時間間隔(interval)的batch中被執行,這允許我們做不同時間段的RDD操作,也就是說RDD操作、分支(partitions)數量及廣播變量等都可在batch間進行改變。
3.2.3 windows操作
Spark Streaming還提供了基於窗口的計算,允許我們在滑動窗口數據上進行Transformation操作。

在原始DStream上,窗口每滑動一次,在窗口范圍內的RDDs就會結合、操作,形成一個新的基於窗口的DStream(windowed DStream)。
- 窗口長度(window length):窗口的持續時長(圖中為3)。
- 滑動間隔(sliding interval):執行窗口操作的時間間隔(圖中為2)。

3.2.4. join操作
(1)Stream-Stream join操作


(2)Stream-dataset join操作

這里的dataset是RDD數據類型,在前面介紹Transform時我們提到DStream的Transform操作使我們能夠直接操作DStream內部的RDD,所以可以利用上述操作將二者拼接起來。
3.3 DStream 的輸出操作
將DStream中的數據保存到外部系統中,如數據庫或者文件系統。與RDD中的Action操作類似,DStream中只有輸出操作才會觸發DStream的轉移操作(Transformation)。
- print()
- saveAsObjectFiles(prefix,[suffix])
- saveAsTextFiles(prefix.[suffix])
- foreachRDD(func) 會將傳入的func函數應用在DSteam中的每個RDD上,值得注意的是func函數通常會運行在Driver中,並且由於Spark是惰性的,func需要包含Action操作,以此推動整個RDDs的運算。
在實際生產環境中,foreach()這個操作是常用到的,我們可以利用該函數將DStream中的數據按照需要的方式,輸出到指定外部系統中,如MySQL、Redis、HBase、文件等
3.4 SparkStreaming初始化及輸入源
3.4.1初始化流式上下文(StreamingContext)
初始化sparkStreaming需要sparkContext

在創建了上下文對象之后,還需要進一步創建輸入DStream來指定輸入數據源,下面具體介紹。
3.4.2 輸入源及接收器(Receivers)
創建輸入源:
- 基本數據源,這類數據源可以直接由StreamingContext API使用,如文件系統或者套接字連接(socket connections)。
- 高級數據源:像Kafka、Flume、Kinesis等數據源,需要使用額外的接口類。
3.4.2.1 基本數據流
- Socket文本數據流:通過TCP套接字連接接收文本數據產生DStream。
- File數據流:從文件或者任何兼容的文件系統(HDFS、S3、NFS等)中讀取數據產生DStream。
- RDDs隊列(queue)作為數據流:在測試Spark Streaming應用時,可以將一系列RDD,使用streamingContext.queueStream(queueofRDDs)來產生DStream,每個進入隊列的RDD,會被認為是DStream中的一批(batch)數據,會以流式的方式進行處理。
3.4.3持久化、Checkpointing和共享變量
3.4.3.1 DStream持久化(caching/presistence)
對於基於窗口的操作如reduceByWindow和reduceByKeyAndWindow,以及基於狀態的操作如updateStateByKey,它們本身就暗含了對同一數據進行多重操作的特性。因此,由基於窗口操作產生的DStream,即使開發者不主動調用persist()方法,DStreams也會自動持久化在內存當中。默認是內存。
3.4.3.2 Checkpointing操作
一般來說,一個流式處理程序需要24小時不間斷運作,所以其必須擁有一定的與程序邏輯本身相獨立的容錯機制(如系統錯誤、虛擬機宕機等)。Spark Streaming的容錯恢復系統必須擁有檢查點(checkpoint)的充足信息,從而能夠從失敗中恢復過來。其中主要有兩種類型的數據被檢查點記錄下來。
第一種,元數據檢查點(metadata checkpointing):保存流式計算中用於容錯存儲的信息,如HDFS。這些信息會被用來恢復driver節點的流式應用。
- 配置(Configuration):用於創建流式應用的配置信息。
- DStream操作:流式應用中定義的DStream操作集。
- 未完成的批處理(batches):工作(Job)已經入隊但是還未完成的批處理。
第二種,數據檢查點(data checkpointing):將已經生成的RDDs進行可靠的存儲。這在一些依賴於多個batch數據的狀態轉移操作(stateful transformation)中是必須的。
transform 狀態轉移操作的中間RDD會通過定期檢查點(checpoint)輸出到可靠的存儲介質當中(如HDFS),從而切斷這種依賴鏈的影響。
下面的條件,則必須使用檢查點(checpointing):
- 使用有狀態轉移操作(stateful transformation):如果在應用中使用了updateStateByKey或者類似reduceByKeyAndWindow的操作,則必須設置檢查點目錄(checkpoint directory),從而能夠定期保存RDD。
- 將運行程序的Driver節點從失敗中恢復:元數據檢查點被用來恢復這個過程。
檢查點的保存目錄可以設置為(如HDFS、S3等),這樣檢查點的信息就會保存在我們所設置的目錄下,我們可以使用streamingContext.checkpoint(checkpoint Directory)進行設置。
當程序第一次運行時,其必須創建新的StreamingContext,並且設定好所有的流(streams),之后調用start()函數。
當一個程序需要從失敗中重啟時,其會根據檢查點目錄中保存的檢查點數據來重建StreamingContext。

RDD檢查點保存到可靠存儲空間中時,會有一定的消耗。這可能會使得那些被保存RDD檢查點的batches的處理時間加長,因此檢查點的間隔(interval)需要謹慎設置。在最小的batch尺度上(1s),每個batch的檢查點設置可能會嚴重降低操作的生產量。而相反,如果檢查點過於頻繁,會導致任務尺寸的增長,造成不利的影響。
通常在DStream間隔(intervals)的基礎上增加5~10s是一個比較好的檢查點時間間隔。
特別說明,關於累加器、廣播變量的檢查點會有所不同,累加器和廣播變量無法從Spark Streaming的檢查點恢復。
如果我們開啟了檢查點(checpointing),並且同時使用了累加器或者廣播變量,那么必須為累加器和廣播變量創建一個惰性實例化的單例(lazily instantiatedsingleton instances),從而使得它們能夠在Driver重啟時能夠再次被實例化,代碼如下:


