Spark Streaming編程指南


Overview

  • A Quick Example
  • Basic Concepts
    • Linking
    • Initializing StreamingContext
    • Discretized Streams (DStreams)
    • Input DStreams and Receivers
    • Transformations on DStreams
    • Output Operations on DStreams
    • DataFrame and SQL Operations
    • MLlib Operations
    • Caching / Persistence
    • Checkpointing
    • Accumulators, Broadcast Variables, and Checkpoints
    • Deploying Applications
    • Monitoring Applications
  • Performance Tuning
    • Reducing the Batch Processing Times
    • Setting the Right Batch Interval
    • Memory Tuning
  • Fault-tolerance Semantics
  • Where to Go from Here

1. Overview 概述

 Spark Streaming是核心Spark API的擴展,支持可擴展,高吞吐量,實時數據流的容錯數據流處理。可以從sources(如Kafka、Flume、Kinesis、或者TCP sockets)獲取數據,並通過復雜的算法處理數據,這些算法使用高級函數(如map,reduce,join和window)表示。最后,處理過的數據可以推送到文件系統、數據庫和實時儀表板。事實上,你可以將Spark的機器學習和圖形處理算法應用於數據流。

在內部,它的工作原理如下。Spark Streaming接受實時輸入數據流,並把數據分成批,然后由Spark引擎處理,以批量生成最終結果流。

 

Spark Streaming提供了一個高層次的抽象,稱為離散流或DStream,它代表連續的數據流。DStreams可以通過Kafka,Flume和Kinesis等來源的輸入數據流創建,也可以通過在其他DStream上應用高級操作來創建。在內部,一個DStream被表示為一系列RDD。

本指南將向你介紹如何開始使用DStreams編寫Spark Streaming程序。你可以使用Scala,Java或Python編寫Spark Streaming程序,所有這些都在本指南中介紹。你將可以在本指南中通過標簽,選擇不同語言的代碼片段。

注意: 在Python中有少數不同或者不可用的APIs。通過本指南,你將會找到突出顯示這些差異的Python API標簽。

2. A Quick Example 快速學習示例

在我們進入如何編寫屬於你自己的Spark Streaming程序的細節之前,讓我們快速瀏覽一個簡單Spark Streaming程序是怎樣的。假設我們想要統計從監聽TCP socket的數據服務器接收到的文本數據中的字數。全部你需要做的是:

第一,我們將Spark Streaming類名和StreamingContext的一些隱式轉換導入到我們的環境中,以便將有用的方法添加到我們需要的其他類(如DStream)中。StreamingContext是所有流功能的主要入口點。我們創建一個帶有兩個執行線程的本地StreamingContext,批處理間隔為1秒。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

通過使用該context,我們可以創建一個DStream來表示來自TCP source的流數據,指定為主機名(例如localhost)和端口(例如9999)。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

該lines DStream表示從數據服務器接收到的流數據。在該DStream中的每條記錄是一行文本。接下來,我們想要通過空白字節把行分割成單詞。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap是一個一對多one-to-many的DStream操作,通過在源DStream把每條記錄生成多個記錄來創建一個新的DStream。在這種情況下,每行將會被分割成多個單詞,單詞流被表示為words DStream。接下來,我們想要統計這些單詞。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words DStream進一步映射(一對一變換)為一個(字,1)對的DStream,然后通過reduceByKey獲得每批數據中字的頻率。最后,wordCounts.print()將會打印每秒生成的一些計數。

注意當執行這些lines時,Spark Streaming只會設置它在啟動時執行的計算,並且尚未開始實際處理。在所有轉換完成之后開始處理,我們最終調用

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整的代碼可以在Spark Streaming示例NetworkWordCount中找到。

如果你已經准備下載和建立Spark,你可以運行這個例子。你首先需要通過使用nc -lk 9999運行Netcat(在大多數Unix系統中都有一個小實用程序)作為一個數據源服務器,然后在一個不同的終端中,你可以通過使用命令啟動該例子:

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

然后,在運行netcat服務終端上輸入的任何行將會被計數並每秒在屏幕上打印。它看起來像下面那樣:

3. Basic Concepts 基礎概念

接下來,我們超越這個簡單的例子,詳細簡述Spark Streaming的基礎知識。

3.1 Linking 鏈接

 與Spark類似,Spark Streaming可以通過Maven Central獲得。編寫你自己的Spark Streaming程序,你必須添加下述依賴到你的SBT或者Maven項目。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

為了從sources(如Kafka,Flume和Kinesis,這些sources不在Spark Streaming核心API中)獲取數據,你必須將相應的artifact(工件)spark-streaming-xyz_2.11添加到依賴關系中。例如,一些常用的commons如下:

Source Artifact
Kafka spark-streaming-kafka-0-10_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

有關最新的列表,請參閱Maven存儲庫以獲取支持的sources和artifacts(工件)的完整列表。

3.2 Initializing StreamingContext 初始化StreamingContext

為了初始化Spark Streaming程序,必須創建一個StreamingContext對象,這是所有Spark Streaming功能的主要入口點。

一個StreamingContext對象可以從一個SparkConf對象創建:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName參數是你應用程序在集群UI上展示的名字。master是Spark,Mesos或者YARN集群URL,或者是以本地模式運行的特定“local[*]”字符串。實際上, 當在一個集群中運行時,你不希望在程序中hardcode(硬編碼)master,而是使用spark-submit啟動應用程序,並在那里接收它。然而,對於本地測試和單元測試,你可以通過"local[*]"運行進程內的Spark Streaming(檢測本地系統中的核心數量)。注意這內部創建了一個SparkContext(Spark全部功能的起點),它可以作為ssc.sparkContext被訪問。

批處理間隔必須根據你的應用程序和可用集群資源的延遲要求來設置。詳情請看Performance Tuning節點。

一個StreamingContext對象也可以從一個現有的SparkContext對象中創建。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

當一個context(上下文)被定義后,你必須執行以下操作:

  1. 通過創建輸入DStreams定義輸入源。
  2. 通過將轉換(transformation)和輸出操作應用於DStreams來定義流計算。
  3. 開始接收數據並使用streamingContext.start()處理它。
  4. 使用streamingContext.awaitTermination()去等待處理停止(手動或由於任何錯誤)。
  5. 可以使用streamingContext.stop()去手動停止處理。

要點紀要:

  • 一旦context(上下文)已經開始后,就不能建立或者添加新的流式計算。
  • 一旦context(上下文)已經停止后,它就不能被重新啟動。
  • 在同一時間一個JVM虛擬機里只能有一個StreamingContext可以處於活動狀態。
  • StreamingContext上的stop()也可以停止SparkContext。為了僅停止StreamingContext,設置stop()可選參數stopSparkContext為false。
  • 一個sparkContext可以被重新使用去創建多個StreamingContexts,只要先前的StreamingContext在下一個StreamingContext被創建之前停止(不停止SparkContext)。

3.3 Discretized Streams(DStreams) 離散流

Discretized Streams或者DStream是Spark Streaming提供的基本抽象。它表示一個可持續的數據流,或者是從source接收的輸入數據流,或者是通過轉換輸入流生成的處理過的數據流。在內部,DStream由連續的RDD系列表示,它是Spark對不可變的分布式dataset(數據集)的抽象(詳情請看Spark Programming Guide)。DStream中的每個RDD都包含一定間隔的數據,如下圖所示。

在DStream上應用的任何操作都會轉換為對基礎RDD的操作。例如,在將行轉換為單詞的早期例子中,在行DStream中的每個RDD上應用flatMap操作以生成單詞DStream的RDD。如下圖所示。

這些基礎的RDD轉換是通過Spark引擎計算的。DStream操作隱藏大部分細節,並為開發者提供了一個便利的更高級API。這些操作可在下個節點中討論。

3.4 Input DStreams and Receivers 輸入DStreams和接收者

Input DStream是表示從流來源接收的輸入數據流的DStreams。在快速學習實例中,lines是一個輸入DStreams,因為它表示從netcat服務器接收的數據流。每個輸入DStream(除了文件流,在本節稍后討論)與一個接收者對象相關聯的,它從一個source源接收數據並將其存儲在Spark的內存中進行處理。

Spark Streaming提供了兩類內置的流sources(源)。

  • 基本sources:在StreamingContext API中Sources直接可用。例如:文件系統和socket(套接字)連接。
  • 高級sources:源(如Kafka,Flume,Kinesis等等)通過額外的使用類是可用的。這些要求添加額外的依賴包,這在linking節點中有過討論。

我們將要在本節點后面討論每個類別出現的一些來源sources。

注意,如果你想要在你streaming應用程序中並行接收多個數據源,你可以創建多個輸入DStreams(在Performance Tuning節點進一步討論)。這將會創建多個接收器,它將同時接收多個數據流。但是注意,Spark worker/executor是一個長期運行的任務,它占用了分配給Spark Streaming應用程序的內核其中之一。因此,重要的是要記住,Spark Streaming應用程序需要分配足夠多的內核(或者線程,如果本地運行時)去處理接收到的數據,以及運行接收器。

記住要點:

  • 當本地運行Spark Streaming項目時,不要使用“local”或者“local[1]”作為master URL。這些中的任何一個意味着本地只有一個線程用於運行任務。如果你使用基於接收器(如sockets、Kafka、Flume等)的輸入DStream,然后單線程將會用於該接收器,而沒有線程用於處理接收到的數據。因此,當本地運行時,總是使用"local[n]"作為master URL,這里n>運行的接收器的數量(詳情查看Spark Properties關於如何設置master的信息)。
  • 將邏輯擴展在集群上運行,分配給Spark Streaming應用程序的內核數量必須大於接收器的數量。否則系統只會接收到數據,但無法處理。

3.4.1 Basic Sources 基本來源

在從一個TCP socket(套接字)連接接收到的文本數據創建DStream的快速學習實例中,我們已經了解過ssc.socketTextStream(...)。除了sockets(套接字),StreamingContext API還提供了從文件去創建DStreams作為輸入sources的方法。

File Steam文件流:為了從與HDFS API(即HDFS,S3,NFS等)兼容的任何文件系統上的文件讀取數據,可以創建DStream為:

  streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming將會監控dataDirectory目錄並處理該目錄下創建的任何文件(不支持嵌套目錄編寫的文件)。注意:

  • 文件必須有相同的數據格式。
  • 文件必須通過在數據目錄中原子移動或重命名在dataDirectory中創建。
  • 一旦移動,文件必須不能更變。因此如果文件內容被連續追加,新的數據將不會被讀取。

 對於簡單的文本文件,這里有一個更簡單的方法streamingContext.textFileStream(dataDirectory)。而文件流不要求運行接收器,因此不要求分配內核。

Python API:fileStream在Python API中不可用,只有textFileStream可用。

Streams based on Custom Receivers基於自定義接收器的流:DStreams可以通過自定義接收器接收到的數據流創建。詳情查看Custom Receiver Guide。

Queue of RDDs as a Stream RDD隊列作為流:對於使用測試數據測試Spark Streaming應用程序,還可以使用streamingContext.queueStream(queueOfRDDs)基於RDD隊列創建DStream。推入隊列的每個RDD都將被視為DStream中的一批數據,並像流stream一樣進行處理。

更多關於從sockets和filesd的流的詳細信息,請參閱Scala的StreamingContext、Java的JavaStreamingContext和Python的StreamingContext中有關功能的API文檔。

3.4.2 Advanced Sources 高級Sources

Python API:從Spark2.2.1開始,在這些源中,Kafka、Kinesis和Flume都可以在Python API中使用。

這類資源sources要求與外部的非Spark類庫(其中一些具有復雜的依賴關系)進行交互(例如,Kafka和Flume)。因此,為了盡量減少與依賴包版本沖突有關的問題,從這些源sources創建DStreams的功能已經被移動到可以在必要時顯式鏈接的獨立庫。

注意這些高級源sources在Spark shell是不可用的,因此基於這些高級源sources的應用程序不能在shell中測試。如果你真的想要在Spark shell使用它們,你必須下載相應的Maven工件artifact的JAR以及它的依賴關系包,並將它添加到類路徑中。

這些高級源sources中的一些如下:

  • Kafka:Spark Streaming 2.2.1與Kafka broker版本0.8.2.1或更高版本兼容。詳情請看Kafka Integration Guide。
  • Flume:Spark Streaming2.2.1與Flume1.6.0兼容。詳情請看Flume Integration Guide。
  • Kinesis:Spark Streaming2.2.1與Kinesis Client Library1..2.1兼容。詳情請看Kinesis Integration Guide。

3.4.3 Custom Source自定義source

Python API 這個在Python還不能支持。

 輸入DStreams也可以創建自定義數據源。你需要去做的是實現用戶自定義的接收器(看下一節點了解它是什么),它可以接收來自自定義sources的數據並把它們推送到Spark里面。詳情請看Custom Receiver Guide。

3.4.4 Receiver Reliability 接收器可靠性

基於其可靠性可以有兩種數據源sources。源Sources(如Kafka和Flume)允許傳輸的數據被確認。如果從這些可靠的來源sources接收數據的系統正確地確認接收到的數據,則可以確保沒有數據由於任何故障而丟失。這導致兩種接收器:

  1. 可靠的接收器 —— 當數據已經被接收並存儲在具有復制的Spark中時,可靠的接收器正確地向可靠的來源source發送確認。
  2. 不可靠的接收器 —— 不可靠的接收器不會向來源source發送確認信息。這可以用於不支持確認的來源sources,甚至可以用於不希望或不需要進行復雜性確認的可靠來源。

詳情在Custom Receiver Guide查看如何編寫可靠的接收器。

3.5 Transformations on DStreams 在DStreams上進行轉換

類似於RDD,轉換允許來自輸入DStream的數據被修改。DStreams支持在正常Spark RDD上的很多轉換。詳情如下:

