SparkStreaming實時流式大數據處理實戰總結


總結《SparkStreaming實時流式大數據處理實戰》

一、初始spark

1. 初始sparkstreaming

1.1 大數據處理模式

1. 一種是原生流處理(Native)的方式,即所有輸入記錄會一條接一條地被處理,storm 和 flink

2. 另一種是微批處理(Batch)的方式,將輸入的數據以某一時間間隔,切分成多個微批量數據,然后對每個批量進行處理,sparkStreaming

 

1.2 消息傳輸保障

at most once  至多一次,可能會丟

at least once  至少一次,可能重復

exactly once  精確傳遞一次

 

1.3 容錯機制

流式處理發生中斷出錯的現象是常有的情況,可能是發生在網絡部分、某個節點宕機或程序異常等。,當發生錯誤導致任務中斷后,應該能夠恢復到之前成功的狀態重新消費。

 

Storm是利用記錄確認機制(Record ACKs)來提供容錯功能。

Spark Streaming則采用了基RDD Checkpoint的方式進行容錯。

 

1.4 性能

延時時間:storm > spark

吞吐量 :spark > storm

 

1.5. Structed Streaming(結構化)簡述

不同點:

Spark Streaming是以RDD構成的DStream為處理結構.

Structed Streaming是一種基於Spark SQL引擎的可擴展且容錯的流處理引擎

Structed 提供了更加低延遲的處理模式

 

相同點:

都是微批的實時處理,內部並不是逐條處理數據記錄,而是按照一個個小batch來處理,從而實現低延遲的端到端延遲和一次性容錯保證。

 

二、共享變量

節點之間會給每個節點傳遞一個map、reduce等操作函數的獨立副本,這些變量也會被復制到每台機器上,而節點之間的運算是相互獨立的。

這些變量會被復制到每一台機器上,並且當變量發生改變時,並不會同步傳播回Driver程序。

如果進行通用支持,任務間的讀寫共享變量需要大量的同步操作,這會導致低效。

Spark提供了兩種受限類型的共享變量用於兩種常見的使用模式:廣播變量累加器

 

2.1 累加器(Accumulator)

累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效地應用於並行操作中。累加器能夠用來實現對數據的統計和求和操作。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型。

 

 

節點上的任務可以利用add方法進行累加操作,但是它們並不能讀取累加器的值。只有Driver程序能夠通過value方法讀取累加器的值。

 

2.2 廣播變量(Broadcast)

廣播變量允許程序員在每台機器上緩存一個只讀的變量,而不是每個任務保存一份拷貝。

利用廣播變量,我們有效的將一個大數據量輸入集合的副本分配給每個節點。Spark也有效的利用廣播算法去分配廣播變量,以減少通信的成本。

 

 

 

可以利用廣播變量將一些經常訪問的大變量進行廣播,而不是每個任務保存一份,這樣可以減少資源上的浪費。

 

 

三、DStream

Spark Streaming對微批處理方式做了一個更高層的抽象,將原始的連續的數據流抽象后得到的多個批處理數據(batches)抽象為離散數據流(discretized stream),DStream本質是RDD數據結構的序列

 

3.1 DStream的創建方式

一是從Kafka、Flume等輸入數據流上直接創建。

二是對其他DStream采用高階API操作之后得到(如map、flatMap等)。

 

3.2 DStream的Transform操作

與RDD的Transformation類似,DStream的轉移操作也不會觸發真正的計算,只會記錄整個計算流程。

  • map
  • flatMap
  • fliter
  • repartition
  • union
  • reduce
  • join

 

3.2.1 UpdateStateByKey操作

流式處理本身是無狀態的,如何記錄更新一種狀態呢?

我們可以利用外部存儲介質或者利用累加器來實現,而UpdateStateByKey就是專門用於這項工作,可以利用這個操作根據新的信息流持續地更新任意狀態。

(1)定義狀態(state):該狀態可以是任意數據類型。

(2)定義狀態更新函數(update function):我們需要制定一個函數,根據先前的狀態和數據流中新的數據值來更新狀態值。

 

特別注意:使用updateStateByKey需要配置checkpoint,這個將在后面詳細介紹。

 

3.2.2 Transform操作

 

 

 傳入的函數會在每個時間間隔(interval)的batch中被執行,這允許我們做不同時間段的RDD操作,也就是說RDD操作、分支(partitions)數量及廣播變量等都可在batch間進行改變。

 

3.2.3 windows操作

Spark Streaming還提供了基於窗口的計算,允許我們在滑動窗口數據上進行Transformation操作。

 

 

在原始DStream上,窗口每滑動一次,在窗口范圍內的RDDs就會結合、操作,形成一個新的基於窗口的DStream(windowed DStream)。

  •  窗口長度(window length):窗口的持續時長(圖中為3)。
  • 滑動間隔(sliding interval):執行窗口操作的時間間隔(圖中為2)。

 

 

3.2.4. join操作

(1)Stream-Stream join操作

 

 

 

 

(2)Stream-dataset join操作

 

 

 

這里的dataset是RDD數據類型,在前面介紹Transform時我們提到DStream的Transform操作使我們能夠直接操作DStream內部的RDD,所以可以利用上述操作將二者拼接起來。

 

3.3 DStream 的輸出操作

