DStream編程數據模型
DStream(Discretized Stream)作為Spark Streaming的基礎抽象,它代表持續性的數據流。
這些數據流既可以通過外部輸入源賴獲取,也可以通過現有的Dstream的transformation操作來獲得。
在內部實現上,DStream由一組時間序列上連續的RDD來表示。每個RDD都包含了自己特定時間間隔內的數據流。
對DStream中數據的各種操作也是映射到內部的RDD上來進行的
對Dtream的操作可以通過RDD的transformation生成新的DStream。
我們把RDD加上一個時間屬性來區分。
我們可以把DStream當作一連串用時間分段的RDD來看待,並且這串是RDD像流水一樣綿綿不斷的。
當我們對DStream采取一些操作的時候,其中每段時間的RDD之間相互對應轉化成新的DStream.
SparkStreaming的基本步驟
1.通過創建輸入DStream來定義輸入源
2.通過對DStream應用轉換操作和輸出操作來定義流計算,用戶自己定義處理邏輯
3.通過streamingContext.start()來開始接收數據和處理流程
4.通過streamingContext.awaitTermination()方法來等待處理結果
5.通過streamingContext.stop()來手動結束流計算流程
具體步驟
1.創建StreamingContext對象
(1)通過 new StreamingContext(SparkConf,Interval)建立
創建StreamingContext對象所需的參數有兩個一個是 SparkConf 配置參數,一個是時間參數。
與SparkContext基本一致,SparkConf 配置參數需要指明Master,任務名稱(如NetworkWordCount)。
時間參數我們如果以秒來定義的話格式為Seconds(n),這個參數定義了Spark Streaming需要指定處理數據的時間間隔,
時間參數需要根據用戶的需求和集群的處理能力進行適當的設置。
例如
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
這里的那么Spark Streaming會以1s為時間窗口進行數據處理。
(2)通過 new StreamingContext(SparkContext,Interval)建立
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1))
這種方式一般用於spark-shell中建立,
spark-shell中給我們定義好了sc,但是spark-shell並沒有為我們建立好ssc
所以我們需要自己建立ssc
在建立ssc 之前我們需要導入 import org.apache.spark.streaming._
在編碼之前我們需要設置一下日志等級,以便我們之后的程序調試。
要么日志會把所有東西都顯示出來,你根本找不到哪條是錯誤信息。
//設置日志等級的單例對象 import org.apache.log4j.{Logger,Level} import org.apache.spark.internal.Logging object StreamingLoggingExample extends Logging{ def setStreamingLogLevels(): Unit ={ val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if(!log4jInitialized) logInfo("Setting log level to [WARN] for streaming example" + "To override add a custom log4j.properties to the classpath" ) Logger.getRootLogger.setLevel(Level.WARN) } } //使用單例對象修改日志等級 StreamingLoggingExample.setsetStreamingLogLevels() //注意在編碼之前設置
2.創建InputDStream
我們通過設置InputDStream來設置數據的來源
Spark Streaming支持的數據源有文件流、套接字流、RDD隊列流、Kafka、 Flume、HDFS/S3、Kinesis和Twitter等數據源。
(1)文件流
val lines = ssc.textFileStream("file:///")
文件流的加載的是系統中的文件,可以是HDFS中的也可以是本地的,跟創建RDD是一樣的。
(2)套接字流
val lines = ssc.socketTextStream("hostname", port.toInt)
(3)RDD隊列流
創建RDD隊列
val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()//建立一個整型RDD的隊列流,初始化為空
創建RDD隊列流的spark流進行監聽
val lines = ssc.queueStream(rddQueue)
rdd隊列流中添加數據
for(i <- 1 to 100){ rddQueue += ssc.sparkContext.makeRDD(1 to 100,2) //添加數據到RDD隊列 }
(4)Kafka
3.操作DStream
對於從數據源得到的DStream,用戶可以在其基礎上進行各種操作。
與RDD類似,DStream也提供了自己的一系列操作方法,這些操作可以分成三類:普通的轉換操作、窗口轉換操作和輸出操作。
(1)普通的轉換操作
轉換 |
描述 |
map(func) |
源 DStream的每個元素通過函數func返回一個新的DStream。 |
flatMap(func) |
類似與map操作,不同的是每個輸入元素可以被映射出0或者更多的輸出元素。 |
filter(func) |
在源DSTREAM上選擇Func函數返回僅為true的元素,最終返回一個新的DSTREAM 。 |
repartition(numPartitions) |
通過輸入的參數numPartitions的值來改變DStream的分區大小。 |
union(otherStream) |
返回一個包含源DStream與其他 DStream的元素合並后的新DSTREAM。 |
count() |
對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。 |
reduce(func) |
使用函數func(有兩個參數並返回一個結果)將源DStream 中每個RDD的元素進行聚 合操作,返回一個內部所包含的RDD只有一個元素的新DStream。 |
countByValue() |
計算DStream中每個RDD內的元素出現的頻次並返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現的頻次。 |
reduceByKey(func, [numTasks]) |
當一個類型為(K,V)鍵值對的DStream被調用的時候,返回類型為類型為(K,V)鍵值對的新 DStream,其中每個鍵的值V都是使用聚合函數func匯總。注意:默認情況下,使用 Spark的默認並行度提交任務(本地模式下並行度為2,集群模式下位8),可以通過配置numTasks設置不同的並行任務數。 |
join(otherStream, [numTasks]) |
當被調用類型分別為(K,V)和(K,W)鍵值對的2個DStream 時,返回類型為(K,(V,W))鍵值對的一個新 DSTREAM。 |
cogroup(otherStream, [numTasks]) |
當被調用的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。 |
transform(func) |
通過對源DStream的每RDD應用RDD-to-RDD函數返回一個新的DStream,這可以用來在DStream做任意RDD操作。 |
updateStateByKey(func) |
返回一個新狀態的DStream,其中每個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函數func后的更新。這個方法可以被用來維持每個鍵的任何狀態數據。 |
注意:
transform(func)
該transform操作(轉換操作)連同其其類似的 transformWith操作允許DStream 上應用任意RDD-to-RDD函數。
它可以被應用於未在 DStream API 中暴露任何的RDD操作。
例如,在每批次的數據流與另一數據集的連接功能不直接暴露在DStream API 中,但可以輕松地使用transform操作來做到這一點,這使得DStream的功能非常強大。
例如,你可以通過連接預先計算的垃圾郵件信息的輸入數據流(可能也有Spark生成的),然后基於此做實時數據清理的篩選,如下面官方提供的偽代碼所示。
事實上,也可以在transform方法中使用機器學習和圖形計算的算法。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
updateStateByKey操作
我們使用的一般操作都是不記錄歷史數據的,也就說只記錄當前定義時間段內的數據,跟前后時間段無關。
如果我們想統計歷史時間內的總共數據並且實時更新呢?
該 updateStateByKey 操作可以讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :
(1) 定義狀態 - 狀態可以是任意的數據類型。
(2) 定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。
對DStream通過updateStateByKey(updateFunction)來實現實時更新。
更新函數有兩個參數 1.newValues 是當前新進入的數據 2.runningCount 是歷史數據,被封裝到了Option中。
為什么歷史數據要封裝到Option中呢?有可能我們沒有歷史數據,這個時候就可以用None,有數據可以用Some(x)。
當然我們的當前結果也要封裝到Some()中,以便作為之后的歷史數據。
我們並不用關心新進入的數據和歷史數據,系統會自動幫我們產生和維護,我們只需要專心寫處理方法就行。
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {//定義的更新函數 val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) } val runningCounts = pairs.updateStateByKey[Int](updateFunction)//應用
示例:
(1)首先我們需要了解數據的類型
(2)編寫處理方法
(3)封裝結果
//定義更新函數 //我們這里使用的Int類型的數據,因為要做統計個數 def updateFunc(newValues : Seq[Int],state :Option[Int]) :Some[Int] = { //傳入的newVaules將當前的時間段的數據全部保存到Seq中 //調用foldLeft(0)(_+_) 從0位置開始累加到結束 val currentCount = newValues.foldLeft(0)(_+_) //獲取歷史值,沒有歷史數據時為None,有數據的時候為Some //getOrElse(x)方法,如果獲取值為None則用x代替 val previousCount = state.getOrElse(0) //計算結果,封裝成Some返回 Some(currentCount+previousCount) } //使用 val stateDStream = DStream.updateStateByKey[Int](updateFunc)
(2)窗口轉換函數
Spark Streaming 還提供了窗口的計算,它允許你通過滑動窗口對數據進行轉換,窗口轉換操作如下:
轉換 |
描述 |
window(windowLength, slideInterval) |
返回一個基於源DStream的窗口批次計算后得到新的DStream。 |
countByWindow(windowLength,slideInterval) |
返回基於滑動窗口的DStream中的元素的數量。 |
reduceByWindow(func, windowLength,slideInterval) |
基於滑動窗口對源DStream中的元素進行聚合操作,得到一個新的DStream。 |
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) |
基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操作,得到一個新的DStream。 |
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) |
一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合並移去最早的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那么我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法可以復用中間三秒的統計量,提高統計的效率。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) |
基於滑動窗口計算源DStream中每個RDD內每個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue一樣,reduce任務的數量可以通過一個可選參數進行配置。 |
在Spark Streaming中,數據處理是按批進行的,而數據采集是逐條進行的。
因此在Spark Streaming中會先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把采集到的數據匯總起來成為一批數據交給系統去處理。
對於窗口操作而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定
而窗口間隔指的就是窗口的持續時間,在窗口操作中,只有窗口的長度滿足了才會觸發批數據的處理。
除了窗口的長度,窗口操作還有另一個重要的參數就是滑動間隔(slide duration)
它指的是經過多長時間窗口滑動一次形成新的窗口,滑動窗口默認情況下和批次間隔的相同,而窗口間隔一般設置的要比它們兩個大。
在這里必須注意的一點是滑動間隔和窗口間隔的大小一定得設置為批處理間隔的整數倍。
如圖所示,批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。
對於初始的窗口time 1-time 3,只有窗口間隔滿足了定義的長度也就是3才觸發數據的處理,不夠3繼續等待。
當間隔滿足3之后進行計算后然后進行窗口滑動,滑動2個單位,會有新的數據流入窗口。
然后重復等待滿足窗口間隔執行計算。
(3)輸出操作
Spark Streaming允許DStream的數據被輸出到外部系統,如數據庫或文件系統。
由於輸出操作實際上使transformation操作后的數據可以通過外部系統被使用,同時輸出操作觸發所有DStream的transformation操作的實際執行(類似於RDD操作)。
以下表列出了目前主要的輸出操作:
轉換 |
描述 |
print() |
在Driver中打印出DStream中數據的前10個元素。 |
saveAsTextFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存為文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix, [suffix]) |
將DStream中的內容按對象序列化並且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存為Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) |
最基本的輸出操作,將func函數應用於DStream中的RDD上,這個操作會輸出數據到外部系統,比如保存RDD到文件或者網絡數據庫等。需要注意的是func函數是在運行該streaming應用的Driver進程里執行的。 |
同樣DStream也支持持久化
與RDD一樣,DStream同樣也能通過persist()方法將數據流存放在內存中,
4.啟動Spark Streaming
通過streamingContext.start()來開始接收數據和處理流程
通過streamingContext.awaitTermination()方法來等待處理結果
通過streamingContext.stop()來手動結束流計算流程
示例
package SparkDemo import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamWordCount { def main(args:Array[String]): Unit ={ //創建StreamingContext val conf = new SparkConf().setMaster("local[*]").setAppName("StreamTest") val ssc = new StreamingContext(conf,Seconds(20)) //建立文件流數據源通道 val lines = ssc.textFileStream("file:///") lines.cache()//持久化 //處理,word count val words = lines.flatMap(_.split(" ")) val wordPair = words.map((_,1)) val count = wordPair.reduceByKey(_+_) count.print() //啟動StreamingContext ssc.start() ssc.awaitTermination() } }
然后我們將程序打包提交到spark集群中運行
當程序運行ssc.start()后,就開始自動循環進入監聽狀態,屏幕上會顯示
這是正確的,如果我們在建立ssc的文件中再添加一個文件file3.txt
就可以在監聽窗口中顯示詞頻的統計了。
最后我們可以通過ssc.stop()停止程序,不過注意我們不能省略這里的圓括號。