Transformation Meaning
map(func) 通過函數func傳遞來源source DStream的每個元素來返回一個新的DStream
flatMap(func) 類似於map,但是每個輸入item可以映射為0或多個輸出items
filter(func) 通過僅選擇func返回true的那個來源source DStream的記錄來返回一個新的DStream
repartition(numPartitions) 通過創建或多或少的分區來更改此DStream的並行性級別
union(otherStream) 返回一個新的DStream,其中包含來源source DStream和其他DStream中元素的聯合
count() 通過統計來源source DStream的每個RDD的元素數量來返回一個新的單一元素RDD的DStream
reduce(func) 通過使用函數func(它帶有兩個參數並返回一個)來聚合源source DStream的每個RDD中的元素,從而返回一個新的單元素RDD的DStream。函數應該是關聯和可交換的,以便可以並行計算。
countByValue() 當在元素類型為K的DStream上調用時,返回一個新的(K,Long)對的DStream,其中每個鍵的值是它在來源source DStream的每個RDD中的頻率。
reduceByKey(func, [numTasks]) 當在元素類型為(K,V)對的DStream上調用時,返回一個新的(K,V)對的DStream,其中每個鍵的值使用給定的reduce函數進行聚合。注意:默認情況下,它使用Spark的默認並行任務數(2表示本地模式,而在集群模式下,數字由config屬性spark.default.parallelism決定)進行分組。你可以傳遞一個可選的numTasks參數來設置不同數量的任務。
join(otherStream, [numTasks]) 當在兩個元素類型分別為(K,V)和(K,W)的DStreams上調用時,為每個鍵的所有元素對,返回一個新的元素類型為(K,(V,W))對DStream
cogroup(otherStream, [numTasks]) 當在一個元素類型為(K,V)和(K,W)對的DStream上調用時,返回一個新的元素類型為(K, Seq[V], Seq[W])元組的DStream
transform(func) 通過將RDD-to-RDD函數應用於來源source DStream的每個RDD來返回一個新的DStream。這可以用來在DStream上執行任意RDD操作
updateStateByKey(func) 返回一個新的“狀態”DStream,其中通過對鍵的先前狀態和鍵的新值應用給定的函數來更新每個鍵的狀態。這可以用來維護每個鍵的任意狀態數據

這些轉換中一些是值得更詳細地討論的。

3.5.1 UpdateStateByKey Operation UpdateStateByKey操作

updateStateByKey操作允許你維護任意狀態,同時不斷更新新信息。為了使用這個,你將不得不做兩個步驟:

  1. 定義狀態 —— 狀態可以是任意的數據類型。
  2. 定義狀態更新函數 —— 用函數指定如何使用之前的狀態來更新狀態以及如何更新輸入流中的新值。

在每個批處理中,Spark將狀態更新函數(或功能)應用於全部現有的密鑰key,而不管批處理中是否有新數據。如果更新函數(或功能)返回None,那么鍵值對將被消除。

 讓我們用例子說明。假設你想在文件數據流中保持看到的每個單詞的運行數量。這里,運行的數量是狀態並且它是一個整型。我們定義更新函數如:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

這是在包含單詞的DStream中應用(假設,在早期示例中對DStream包含(word,1)對)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函數將會被每個單詞調用,newValues具有1的序列(來自(word,1)對),並且runningCount具有前一個計數。

注意使用updateStateByKey要求配置好checkpoint目錄,詳情參閱checkpointing節點。

3.5.2 Transform Operation 轉換操作

transform轉換操作(帶有它的變量如transformWith)允許任意RDD-to-RDD函數應用於DStream。它可以用於應用任何未在DStream API中公開的RDD操作。例如,在數據流中的每個批次與其他數據集dataset連接起來的功能不會直接暴露在DStream API中。然而,你可以很容易地使用transform完成此操作。這可能性非常強大。例如,可以通過將輸入數據流與預先計算的垃圾信息(也可以使用Spark生成)進行實時數據清理,然后基於此進行過濾。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

注意,提供的函數在每個批處理間隔中被調用。此函數允許你執行隨時間變化的RDD操作,即可以在批次之間更改RDD操作、分區數量、廣播變量等等。

3.5.3 Window Operations Window操作

Spark Streaming也提供窗口化計算,這允許你在滑動的數據窗口上應用轉換transformations。下圖說明該滑動窗口。

就像上圖展示的那樣,每當窗口滑過源Source DStream時,窗口內的源Source RDD被組合並操作以產生窗口DStream的RDD。在這個特定情況中,該操作應用涵蓋在最后3個時間單位的數據上並滑過2個時間單位。這表明任何窗口操作需要指定兩個參數。

  • 窗口長 —— 窗口的持續時間(圖表中的3)
  • 滑動間隔 —— 窗口操作的執行間隔(圖中的2)

 這個兩個參數必須是源source DStream(圖中的1)的批處理間隔的倍數。

讓我們用個例子說明窗口操作。假設你想要通過在最后的30秒數據中每10秒生成一個字數來擴展前面的示例。為了做到這一點,我們必須在最后的30秒數據內在(word,1)對的DStream對上應用reduceByKey操作。這是使用reduceByKeyAndWindow操作完成的。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些普通窗口操作如下。這些操作全部都有上述說的兩個參數——windowLength和slideInterval。

Transformation Meaning
window(windowLengthslideInterval) 基於在源source DStream上的窗口化批處理計算來返回一個新的DStream。
countByWindow(windowLengthslideInterval) 返回流中元素的滑動窗口數量。
reduceByWindow(funcwindowLengthslideInterval) 返回一個新的單元素流,它是通過使用func經過滑動間隔聚合流中的元素來創建的。
reduceByKeyAndWindow(funcwindowLengthslideInterval, [numTasks]) 當在元素類型為(K,V)對的DStream調用時,返回一個新的元素類型為(K,V)對的DStream,其中每個key鍵的值在滑動窗口中使用給定的reduce函數func來進行批量聚合。
reduceByKeyAndWindow(funcinvFuncwindowLengthslideInterval, [numTasks])

上述reduceByKeyAndWindow()的一個更高效的版本,其中每個窗口的reduce值是使用前一個窗口的reduce值遞增計算的。這是通過減少進入滑動窗口的新數據並“反轉減少”離開窗口的舊數據來完成的。示例如當窗口滑動時“增加並減少”key鍵的數量。然而,它僅適用於“可逆減函數”,即具有相應“反減inverse reduce”函數的函數(作為參數invFunc)。如reduceByKeyAndWindow,reduce任務的數量是通過一個可選參數來設置的。注意使用這個操作checkpointing必須是能啟用的。

countByValueAndWindow(windowLength,slideInterval, [numTasks]) 當在元素類型為(K,V)對的DStream上調用時,返回一個新的元素類型為(K,Long)對的DStream,其中每個key鍵的值為它在一個滑動窗口出現的頻率。如在reduceByKeyAndWindow中,reduce任務的數量是通過一個可選參數設置的。

 3.5.4 Join Operations連接操作

最終,值得強調的是,你可以輕松地在Spark Streaming中執行不同類型的連接Joins。

Stream-stream joins 流——流連接

流可以很容易地加入到其他流中。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

這里,在每個批處理間隔中,stream1生成的RDD將會連接到stream2生成的RDD。你也可以執行leftOuterJoin、rightOuterJoin、fullOuterJoin。此外,在流的窗口上進行連接經常是非常有用的。這是非常容易的。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Streams-dataset joins 流——數據集連接

當在前面說明DStream.transform操作時,這已經展示了。這里是將窗口流與數據集dataset連接的另一個實例。

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

事實上,你也可以動態改變你想要連接的數據集dataset。提供轉換的函數在每個批處理間隔中進行評估,因此將使用數據集引用指向的當前數據集dataset。

API文檔提供了DStream轉換的完整列表。對於Scala API,請看DStream和PariDStreamFunctions。對於Java API,請看JavaDStream和JavaPairDStream。對於Python API,請看DStream。

3.6 Output Operations on DStreams 在DStreams上的輸出操作

 輸出操作允許DStream的數據被推送到外部系統(如數據或者文件系統)。由於輸出操作事實上允許外部系統消費轉換過的數據,因此它們會觸發所有DStream轉換的實際執行(類似於RDD的操作)。當前,輸出操作定義如下:

Output Operation Meaning
print()

在運行streaming應用程序的驅動節點上的DStream中打印每批數據的前10個元素。這對開發和調試很有用。

Python API:在Python API中叫做pprint()。

saveAsTextFiles(prefix, [suffix]) 把DStream的內容保存為文本文件。每個批次間隔的文件名是根據前綴prefix和后綴suffix:“prefix-TIME_IN_MS[.suffix]”生成的。
saveAsObjectFiles(prefix, [suffix]) 把此DStream的內容保存為序列化Java對象的序列文件。每個批次間隔的文件名是根據前綴prefix和后綴suffix:“prefix-TIME_IN_MS[.suffix]”生成的。
saveAsHadoopFiles(prefix, [suffix])

把此DStream的內容保存為Hadoop文件。每個批次間隔的文件名是根據前綴prefix和后綴suffix:“prefix-TIME_IN_MS[.suffix]”生成的。

Python API 這個在Python API中不可用。

foreachRDD(func) 最通用的輸出操作,將函數func應用於從流中生成的每個RDD。此功能應將每個RDD中的數據推送到外部系統,例如將RDD保存到文件,或通過網絡將其寫入數據庫。注意,函數func在流應用程序的驅動程序進程中執行,並且通常會在其中執行RDD操作,強制執行流RDD的計算。

3.6.1 Design Patterns for using foreachRDD

dstream.foreachRDD是一個功能強大的原語primitive,它允許將數據發送到外部系統。然而,了解如何正確並高效地使用原語primitive是很重要的。如下可以避免一些普通錯誤。

通常寫數據到外部系統要求創建一個連接對象(例如遠程服務器的TCP連接)並使用它發送數據到遠程系統。為了達到這個目的,開發者可能會不小心嘗試在Spark驅動程序中創建連接對象,然后嘗試在Spark worker中使用它來講記錄保存在RDD中。如scala中的示例。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

這是不正確的,因為它要求連接對象可以被序列化並從driver端發送到worker端。這種連接對象很難跨機器傳輸。這個錯誤可能表現為序列化錯誤(連接對象不可序列化)、初始化錯誤(連接對象需要在worker端初始化)等。正確的解決方法是在worker端創建連接對象。

然而,這可能導致另一個普遍的錯誤 —— 為每條記錄都創建一個新的連接。例如:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

典型地做法是創建一個連接對象有時間和資源的開銷。因此,為每條記錄創建和銷毀連接對象可能會產生不必要的高開銷,並且會顯著地降低系統的整體吞吐量。一個更換的解決方案是使用rdd.foreachPartition —— 創建一個單連接對象並在RDD分區使用該連接發送所有的記錄。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

這會緩解許多記錄中的連接創建開銷。

最終,通過在多個RDD/批次重用連接對象,可以進一步優化這個功能。我們可以維護一個靜態的可重用的連接對象池,因為多個批處理的RDD被推送到外部系統,從而進一步降低了開銷。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

注意在連接池中的連接應該按需延遲創建,並且如果不使用一段時間則超時。這實現了將數據最有效地發送到外部系統。

其他要求需要記住:

  • DStreams通過輸出操作延遲執行,比如RDD通過RDD actions延遲執行。具體來說,DStream輸出操作中的RDD actions會強制處理接收到的數據。因此,如果你的應用程序沒有任何輸出操作,或者有輸出操作但沒有任何RDD action在里面,如dstream.foreachRDD(),則不會執行任何操作。系統將會簡單地接收數據並丟棄它。
  • 默認的,輸出操作時一次一個執行的。而且它們按照在應用程序中定義的順序執行。

3.7 DataFrame and SQL Operations 數據框和SQL操作

你可以在streaming流數據上很容易地使用DataFrame數據框和SQL操作。你必須通過使用StreamingContext正在使用的SparkContext創建SparkSession。此外,必須這樣做才能在驅動程序故障時重新啟動。這是通過創建SparkSession的一個延遲的實例化單例實例來完成的。這在下面的實例展示。它通過使用DataFrames和SQL來修改前面的word count例子來生產單詞數量。每個RDD都可轉換為一個DataFrame,注冊為一個臨時表,然后使用SQL進行查詢。

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

完整源代碼請看 source code.

你也可以在來自不同線程(即與正在運行的StreamingContext異步)的streaming流數據定義的表上運行SQL查詢。只要確保你將StreamingContext設置為記住足夠多的streaming流數據,以便查詢可以運行。另外,不知道任何異步SQL查詢的StreamingContext將會在查詢完成之前刪除舊streaming流數據。例如,如果你想要查詢最后一批,但是查詢可能花費5分鍾才能運行,請調用streamingContext.remember(Minutes(5))(以Scala或其他語言的等效方式)。

參閱DataFrames and SQL指南去更多地了解DataFrames。

3.8 MLlib Operations MLlib操作

你可以很容易地使用MLlib提供的機器學習算法。首先,streaming流機器學習算法(例如流式線性回歸Streaming Linear Regression,Streaming KMeans等等)可以從steaming流數據中學習並在將模型應用在streaming流數據上。除此之外,對於機器學習算法更大的類,你可以離線學習一個學習模型(即使用歷史數據),然后將線上模型應用於streaming流數據。詳情參閱MLlib指南。

3.9 Caching/Persistence 緩存/持久化

類似於RDD,DStream也允許開發者在內存中持久化stream流數據。也就是說,在DStream使用persist()方法將會自動在內存中持久化DStream的每個RDD。如果在DStream中的數據將被多次計算(例如,對同一數據進行多次操作),這將非常有用。對於像reduceByWindow和reduceByKeyAndWindow這樣的基於窗口操作和像updateStateByKey這樣的基於狀態操作,這無疑問都是有用的。因此,生成基於窗口操作的DStreams是在內存中自動持久化的,沒有通過開發者調用persist()。

對於通過網絡接收數據的輸入流(如Kafka,Flume,sockets等等),默認的持久化級別設置為將數據復制到兩個節點以實現容錯。

注意,不像RDD,DStreams默認的持久化級別是在內存中保持數據序列化。這可以在Performance Tuning節點進一步討論。更多關於不同持久化級別的詳情可以在Spark Programming Guide找到。

3.10 Checkpointing 檢查點

一個streaming流應用程序必須全天運行,因此必須對於應用程序邏輯無關的故障(例如系統故障,JVM崩潰等)具有恢復能力。為了實現這個可能性,Spark Streaming需要checkpoint檢查點足夠的信息到一個容錯存儲系統,以便它可以從故障中恢復。這里有兩種檢查的數據類型。

  • Metadata checkpointing元數據檢查點 —— 將定義流式計算的信息保存到容錯存儲系統如HDFS。這用於從運行streaming流應用程序的驅動程序的節點上的故障中恢復(稍候詳細討論)。元數據包含:
    • 配置—— 用於創建streaming流應用程序的配置。
    • DStream 操作 —— 定義streaming流應用程序的DStream操作集合。
    • 沒完成的批處理 ——批處理的jobs在排隊但尚未完成。
  • Data checkpointing 數據檢查點—— 將生成RDD保存在可靠的存儲中。這在將多個批次的數據組合在一起的有狀態轉換中是必須的。在這樣的轉換中,生成的RDD依賴於之前批次的RDD,導致依賴鏈的長度隨着時間的推移而不斷增加。為了避免恢復時間的這種無限增長(與依賴鏈成比例),有狀態轉換的中間RDD被周期性地檢查為可靠存儲(如HDFS)以切斷依賴鏈。

 總而言之,metadata元數據checkpointing主要用於從驅動程序故障中恢復,而數據或RDD checkpointing對於使用有狀態轉換的基本功能是必須的。

3.10.1 When to enable Checkpointing 什么時候啟用checkpointing

必須為具有以下任何要求的應用程序啟用Checkpointing:

  • 有狀態轉換的用法 —— 如果在應用程序中使用updateStateByKey或者reduceByKeyAndWindow(帶有反轉函數),那么必須提供checkpointing目錄以允許定期RDD checkpointing。
  • 從運行應用程序的驅動程序故障中恢復 —— Metadata元數據checkpoints用於恢復進度信息。

注意沒有上述狀態轉換的簡單streaming流應用程序可以不通過啟用checkpointing運行。從驅動程序故障中恢復也是這種情況下的一部分(一些接收到但沒有處理的數據可能丟失)。這通常是可以接受的,很多人以這種方式運行Spark Streaming應用程序。

3.10.2 How to configure Checkpointing 如何配置Checkpointing

 Checkpointing可以通過在容錯,可靠的文件系統(如HDFS、S3等)中設置一個目錄啟用,checkpoint信息將被保存在該文件系統中。這是通過使用streamingContext.checkpoint(checkpointDirectory)來完成的。這將允許你使用上述提及的狀態轉換。另外,如果你想要應用程序從驅動程序的故障中恢復,你應該重寫你的streaming流應用程序以使其具有下述行為:

  • 當程序第一次啟動時,它將會創建一個新的StreamingContext,設置所有streams流,然后調用start()。
  • 當程序在失敗之后重新啟動時,它將會根據checkpoint目錄中的checkpoint數據重新創建一個StreamingContext。

這個行為通過使用StreamingContext.getOrCreate變得簡單。用法如下:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory存在,那么context上下文將會根據checkpoint數據重新創建。如果該目錄不存在(即第一次運行),那么函數functionToCreateContext將會被調用去創建新的context上下文,並設置DStreams。參閱Scala示例RecoverableNetworkWordCount。這個例子把網絡數據的單詞數量添加到一個文件中。

除了使用getOrCreate之外,你也需要確保驅動程序進程失敗后自動重新啟動。這個只能通過部署用於運行應用程序的基礎設施來完成。這個在部署節點進一步討論。

注意RDD的checkpointing導致了存儲到可靠存儲的成本。這個可能導致RDD checkpoint的批次處理時間延長。因此checkpointing的時間間隔需要小心設置。在小批量(例如1秒)的情況下,每個批次的checkpointing可能會顯著降低操作吞吐量。相反,checkpointing國語頻繁導致lineage和任務規模增長,這可能有不利影響。

對於要求RDD checkpointing的狀態轉換,默認時間間隔是至少10秒的批處理間隔的倍數。它可以通過使用dstream.checkpoint(checkpointInterval)來設置。通常情況下,一個DStream的5到10個滑動間隔的checkpoint間隔設置是一個很好的嘗試。

3.11 Accumulators, Broadcase Variables, and Checkpoints 累加器,廣播變量和Checkpoints

 累加器和廣播變量在Spark Streaming中不能從checkpoint恢復。如果你啟用checkpointing並使用累加器或者廣播變量,你將必須為累加器和廣播變量延遲創建實例化的單例實例,以便在驅動程序由於故障重新啟動后它們可以被重新實例化。下面例子可以參閱:

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

完整源代碼查看source code.

3.12 Deploying Applications 部署應用程序

這個節點討論部署一個Spark Streaming應用程序的步驟。

3.12.1 Requirements 要求

為了運行一個Spark Streaming應用程序,你需要有下述內容:

帶有集群管理器的集群 —— 這是任何Spark應用程序的普遍要求,詳情在deployment guide節點討論。

  • 打包應用程序JAR —— 你必須將你的streaming流式應用程序編譯為JAR。如果你正在使用spark-submit去啟動應用程序,那么你將不必要在JAR中提供Spark和Spark Streaming。然而,如果你的應用程序使用高級advanced sources(例如Kafka、Flume),那么你將必須在用於部署該應用程序的JAR中打包其鏈接的第三方artifact工件以及它們的依賴包。例如,一個使用KafkaUtils的應用程序必須在應用程序JAR中包含有spark-streaming-kafka-0-8_2.11及其所有依賴項。
  • 為executors配置足夠內存 —— 因為接收到的數據必須存儲在內存中,executors必須配置足夠的內存去持有接收到的數據。注意如果你執行10分鍾的窗口操作,系統必須在內存中保留至少10分鍾的數據。因此應用程序的內存要求取決於在其中使用的操作。
  • 配置checkpointing —— 如果流應用程序要求,那么必須將在Hadoop API兼容的容錯存儲(例如HDFS,S3等)中的目錄配置為checkpoint目錄。詳情參閱checkpointing節點。
  • 配置應用驅動程序的自動重啟 —— 為了從驅動程序故障中自動恢復,用於運行streaming流應用程序的部署基礎設施必須監控驅動程序進程並當它故障時重新啟動。不同集群管理有不同工具去實現它。
    • Spark Standalone Spark單例—— 在Spark單例集群中提交Spark應用驅動程序並運行(請參閱集群部署模式),即,應用驅動程序本身在worker節點中的一個上運行。此外,單例集群管理器可以被指示監控驅動程序,並且如果驅動程序由於非零退出代碼或者由於運行驅動程序的節點的故障而失敗,則重新啟動它。詳情參閱Spark Standalone指南中的集群模式和監控。
    • YARN —— Yarn 支持類似的機制來自動重啟應用程序。詳情參閱YARN文檔。
    • Mesos —— Marathon已經被用來與Mesos實現這個功能。
  • 配置預寫日志 —— 自從Spark 1.2,我們引入了預寫日志用於實現強大的容錯保證。如果啟用,所有從接收器接收到的數據都可以寫入配置好的checkpoint目錄中的預寫日志。這可以防止驅動程序恢復時的數據丟失,從而確保零數據丟失(詳情參閱容錯語義節點)。這可以通過設置配置屬性spark.streaming.receiver.writeAheadLog.enable為true來啟用。然而,這些更強的語義semantics可能是以單個接收器的接收吞吐量為代價的。這可以通過並行運行更多的接收器去提高總吞吐量來糾正。此外,建議當預寫日志啟用時,禁用Spark中接收數據的復制,因為日志已經存儲在復制存儲系統中。這可以通過設置輸入流的存儲級別為StorageLevel.MEMORY_AND_DISK_SER來完成的。當使用S3(或者任何不支持刷新的文件系統)寫預寫日志時,請記住啟用spark.streaming.driver.writeAheadLog.closeFileAfterWriter和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。詳情參閱Spark Streaming Configuration。注意當啟用I/O加密時,Spark不會加密寫入預寫日志中的數據。如果期望預寫日志中的數據加密,那么它存儲的文件系統應該支持本地加密。
  • 設置最大接收率 —— 如果集群資源不夠大,流式應用程序無法像接收數據一樣快速處理數據,接收器的接收速率可以通過以records/sec為單位設置最大速率限制來進行限制。詳情參閱配置參數接收器的spark.streaming.receiver.maxRate和Direct Kafka approach的spark.streaming.kafka.maxRatePerPartition。在Spark 1.5,我們引入了一個叫backpressure的特性,無需設置速率限制,因為Spark Streaming會自動計算速率限制,並在處理條件發生變化時動態調整速率限制。這個backpressure可以通過設置配置參數spark.streaming.backpressure.enabled為true來啟用。