將DStream中的數據保存到外部系統中,如數據庫或者文件系統。與RDD中的Action操作類似,DStream中只有輸出操作才會觸發DStream的轉移操作(Transformation)。

  • print()
  • saveAsObjectFiles(prefix,[suffix])
  • saveAsTextFiles(prefix.[suffix])
  • foreachRDD(func) 會將傳入的func函數應用在DSteam中的每個RDD上,值得注意的是func函數通常會運行在Driver中,並且由於Spark是惰性的,func需要包含Action操作,以此推動整個RDDs的運算。

在實際生產環境中,foreach()這個操作是常用到的,我們可以利用該函數將DStream中的數據按照需要的方式,輸出到指定外部系統中,如MySQL、Redis、HBase、文件等

 

3.4 SparkStreaming初始化及輸入源

3.4.1初始化流式上下文(StreamingContext)

初始化sparkStreaming需要sparkContext 

 

 在創建了上下文對象之后,還需要進一步創建輸入DStream來指定輸入數據源,下面具體介紹。

 

3.4.2 輸入源及接收器(Receivers)

創建輸入源:

  1. 基本數據源,這類數據源可以直接由StreamingContext API使用,如文件系統或者套接字連接(socket connections)。
  2. 高級數據源:像Kafka、Flume、Kinesis等數據源,需要使用額外的接口類。

 

3.4.2.1 基本數據流

  1. Socket文本數據流:通過TCP套接字連接接收文本數據產生DStream。
  2. File數據流:從文件或者任何兼容的文件系統(HDFS、S3、NFS等)中讀取數據產生DStream。
  3. RDDs隊列(queue)作為數據流:在測試Spark Streaming應用時,可以將一系列RDD,使用streamingContext.queueStream(queueofRDDs)來產生DStream,每個進入隊列的RDD,會被認為是DStream中的一批(batch)數據,會以流式的方式進行處理。

 

3.4.3持久化、Checkpointing和共享變量

3.4.3.1 DStream持久化(caching/presistence)

對於基於窗口的操作如reduceByWindow和reduceByKeyAndWindow,以及基於狀態的操作如updateStateByKey,它們本身就暗含了對同一數據進行多重操作的特性。因此,由基於窗口操作產生的DStream,即使開發者不主動調用persist()方法,DStreams也會自動持久化在內存當中。默認是內存。

 

3.4.3.2 Checkpointing操作

一般來說,一個流式處理程序需要24小時不間斷運作,所以其必須擁有一定的與程序邏輯本身相獨立的容錯機制(如系統錯誤、虛擬機宕機等)。Spark Streaming的容錯恢復系統必須擁有檢查點(checkpoint)的充足信息,從而能夠從失敗中恢復過來。其中主要有兩種類型的數據被檢查點記錄下來。

 

第一種,元數據檢查點(metadata checkpointing):保存流式計算中用於容錯存儲的信息,如HDFS。這些信息會被用來恢復driver節點的流式應用。

  • 配置(Configuration):用於創建流式應用的配置信息。
  • DStream操作:流式應用中定義的DStream操作集。
  • 未完成的批處理(batches):工作(Job)已經入隊但是還未完成的批處理。

 

第二種,數據檢查點(data checkpointing):將已經生成的RDDs進行可靠的存儲。這在一些依賴於多個batch數據的狀態轉移操作(stateful transformation)中是必須的。

transform 狀態轉移操作的中間RDD會通過定期檢查點(checpoint)輸出到可靠的存儲介質當中(如HDFS),從而切斷這種依賴鏈的影響。

 

下面的條件,則必須使用檢查點(checpointing):

  1. 使用有狀態轉移操作(stateful transformation):如果在應用中使用了updateStateByKey或者類似reduceByKeyAndWindow的操作,則必須設置檢查點目錄(checkpoint directory),從而能夠定期保存RDD。
  2. 將運行程序的Driver節點從失敗中恢復:元數據檢查點被用來恢復這個過程。

檢查點的保存目錄可以設置為(如HDFS、S3等),這樣檢查點的信息就會保存在我們所設置的目錄下,我們可以使用streamingContext.checkpoint(checkpoint Directory)進行設置。

 

當程序第一次運行時,其必須創建新的StreamingContext,並且設定好所有的流(streams),之后調用start()函數。

當一個程序需要從失敗中重啟時,其會根據檢查點目錄中保存的檢查點數據來重建StreamingContext。

 

 

 

RDD檢查點保存到可靠存儲空間中時,會有一定的消耗。這可能會使得那些被保存RDD檢查點的batches的處理時間加長,因此檢查點的間隔(interval)需要謹慎設置。在最小的batch尺度上(1s),每個batch的檢查點設置可能會嚴重降低操作的生產量。而相反,如果檢查點過於頻繁,會導致任務尺寸的增長,造成不利的影響。

通常在DStream間隔(intervals)的基礎上增加5~10s是一個比較好的檢查點時間間隔。

 

特別說明,關於累加器、廣播變量的檢查點會有所不同,累加器和廣播變量無法從Spark Streaming的檢查點恢復。

 

如果我們開啟了檢查點(checpointing),並且同時使用了累加器或者廣播變量,那么必須為累加器和廣播變量創建一個惰性實例化的單例(lazily instantiatedsingleton instances),從而使得它們能夠在Driver重啟時能夠再次被實例化,代碼如下:

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM