正文
一,簡介
1.1 概述
是一個基於Spark Core之上的實時計算框架,可以從很多數據源消費數據並對數據進行處理.Spark Streaming 是Spark核心API的一個擴展,可以實現高吞吐量的、具備容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從數據源獲取數據之后,可以使用諸如map、reduce、join和window等高級函數進行復雜算法的處理。最后還可以將處理結果存儲到文件系統,數據庫和現場儀表盤。在“One Stack rule them all”的基礎上,還可以使用Spark的其他子框架,如集群學習、圖計算等,對流數據進行處理。
Spark Streaming處理的數據流圖:
Spark的各個子框架,都是基於核心Spark的,Spark Streaming在內部的處理機制是,接收實時流的數據,並根據一定的時間間隔拆分成一批批的數據,然后通過Spark Engine處理這些批數據,最終得到處理后的一批批結果數據。
對應的批數據,在Spark內核對應一個RDD實例,因此,對應流數據的DStream可以看成是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分成一批一批后,通過一個先進先出的隊列,然后 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,然后進行處理,這是一個典型的生產者消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。
1.2 術語定義
離散流(discretized stream)或DStream:
本質上就是一系列連續的RDD,DStream其實就是對RDD的封裝DStream可以任務是一個RDD的工廠,該DStream里面生產都是相同業務邏輯的RDD,只不過是RDD里面要讀取數據的不相同
他是sparkStreaming中的一個最基本的抽象,代表了一下列連續的數據流,本質上是一系列連續的RDD,你對DStream進行操作,就是對RDD進行操作
DStream每隔一段時間生成一個RDD,你對DStream進行操作,本質上是對里面的對應時間的RDD進行操作
DSteam和DStream之間存在依賴關系,在一個固定的時間點,對個存在依賴關系的DSrteam對應的RDD也存在依賴關系,
每個一個固定的時間,其實生產了一個小的DAG,周期性的將生成的小DAG提交到集群中運行
DStream圖例:
批數據(batch data):這是化整為零的第一步,將實時流數據以時間片為單位進行分批,將流處理轉化為時間片數據的批處理。隨着持續時間的推移,這些處理結果就形成了對應的結果數據流了。
時間片或批處理時間間隔( batch interval):這是人為地對流數據進行定量的標准,以時間片作為我們拆分流數據的依據。一個時間片的數據對應一個RDD實例。
窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,
滑動時間間隔:前一個窗口到后一個窗口所經過的時間長度。必須是批處理時間間隔的倍數
Input DStream :一個input DStream是一個特殊的DStream,將Spark Streaming連接到一個外部數據源來讀取數據。
1.3 Storm和Spark Streaming比較
處理模型以及延遲
雖然兩框架都提供了可擴展性(scalability)和可容錯性(fault tolerance),但是它們的處理模型從根本上說是不一樣的。Storm可以實現亞秒級時延的處理,而每次只處理一條event,而Spark Streaming可以在一個短暫的時間窗口里面處理多條(batches)Event。所以說Storm可以實現亞秒級時延的處理,而Spark Streaming則有一定的時延。
容錯和數據保證
然而兩者的代價都是容錯時候的數據保證,Spark Streaming的容錯為有狀態的計算提供了更好的支持。在Storm中,每條記錄在系統的移動過程中都需要被標記跟蹤,所以Storm只能保證每條記錄最少被處理一次,但是允許從錯誤狀態恢復時被處理多次。這就意味着可變更的狀態可能被更新兩次從而導致結果不正確。
任一方面,Spark Streaming僅僅需要在批處理級別對記錄進行追蹤,所以他能保證每個批處理記錄僅僅被處理一次,即使是node節點掛掉。雖然說Storm的 Trident library可以保證一條記錄被處理一次,但是它依賴於事務更新狀態,而這個過程是很慢的,並且需要由用戶去實現。
實現和編程API
Storm主要是由Clojure語言實現,Spark Streaming是由Scala實現。如果你想看看這兩個框架是如何實現的或者你想自定義一些東西你就得記住這一點。Storm是由BackType和Twitter開發,而Spark Streaming是在UC Berkeley開發的。
Storm提供了Java API,同時也支持其他語言的API。 Spark Streaming支持Scala和Java語言(其實也支持Python)。
批處理框架集成
Spark Streaming的一個很棒的特性就是它是在Spark框架上運行的。這樣你就可以想使用其他批處理代碼一樣來寫Spark Streaming程序,或者是在Spark中交互查詢。這就減少了單獨編寫流批量處理程序和歷史數據處理程序。
生產支持
Storm已經出現好多年了,而且自從2011年開始就在Twitter內部生產環境中使用,還有其他一些公司。而Spark Streaming是一個新的項目,並且在2013年僅僅被Sharethrough使用(據作者了解)。
Storm是 Hortonworks Hadoop數據平台中流處理的解決方案,而Spark Streaming出現在 MapR的分布式平台和Cloudera的企業數據平台中。除此之外,Databricks是為Spark提供技術支持的公司,包括了Spark Streaming。
雖然說兩者都可以在各自的集群框架中運行,但是Storm可以在Mesos上運行, 而Spark Streaming可以在YARN和Mesos上運行。
二,運行原理
2.1 Streaming架構
SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,可以對多種數據源(如Kafka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等復雜操作,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。
計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。
2.2 容錯,持久化和性能調優
2.2.1 容錯
對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分布式可重算的數據集,其記錄着確定性的操作繼承關系(lineage),所以只要輸入數據是可容錯的,那么任意一個RDD的分區(Partition)出錯或不可用,都是可以利用原始輸入數據通過轉換操作而重新算出的。
對於Spark Streaming來說,其RDD的傳承關系如下圖所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最后一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連接的,由於Spark Streaming輸入數據可以來自於磁盤,例如HDFS(多份拷貝)或是來自於網絡的數據流(Spark Streaming會將網絡輸入數據的每一個數據流拷貝兩份到其他的機器)都能保證容錯性,所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。
如下圖:Spark Streaming中RDD的lineage關系圖
2.2.2 持久化
與RDD一樣,DStream同樣也能通過persist()方法將數據流存放在內存中,默認的持久化方式是MEMORY_ONLY_SER,也就是在內存中存放數據同時序列化的方式,這樣做的好處是遇到需要多次迭代計算的程序時,速度優勢十分的明顯。而對於一些基於窗口的操作,如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操作,如updateStateBykey,其默認的持久化策略就是保存在內存中。
對於來自網絡的數據源(Kafka、Flume、sockets等),默認的持久化策略是將數據保存在兩台機器上,這也是為了容錯性而設計的。
另外,對於窗口和有狀態的操作必須checkpoint,通過StreamingContext的checkpoint來指定目錄,通過 Dtream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。
2.2.3 性能調優
1. 優化運行時間
增加並行度 確保使用整個集群的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操作,增加其並行度以確保更為充分地使用集群資源;
減少數據序列化,反序列化的負擔 Spark Streaming默認將接受到的數據序列化后存儲,以減少內存的使用。但是序列化和反序列話需要更多的CPU時間,因此更加高效的序列化方式(Kryo)和自定義的系列化接口可以更高效地使用CPU;
設置合理的batch duration(批處理時間間) 在Spark Streaming中,Job之間有可能存在依賴關系,后面的Job必須確保前面的作業執行結束后才能提交。若前面的Job執行的時間超出了批處理時間間隔,那么后面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成后續Job的阻塞。因此設置一個合理的批處理間隔以確保作業能夠在這個批處理間隔內結束時必須的;
減少因任務提交和分發所帶來的負擔 通常情況下,Akka框架能夠高效地確保任務及時分發,但是當批處理間隔非常小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone和Coarse-grained Mesos模式通常會比使用Fine-grained Mesos模式有更小的延遲。
2. 優化內存使用
控制batch size(批處理間隔內的數據量) Spark Streaming會把批處理間隔內接收到的所有數據存放在Spark內部的可用內存區域中,因此必須確保當前節點Spark的可用內存中少能容納這個批處理時間間隔內的所有數據,否則必須增加新的資源以提高集群的處理能力;
及時清理不再使用的數據 前面講到Spark Streaming會將接受的數據全部存儲到內部可用內存區域中,因此對於處理過的不再需要的數據應及時清理,以確保Spark Streaming有富余的可用內存空間。通過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,這個參數需要小心設置以免后續操作中所需要的數據被超時錯誤處理;
觀察及適當調整GC策略 GC會影響Job的正常運行,可能延長Job的執行時間,引起一系列不可預料的問題。觀察GC的運行情況,采用不同的GC策略以進一步減小內存回收對Job運行的影響。
三,編程模型
3.1 如何使用Spark Streaming
DStream(Discretized Stream)作為Spark Streaming的基礎抽象,它代表持續性的數據流。這些數據流既可以通過外部輸入源賴獲取,也可以通過現有的Dstream的transformation操作來獲得。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每個RDD都包含了自己特定時間間隔內的數據流。如圖7-3所示。
DStream中在時間軸下生成離散的RDD序列:
對DStream中數據的各種操作也是映射到內部的RDD上來進行的,如圖7-4所示,對Dtream的操作可以通過RDD的transformation生成新的DStream。這里的執行引擎是Spark。
作為構建於Spark之上的應用框架,Spark Streaming承襲了Spark的編程風格,對於已經了解Spark的用戶來說能夠快速地上手。接下來以Spark Streaming官方提供的WordCount代碼為例來介紹Spark Streaming的使用方式。
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) import org.apache.spark.streaming.StreamingContext._ // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate
1.創建StreamingContext對象 同Spark初始化需要創建SparkContext對象一樣,使用Spark Streaming就需要創建StreamingContext對象。創建StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱(如NetworkWordCount)。需要注意的是參數Seconds(1),Spark Streaming需要指定處理數據的時間間隔,如上例所示的1s,那么Spark Streaming會以1s為時間窗口進行數據處理。此參數需要根據用戶的需求和集群的處理能力進行適當的設置;
2.創建InputDStream如同Storm的Spout,Spark Streaming需要指明數據源。如上例所示的socketTextStream,Spark Streaming以socket連接作為數據源讀取數據。當然Spark Streaming支持多種不同的數據源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等數據源;
3.操作DStream對於從數據源得到的DStream,用戶可以在其基礎上進行各種操作,如上例所示的操作就是一個典型的WordCount執行流程:對於當前時間窗口內從數據源得到的數據首先進行分割,然后利用Map和ReduceByKey方法進行計算,當然最后還有使用print()方法輸出結果;
4.啟動Spark Streaming之前所作的所有步驟只是創建了執行流程,程序沒有真正連接上數據源,也沒有對數據進行任何操作,只是設定好了所有的執行計划,當ssc.start()啟動后程序才真正進行所有預期的操作。
至此對於Spark Streaming的如何使用有了一個大概的印象,在后面的章節我們會通過源代碼深入探究一下Spark Streaming的執行流程。
3.2 DStream的輸入源
在Spark Streaming中所有的操作都是基於流的,而輸入源是這一系列操作的起點。輸入 DStreams 和 DStreams 接收的流都代表輸入數據流的來源,在Spark Streaming 提供兩種內置數據流來源:
基礎來源 在 StreamingContext API 中直接可用的來源。例如:文件系統、Socket(套接字)連接和 Akka actors;
高級來源 如 Kafka、Flume、Kinesis、Twitter 等,可以通過額外的實用工具類創建。
3.2.1 基礎來源
在前面分析怎樣使用Spark Streaming的例子中我們已看到ssc.socketTextStream()方法,可以通過 TCP 套接字連接,從從文本數據中創建了一個 DStream。除了套接字,StreamingContext 的API還提供了方法從文件和 Akka actors 中創建 DStreams作為輸入源。
Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法可以從任何文件系統(如:HDFS、S3、NFS 等)的文件中讀取數據,然后創建一個DStream。Spark Streaming 監控 dataDirectory 目錄和在該目錄下任何文件被創建處理(不支持在嵌套目錄下寫文件)。需要注意的是:讀取的必須是具有相同的數據格式的文件;創建的文件必須在dataDirectory 目錄下,並通過自動移動或重命名成數據目錄;文件一旦移動就不能被改變,如果文件被不斷追加,新的數據將不會被閱讀。對於簡單的文本文,可以使用一個簡單的方法streamingContext.textFileStream(dataDirectory)來讀取數據。
Spark Streaming也可以基於自定義 Actors 的流創建DStream ,通過 Akka actors 接受數據流,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用 streamingContext.queueStream(queueOfRDDs)方法可以創建基於 RDD 隊列的DStream,每個RDD 隊列將被視為 DStream 中一塊數據流進行加工處理。
3.2.2 高級來源
這一類的來源需要外部 non-Spark 庫的接口,其中一些有復雜的依賴關系(如 Kafka、Flume)。因此通過這些來源創建 DStreams 需要明確其依賴。例如,如果想創建一個使用Twitter tweets 的數據的DStream 流,必須按以下步驟來做:
1)在 SBT 或 Maven工程里添加 spark-streaming-twitter_2.10 依賴。
2)開發:導入 TwitterUtils 包,通過 TwitterUtils.createStream 方法創建一個DStream。
3)部署:添加所有依賴的 jar 包(包括依賴的spark-streaming-twitter_2.10 及其依賴),然后部署應用程序。
需要注意的是,這些高級的來源一般在Spark Shell中不可用,因此基於這些高級來源的應用不能在Spark Shell中進行測試。如果你必須在Spark shell中使用它們,你需要下載相應的Maven工程的Jar依賴並添加到類路徑中。
其中一些高級來源如下:
Twitter Spark Streaming的TwitterUtils工具類使用Twitter4j,Twitter4J 庫支持通過任何方法提供身份驗證信息,你可以得到公眾的流,或得到基於關鍵詞過濾流。
Flume Spark Streaming可以從Flume中接受數據。
Kafka Spark Streaming可以從Kafka中接受數據。
Kinesis Spark Streaming可以從Kinesis中接受數據。
需要重申的一點是在開始編寫自己的 SparkStreaming 程序之前,一定要將高級來源依賴的Jar添加到SBT 或 Maven 項目相應的artifact中。常見的輸入源和其對應的Jar包如下圖所示。
另外,輸入DStream也可以創建自定義的數據源,需要做的就是實現一個用戶定義的接收器。
3.3 DStream的操作
與RDD類似,DStream也提供了自己的一系列操作方法,這些操作可以分成三類:普通的轉換操作、窗口轉換操作和輸出操作。
3.3.1 普通的轉換操作
普通的轉換操作如下表所示:
在上面列出的這些操作中,transform()方法和updateStateByKey()方法值得我們深入的探討一下:
transform(func)操作
該transform操作(轉換操作)連同其其類似的 transformWith操作允許DStream 上應用任意RDD-to-RDD函數。它可以被應用於未在 DStream API 中暴露任何的RDD操作。例如,在每批次的數據流與另一數據集的連接功能不直接暴露在DStream API 中,但可以輕松地使用transform操作來做到這一點,這使得DStream的功能非常強大。例如,你可以通過連接預先計算的垃圾郵件信息的輸入數據流(可能也有Spark生成的),然后基於此做實時數據清理的篩選,如下面官方提供的偽代碼所示。事實上,也可以在transform方法中使用機器學習和圖形計算的算法。
updateStateByKey操作
該 updateStateByKey 操作可以讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :
(1) 定義狀態 - 狀態可以是任意的數據類型。
(2) 定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。
讓我們用一個例子來說明,假設你要進行文本數據流中單詞計數。在這里,正在運行的計數是狀態而且它是一個整數。我們定義了更新功能如下:
3.3.2 窗口轉換操作
Spark Streaming 還提供了窗口的計算,它允許你通過滑動窗口對數據進行轉換,窗口轉換操作如下:
轉換 |
描述 |
window(windowLength, slideInterval) |
返回一個基於源DStream的窗口批次計算后得到新的DStream。 |
countByWindow(windowLength,slideInterval) |
返回基於滑動窗口的DStream中的元素的數量。 |
reduceByWindow(func, windowLength,slideInterval) |
基於滑動窗口對源DStream中的元素進行聚合操作,得到一個新的DStream。 |
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) |
基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操作,得到一個新的DStream。 |
reduceByKeyAndWindow(func,invFunc,windowLength, slideInterval, [numTasks]) |
一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合並移去最早的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那么我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法可以復用中間三秒的統計量,提高統計的效率。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) |
基於滑動窗口計算源DStream中每個RDD內每個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue一樣,reduce任務的數量可以通過一個可選參數進行配置。 |
在Spark Streaming中,數據處理是按批進行的,而數據采集是逐條進行的,因此在Spark Streaming中會先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把采集到的數據匯總起來成為一批數據交給系統去處理。
對於窗口操作而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續時間,在窗口操作中,只有窗口的長度滿足了才會觸發批數據的處理。除了窗口的長度,窗口操作還有另一個重要的參數就是滑動間隔(slide duration),它指的是經過多長時間窗口滑動一次形成新的窗口,滑動窗口默認情況下和批次間隔的相同,而窗口間隔一般設置的要比它們兩個大。在這里必須注意的一點是滑動間隔和窗口間隔的大小一定得設置為批處理間隔的整數倍。
如批處理間隔示意圖所示,批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。對於初始的窗口time 1-time 3,只有窗口間隔滿足了才觸發數據的處理。這里需要注意的一點是,初始的窗口有可能流入的數據沒有撐滿,但是隨着時間的推進,窗口最終會被撐滿。當每個2個時間單位,窗口滑動一次后,會有新的數據流入窗口,這時窗口會移去最早的兩個時間單位的數據,而與最新的兩個時間單位的數據進行匯總形成新的窗口(time3-time5)。
對於窗口操作,批處理間隔、窗口間隔和滑動間隔是非常重要的三個時間概念,是理解窗口操作的關鍵所在。
3.3.3 輸出操作
Spark Streaming允許DStream的數據被輸出到外部系統,如數據庫或文件系統。由於輸出操作實際上使transformation操作后的數據可以通過外部系統被使用,同時輸出操作觸發所有DStream的transformation操作的實際執行(類似於RDD操作)。以下表列出了目前主要的輸出操作:
轉換 |
描述 |
print() |
在Driver中打印出DStream中數據的前10個元素。 |
saveAsTextFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存為文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix, [suffix]) |
將DStream中的內容按對象序列化並且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存為Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) |
最基本的輸出操作,將func函數應用於DStream中的RDD上,這個操作會輸出數據到外部系統,比如保存RDD到文件或者網絡數據庫等。需要注意的是func函數是在運行該streaming應用的Driver進程里執行的。 |
dstream.foreachRDD是一個非常強大的輸出操作,它允將許數據輸出到外部系統。但是 ,如何正確高效地使用這個操作是很重要的,下面展示了如何去避免一些常見的錯誤。
通常將數據寫入到外部系統需要創建一個連接對象(如 TCP連接到遠程服務器),並用它來發送數據到遠程系統。出於這個目的,開發者可能在不經意間在Spark driver端創建了連接對象,並嘗試使用它保存RDD中的記錄到Spark worker上,如下面代碼:
這是不正確的,這需要連接對象進行序列化並從Driver端發送到Worker上。連接對象很少在不同機器間進行這種操作,此錯誤可能表現為序列化錯誤(連接對不可序列化),初始化錯誤(連接對象在需要在Worker 上進行需要初始化) 等等,正確的解決辦法是在 worker上創建的連接對象。
通常情況下,創建一個連接對象有時間和資源開銷。因此,創建和銷毀的每條記錄的連接對象可能招致不必要的資源開銷,並顯著降低系統整體的吞吐量 。一個更好的解決方案是使用rdd.foreachPartition方法創建一個單獨的連接對象,然后使用該連接對象輸出的所有RDD分區中的數據到外部系統。
這緩解了創建多條記錄連接的開銷。最后,還可以進一步通過在多個RDDs/ batches上重用連接對象進行優化。一個保持連接對象的靜態池可以重用在多個批處理的RDD上將其輸出到外部系統,從而進一步降低了開銷。
需要注意的是,在靜態池中的連接應該按需延遲創建,這樣可以更有效地把數據發送到外部系統。另外需要要注意的是:DStreams延遲執行的,就像RDD的操作是由actions觸發一樣。默認情況下,輸出操作會按照它們在Streaming應用程序中定義的順序一個個執行。