3.13 Upgrading Application Code 升級應用程序代碼

 如果正在運行的Spark Streaming應用程序需要升級新的應用程序代碼,那么這里有兩種可能的機制。

升級的Spark Streaming應用程序啟動並與現有的應用程序是並行運行的。一旦新的(正在接收相同的數據與舊的一樣)已經預熱並准備好迎接黃金時段,舊的就可以被取消。注意這可以為支持將數據發送到兩個目標(即早期和已已升級的應用程序)的數據源完成。

現有的應用程序正常關閉(請參閱StreamingContext.stop(...)或JavaStreamingContext.stop(...)以獲取正常關閉選項),以確保已接收到的數據在關閉之前完全處理完畢。然后可以啟動升級的應用程序,它將會從早期應用程序中斷的同一點開始處理。注意這只能通過支持source-side源端緩存(如Kafka和Flume)的輸入源來完成,因為當前一應用程序關閉而升級的應用程序還沒啟動時,需要緩存數據。從升級前的代碼的更早checkpoint信息重新啟動是不能完成的。checkpoint信息本質上包含序列化的Scala/Java/Python對象並試圖用新的修改后的類來反序列化對象可能會導致錯誤。在這種情況下,可以使用不同的checkpoint目錄來啟動升級的應用程序,也可以刪除以前的checkpoint目錄。

 3.14 Monitoring Applications 監控應用程序

除了Spark的監控功能外,Spark Streaming還有其他特定功能。當使用StreamingContext時,Spark web UI會顯示一個額外的Streaming選項卡,它展示了關於正在運行的接收器(無論接收器是否處於活動狀態,接收到的記錄數量,接收器錯誤等等)和已完成批次(批處理次數,隊列延遲等等)的統計信息。這可以用於監控streaming流應用程序的進程。

Web UI中的以下兩個指標尤為重要:

  • 處理時間 —— 每批次數據的處理時間。
  • 調度延遲 —— 批次在隊列中等待前一批次處理完成的時間。

如果批處理時間一直超過批處理間隔或者排隊延遲時間持續增加,則表示當批次生成時系統無法快速地處理它們,並且落后。在這種情況下,考慮降低批處理時間。

Spark Streaming程序的處理也可以使用StreamingListenner接口進行監控,它允許你得到接收器狀態和處理時間。注意這是個開發者API並且將來還會改進。

4. Performance Tuning 性能調整

 從集群上的Spark Streaming應用程序中獲得最佳性能需要進行一些調整。本節點介紹可調整的一些參數和配置,以提高應用程序的性能。在高層次上,你需要考慮兩件事:

  1. 通過有效地利用集群資源來降低每批次數據處理時間。
  2. 設置正確的批處理大小,以便數據的批處理可以像它們接收時一樣快速處理(即,數據處理能跟上獲取數據)。

4.1 Reducing the Batch Processing Times 降低批處理時間

Spark中有很多優化可以使每個批次的處理時間最短。詳情已在Tuning Guide討論。本節重點介紹一些最重要的內容。

4.1.1 Level of Parallelism in Data Receiving 數據接收中的並行性水平

通過網絡接收數據(如Kafka,Flume,socket等等)要求將數據反序列化並存儲在Spark中。如果數據接收成為系統中的瓶頸,那么考慮並行化數據接收。注意每個輸入DStream創建一個簡單接收器(在一台worker機器上運行)去接收單一數據流。通過創建多個輸入DStreams並配置它們以接收來自source源的不同分區的數據流來完成接收多個數據流。例如,接收兩個topics數據的單一Kafka輸入DStream可以被分割成兩個Kafka輸入流,每個只接收一個topic。這會運行兩個接收器,它們允許數據並行接收,因而提高整體的吞吐量。這些多個DStreams可以結合在一起創建一個DStream。那么應用於單一輸入DStream的轉換也可以應用於統一流上。步驟如下:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

另一個應該考慮的參數是接收器的塊間隔,它是通過配置參數spark.streaming.blockInterval來決定的。對於大多數接收器,接收的數據在存儲到Spark內存之前被合並成數據塊。每個批處理中塊的數量決定了將要用於在類似map的轉換處理接收到的數據的任務數量。每個接收器每個批次任務的數量將近似(批間隔/塊間隔)。例如,200ms的塊間隔將每2秒批次創建10個任務。如果任務的數量太少(即少於每台機器內核數量),那么它將是效率低的,因為全部可用的內核沒有用於處理數據。為給定的批次間隔提高任務的數量,降低塊間隔。但是,推薦塊間隔最小值大概為50ms,低於該值時,任務啟動開銷可能會成為問題。用多個輸入流/接收器接收數據的另一種方法是顯式地重新分配輸入數據流(使用inputStream.repartition(<分區數>))。在進一步處理之前,這將在集群中指定的數量的機器上分配接收到的批量數據。

4.1.2 Level of Parallelism in Data Processing 數據處理中的並行水平

如果在計算的任何階段使用使用的並行任務不夠高,則集群資源可能未被充分利用。例如,對於分配的reduce操作,如reduceByKey和reduceByKeyAndWindow,默認並行任務的數量是通過spark.default.parallelism配置屬性控制的。你可以將並行級別作為參數傳遞(參閱PairDStreamFunctions文檔),或設置spark.default.parallelism配置屬性以更改默認值。

4.1.3 Data Serialization 數據序列化

數據序列化的開銷可以通過調整序列化格式來減少。在streaming流的這種情況下,有兩種數據類型需要被序列化。

  • 輸入數據:默認的,通過接收器接收到的輸入數據通過StorageLevel.MEMORY_AND_DISK_SER_2存儲在executors的內存中。即,將數據序列化為字節以降低GC開銷,並復制用於容忍執行程序失敗。此外,數據首先保存在內存中,並且只有當內存不足以保存流式計算所需的所有輸入數據時才會溢出到磁盤。這些序列化有明顯的開銷 —— 接收器必須反序列化接收到的數據,並使用Spark的序列化格式重新序列化它。 
  • Persisted RDDs generated by Streaming Operations: 通過流式計算生成的RDDs可能在內存中持久化。例如,窗口操作在內存中持久化數據因為它們會被處理多次。然而,與StorageLevel.MEMORY_ONLY的Spark Core默認值不同,通過流式計算生成的持久化RDDs默認通過StroageLevel.MEMORY_ONLY_SER(即序列化)持久化以最大程度地降低GC開銷。

在這兩種情況下,使用Kryo序列化可以降低CPU和內存的開銷。詳情參閱Spark Tuning Guide。對於Kryo,考慮注冊自定義類,並禁用對象引用跟蹤(參閱配置指南中的與Kryo相關的配置)。在特定情況下,需要為流應用程序保留的數據量不大,可以將數據(兩種類型)作為反序列化的對象保持,而不會導致過多的GC開銷。例如,如果你正在使用幾秒的批處理間隔並且沒有窗口操作,那么你可以嘗試通過顯式設置存儲級別來禁用持久數據中的序列化。這可以減少由於序列化造成的CPU開銷,可能在不增加太多GC開銷的情況下提高性能。

4.1.4 Task Launching Overheads 任務啟動開銷

如果每秒任務啟動的數量很高的話(比如說每秒50或更多),那么發送任務到集群上的開銷可能就很大並且很難達到亞秒級的等待時間。開銷可以通過下述改變來降低:

  • Execution mode:在單例模式或者coarse-grained Mesos模式上運行Spark造成任務啟動時間比fine-grained Mesos模式快。詳情參閱Running on Mesos guide。

這些更改可能會使批處理時間減少100毫秒,從而運行亞秒級批處理大小可行。

 4.2 Setting the Right Batch Interval 設置正確的批處理間隔

對於運行在集群上Spark Streaming應用程序要穩定,系統應該有能力像接收數據一樣快速地處理數據。換一句話說,批量數據應該像它們生成一樣快速地處理。通過監控streaming web UI中的處理時間,其中批處理時間應該小於批處理間隔的處理時間,可以知道應用程序是否正確。

根據流式計算的性質,使用的批處理間隔可能會對一組固定的集群資源上的應用程序可以維持的數據速率產生重大的影響。例如,我們考慮前面的WordCountNetwork例子。對於特定的數據速率,系統可以保持每2秒(即2秒的間隔時間)匯報一次單詞字數,但不是每500毫秒。因此批處理間隔需要設置為生成環境可以維持的期待中的數據速率。

一個好的途徑去為你的應用程序找出正確的批處理大小是用一個保守的批處理間隔(比如5-10秒)和低數據速率進行測試。為了驗證系統是否有能力維持數據速率,你可以校驗通過每個處理過的批次所經歷的端到端end-to-end延遲的值(要么在Spark驅動程序log4j日志中查找“總延遲”,要么使用StreamingListener接口)。如果延遲保持與批量大小相當,那么系統是穩定的。否則,如果延遲是持續增加,這意味着系統不能跟上,因而它是不穩定的。一旦你有了穩定配置的主意,你可以試圖提高數據速率或者降低批量大小。注意,只要延遲降低到低值(即,小於批量大小),由於臨時數據速率增加而引起的延遲的暫時增加就可以是正常的。

4.3 Memory Tuning 內存調整

詳情參閱Tuning Guide中關於調整Spark應用程序的內存使用情況和GC行為。強烈要求你參閱。在本節點,我們在Spark Streaming應用程序討論一些特定的調整參數。

Spark Streaming應用程序要求的集群內存總量嚴重決定於使用的轉換類型。例如,如果你想要在最后10分鍾的數據中使用窗口操作,那么你的集群應該有足夠的內存來存儲10分鍾內存中的數據。或者如果你想要將updateStateByKey用於大量的keys,那么必要的內存將會很高。相反,如果你想要執行簡單的map-filter-store操作,那么需要的內存很低。

普遍來說,因為通過接收器接收到的數據是使用StorageLevel.MEMORY_AND_DISK_SER_2來存儲,在內存中不適合的數據將會溢出到磁盤。這將會降低streaming流應用程序的性能,因此建議當你的streaming流應用程序需要時提供足夠的內存。最好嘗試一下小規模的內存使用情況並做相應的估計。

內存調整的另一種方法是垃圾回收。對於一個要求低延遲的streaming流應用程序,通過JVM垃圾回收造成大量的暫停,這是不希望的。

這里有幾個參數可以幫助你調整內存使用情況和GC開銷:

  • Persistence Level of DStreams : 在Data Serialization節點提及,輸入數據和RDDs默認是作為序列化字節保存的。與反序列化持久化保存相比,這降低內存使用和GC開銷。啟用Kryo序列化進一步降低序列化大小和內存使用。進一步降低內存的使用可以通過壓縮來實現(參閱Spark配置spark.rdd.compress),代價是CPU時間。
  • Clearing old data: 默認,通過DStream轉換生成的所有的輸入數據和持久化的RDD自動被清除。Spark Streaming決定什么時候清除基於使用的轉換的數據。例如,如果你正在使用10分鍾的窗口操作,那么Spark Streaming將會存儲大約最后10分鍾的數據,並且積極清除舊數據。通過設置streamingContext.remember,數據可以保留一個更長的時間(例如交互式查詢舊數據)。
  • CMS Garbage Collector: 強烈建議使用並發的mark-and-sweep GC,以保持GC相關的暫停時間始終低。即使並發GC對於降低系統整體處理吞吐量已知,但仍然推薦使用並行GC來實現更一致的批處理時間。確保你在driver端(在spark-submit使用--driver-java-options)和executors端(使用spark配置spark.executor.extraJavaOptions)設置CMS GC。
  • Other tips:為了進一步降低GC開銷,這里有更多的技巧可以嘗試。
    • 使用OFF_HEAP存儲級別來持久化RDDs。詳情參閱Spark Programming Guide。
    • 通過更小的heap堆大小使用更多的executors。

 4.3.1 Important points to remember:

  •  DStream與單個接收器相關聯。為了達到並行讀取多個接收器,即創建多個DStream。在一個executor中運行一個接收器。它占用一個內核。確保在預訂接收器后有足夠的內核進行處理,即spark.cores.max應考慮接收器花費的耗時。接收器以循環方式分配給executors。
  • 當從流來源接收到的數據,接收器創建數據塊。每隔blockInterval毫秒生成一個新的數據塊。在batchInterval期間創建N個數據塊,N = batchInterval/blockInterval。這些塊被當前executor的塊管理器分配給其他executors的塊管理器。這之后,運行在driver上的網絡輸入跟蹤器將被通知有關塊位置以供進一步處理。
  • 在driver上為在batchInterval期間創建的塊創建一個RDD。在batchInterval期間生成的塊是RDD的分區。在spark中每個分區是一個任務。blockInterval==batchInterval意味着創建一個單獨的分區,並且可能在本地處理。
  • 除非是非本地調度,否則在塊中的map任務在executors中被處理(接收到的塊和復制塊的另一塊),而不管塊的間隔是多少。更大的blockInterval意味着更大的塊。spark.locality.wait的高值增加了在本地節點上處理塊的機會。在這兩個參數之間找到一個平衡去確保本地處理更大的塊。
  • 替代依賴於batchInterval和blockInterval,你可以通過調用inputDstream.repartition(n)來定義分區的數量。這隨機地重新刷新RDD中的數據以創建n個分區。是的,為了更大的並行性。雖然是以shuffle為代價的。driver的工作調度程序將RDD的處理作為一項工作。在特定的時間點,只有一個工作是活躍的。所以,如果一個工作正在執行,另一個工作正在排隊。
  •  如果你有兩個dstreams,將會有兩個RDD形成,並且將會創建兩個作業,這兩個作業將會被一個接一個地調度。為了避免這種情況,你可以結合兩個dstreams。這將確保dstreams的兩個RDD形成一個單一的unionRDD。然后這個unionRDD被認為是一個單一的作業。但是RDD的分區不受影響。
  • 如果批處理時間超過batchInterval,那么顯然接收器的內存將會開始填滿,最終將會在拋出異常(最可能是BlockNotFoundException)。目前沒有辦法暫停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。

5. Fault-tolerance Semantics 容錯語義

在這節點,我們將會討論Spark Streaming應用程序在發生故障時的行為。

5.1 Background 背景

為了明白Spark Streaming提供的語義,讓我們記住基本Spark RDD的容錯語義。

1. RDD是一個不可改變的,確定性可以重新計算的分布式數據集。每個RDD都記住在容錯輸入數據集上使用的確定性操作的lineage來創建它。

2. 如果RDD任何的分區由於workder節點故障而丟失,那么該分區可以通過使用lineage操作從原來的容錯數據集重新計算。

3. 假設所有RDD轉換都是確定性的,最終轉換的RDD中的數據總是相同的,而不管Spark集群中的故障如何。

在容錯文件系統(如HDFS或者S3)中Spark在數據上的操作。因此,從容錯數據生成的全部RDD也是容錯的。然而,對於Spark Streaming,情況並非如此,因為在大多數情況下通過網絡接收數據(除了使用fileStream時)。為了實現所有生成的RDD相同的容錯屬性,接收到的數據在集群中的worker節點里的多個Spark executors之間復制(默認的復制因子為2)。這導致系統中有兩種數據在發生故障時需要恢復:

1.Data received and replicated 數據接收和復制 —— 這些數據在單個worker節點的故障中仍然存在,因為它副本存在於其他節點之一上。

2.Data received but buffered for replication 數據接收但緩沖復制 —— 因為這不是復制的,唯一的方法來恢復這個數據是從來源source再次得到它。

進一步,這里有兩種我們應該關注的故障:

1.Worker節點的故障 —— 運行executors的任何worker節點可以失敗,並且這些節點的所有內存數據將會丟失。如果任何接收器運行在故障的節點上,那么它們緩沖的數據將會丟失。

2.Driver節點的故障 —— 如果運行Spark Streaming應用程序的driver節點失敗,那么顯然SparkContext會丟失,並且所有具有內存數據的executors都會丟失。

帶有基本的知識,讓我們了解Spark Streaming的容錯語義。

5.2 Definitions 定義

streamings流系統的語義經常是通過系統處理每條記錄的次數來獲取的。系統可以在所有可能的操作條件下提供三種類型的保證(盡管失敗等)。

1. 最多一次:每條記錄將會被處理一次或者根本不處理。

2. 至少一次:每條記錄將會被處理一次或多次。這比最多一次強壯,因為它確保沒有數據會丟失。但是可能有重復。

3. 正好一次:每條記錄將會正好被處理一次 —— 沒有數據會丟失也沒有數據會被執行多次。這顯然是三個中最強壯的保證。

5.3 Basic Semantics 基本語義

在任何流處理系統中,廣義來講,處理數據有三個步驟。

1.Receiving the data接收數據:使用接收器或者其他從來源接收數據。

2.Transforming the data轉換數據:使用DStream和RDD轉換變換接收到的數據。

3.Pushing out the data推送數據:最終轉換的數據被推送到外部系統,如文件系統、數據庫、儀表盤等等。

如果流應用程序必須實現端到端正好一次的保證,那么每一步驟必須提供正好一次的保證。也就是說,每條記錄必須實現只能被接收一次,只能被轉換一次,並只能被推送到下流系統一次。

在Spark Streaming上下文中這些步驟的語義:

1. Receiving the data:不同輸入來源提供不同的保證。詳情參閱下一小節。

2. Transforming the data:由於RDD提供的保證,已經接收到的所有數據將只會被處理一次。即使出現故障,只要接收到的輸入數據是可訪問的,那么最終轉換的RDD將總會有相同的內容。

3. Pusing out the data:輸出操作默認確保至少一次語義,因為它依賴於輸出操作的類型和下流系統的語義(支持事務與否)。

5.4 Semantics of Received Data 接收數據的語義

不同輸入來源提供不同的保證,范圍從至少一次到正好一次。閱讀更多的細節。

5.4.1 With Files 

如果所有的輸入數據已經在容錯文件系統如HDFS出現,Spark Streaming也可以從任何故障中恢復並處理所有數據。這給予正好一次語義,意味着所有數據將只能被處理一次無論發生什么故障。

5.4.2 With Receiver-based Sources 

對應基於接收器的輸入來源,容錯語義取決於故障情況和接收器的類型。正如我們前面所討論的,有兩種類型的接收器:

1. Reliable Receiver 可靠的接收器 —— 這些接收器只有在確保接收到的數據已被復制之后才確認可靠的來源。如果這個接收器失敗,來源將不會接收到緩沖數據(未復制)的確認。因此,如果接收器重新啟用,來源將會再次發送數據,將不會有數據會由於故障而丟失。

2. Unreliable Receiver 不可靠的接收器 —— 這樣的接收器不會發送確認,因此當由於worker或者driver發生故障時可能會丟失數據。

取決於使用什么類型的接收器,我們實現下面的語義。如果worker節點故障,那么可靠的接收器就不會有數據丟失。對於不可靠的接收器,接收到的數據但沒有復制可能會丟失。如果driver節點故障,那么除了這些損失之外,所有在內存中接收和復制的過去數據都將丟失。這將會影響有狀態轉換的結果。

為了避免過去接收到的數據丟失,Spark1.2引入了預寫日志,它在容錯存儲中保存接收到的數據。通過啟用預寫日志和可靠的接收器,不會有數據丟失。就語義而言,它至少提供一次保證。

下列表格總結了失敗的語義:

Deployment Scenario Worker Failure Driver Failure
Spark 1.1 or earlier, OR
Spark 1.2 or later without write ahead logs
不可靠的接收器會丟失緩存的數據
可靠的接收器零數據丟失
至少一次語義
不可靠的接收器丟失緩存數據
所有接收器會丟失過去的數據
沒有定義語義
Spark 1.2 or later with write ahead logs 可靠的接收器零數據丟失
至少一次語義
可靠的接收器和文件零數據丟失
至少一次語義

5.4.3 With Kafka Direct API

在Spark1.3,我們已經引入了一個新的Kafka Direct API,這可以確保所有Kafka數據正好被Spark Streaming一次接收到。除此之外,如果你實現正好一次的輸出操作,你可實現端到端的正好一次保證。 Kafka Integration Guide.進一步討論了這種方法。

5.4.4 Semantics of output operations 輸出操作語義

輸出操作(如foreachRDD)有至少一次語義,也就是說,轉換的數據可能在worker故障的情況下多次地被寫入外部實體。當使用saveAs***Files操作將文件保存到文件系統是可以接受的(因為文件只會被相同的數據覆蓋),可能需要額外的努力才能實現一次語義。這里有兩種方法。

  • Idempotent updates 冪等更新:多個嘗試總是寫入相同的數據。例如,saveAs***Files總是將相同的數據寫入到生成的文件。
  • Transactional updates 事務更新:所有更新都是以事務方式進行的,因此更新只能以原子方式進行一次。一個途徑執行它是這樣的。
    • 使用批處理時間(在foreachRDD中可用)和RDD分區索引創建一個標識符。該標識符唯一標識流應用程序中的blob數據。
    • 使用這個標識符事務性地(即只是一次,原子地)用這個blob更新外部系統。也就是說,如果標識符還沒提交,則以原子方式提交提交分區數據和標識符。否則,如果這已經提交,則跳過更新。
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}

6 Where to Go from Here


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM