4.DStream 轉換
DStream 上的原語與 RDD 的類似,分為 Transformations(轉換)和 Output Operations(輸
出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及
各種 Window 相關的原語。
4.1 無狀態轉化操作
無狀態轉化操作就是把簡單的 RDD 轉化操作應用到每個批次上,也就是轉化 DStream 中的
每一個 RDD。部分無狀態轉化操作列在了下表中。注意,針對鍵值對的 DStream 轉化操作(比如
reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

需要記住的是,盡管這些函數看起來像作用在整個流上一樣,但事實上每個 DStream 在內部
是由許多 RDD(批次)組成,且無狀態轉化操作是分別應用到每個 RDD 上的。例如,reduceByKey()
會歸約每個時間區間中的數據,但不會歸約不同區間之間的數據。
舉個例子,在之前的 wordcount 程序中,我們只會統計 1 秒內接收到的數據的單詞個數,而
不會累加。
無狀態轉化操作也能在多個 DStream 間整合數據,不過也是在各個時間區間內。例如,鍵 值
對 DStream 擁有和 RDD 一樣的與連接相關的轉化操作,也就是 cogroup()、join()、leftOuterJoin()
等。我們可以在 DStream 上使用這些操作,這樣就對每個批次分別執行了對應的 RDD 操作。
我們還可以像在常規的 Spark 中一樣使用 DStream 的 union() 操作將它和另一個 DStream
的內容合並起來,也可以使用 StreamingContext.union()來合並多個流。
4.2 有狀態轉化操作
4.2.1 UpdateStateByKey
UpdateStateByKey 原語用於記錄歷史記錄,有時,我們需要在 DStream 中跨批次維護狀態(例
如流計算中累加 wordcount)。針對這種情況,updateStateByKey()為我們提供了對一個狀態變量的
訪問,用於鍵值對形式的 DStream。給定一個由(鍵,事件)對構成的 DStream,並傳遞一個指定如
何根據新的事件更新每個鍵對應狀態的函數,它可以構建出一個新的 DStream,其內部數據為(鍵,
狀態) 對。
updateStateByKey() 的結果會是一個新的 DStream,其內部的 RDD 序列是由每個時間區間
對應的(鍵,狀態)對組成的。
updateStateByKey 操作使得我們可以在用新信息進行更新時保持任意的狀態。為使用這個功
能,你需要做下面兩步:
1. 定義狀態,狀態可以是一個任意的數據類型。
2. 定義狀態更新函數,用此函數闡明如何使用之前的狀態和來自輸入流的新值對狀態進行更新。
使用 updateStateByKey 需要對檢查點目錄進行配置,會使用檢查點來保存狀態。
更新版的 wordcount:
(1)編寫代碼
package com.lxl.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定義更新狀態方法,參數 values 為當前批次單詞頻度,state 為以往批次單詞頻度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like hadoop102:9999 val lines = ssc.socketTextStream("hadoop102", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) // 使用 updateStateByKey 來更新狀態,統計從運行開始以來單詞總的次數 val stateDstream = pairs.updateStateByKey[Int](updateFunc) stateDstream.print() //val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console //wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
(2)啟動程序並向 9999 端口發送數據
[lxl@hadoop102 kafka]$ nc -lk 9999
ni shi shui
ni hao ma
(3)結果展示
------------------------------------------- Time: 1504685175000 ms ------------------------------------------- ------------------------------------------- Time: 1504685181000 ms ------------------------------------------- (shi,1) (shui,1) (ni,1) ------------------------------------------- Time: 1504685187000 ms ------------------------------------------- (shi,1) (ma,1) (hao,1) (shui,1) (ni,2)
筆記:
[lxl@hadoop102 spark]$ sudo yum install nc.x86_64
[lxl@hadoop102 spark]$ nc -lk 9999
package com.atlxl.helloworld import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver /* 自定義輸入流 */ class CustomerRecevicer(host:String, port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ //接收器啟動的時候調用 override def onStart(): Unit = { new Thread("receiver"){ override def run(): Unit = { //接受數據並提交給框架 receive() } }.start() } def receive(): Unit ={ var socket: Socket = null var input: String = null try { socket = new Socket(host,port) //生成數據流 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) //接收數據 /* 方式一: */ while (!isStopped() && (input = reader.readLine()) != null){ store(input) } /* 方式二: */ // input = reader.readLine() // store(input) // while (!isStopped() && input != null){ // store(input) // input = reader.readLine() // } // restart("restart") }catch { case e:java.net.ConnectException => restart("restart") case t:Throwable => restart("restart") } } //接收器關閉的時候調用 override def onStop(): Unit = {} }
package com.atlxl.helloworld import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WordCount { def main(args: Array[String]): Unit = { //conf val conf = new SparkConf().setAppName("wc").setMaster("local[*]") // val ssc = new StreamingContext(conf,Seconds(5)) //保存狀態信息 ssc.checkpoint("./check") //獲取數據 // val lineDStream = ssc.socketTextStream("hadoop102",9999) //自定義獲取 val lineDStream = ssc.receiverStream(new CustomerRecevicer("hadoop102", 9999)) //DStream[String] val wordsDStream = lineDStream.flatMap(_.split(" ")) //DStream[(String,1)] val k2vDStream = wordsDStream.map((_,1)) //DStream[(String,sum)] //無狀態轉換 // val result = k2vDStream.reduceByKey(_+_) val updateFuc =(v:Seq[Int],state:Option[Int])=> { val preStatus = state.getOrElse(0) Some(preStatus + v.sum) } //有狀態轉換 val result = k2vDStream.updateStateByKey(updateFuc) result.print() //運行 ssc.start() ssc.awaitTermination() } }
4.2.2 Window Operations
Window Operations 有點類似於 Storm 中的 State,可以設置窗口的大小和滑動窗口的間隔來
動態的獲取當前 Steaming 的允許狀態。
基於窗口的操作會在一個比 StreamingContext 的批次間隔更長的時間范圍內,通過整合多個
批次的結果,計算出整個窗口的結果。
所有基於窗口的操作都需要兩個參數,分別為窗口時長以及滑動步長,兩者都必須是
StreamContext 的批次間隔的整數倍。窗口時長控制每次計算最近的多少個批次的數據,其實就
是最近的 windowDuration/batchInterval 個批次。如果有一個以 10 秒為批次間隔的源 DStream,
要創建一個最近 30 秒的時間窗口(即最近 3 個批次),就應當把 windowDuration 設為 30 秒。
而滑動步長的默認值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。如果源
DStream 批次間隔為 10 秒,並且我們只希望每兩個批次計算一次窗口結果, 就應該把滑動步
長設置為 20 秒。
假設,你想拓展前例從而每隔十秒對持續 30 秒的數據生成 word count。為做到這個,我們需
要在持續 30 秒數據的(word,1)對DStream上應用 reduceByKey。使用操作 reduceByKeyAndWindow.
# reduce last 30 seconds of data, every 10 second
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)
關於 Window 的操作有如下原語:
(1)window(windowLength, slideInterval): 基於對源 DStream 窗化的批次進行計算返回一個新
的 Dstream
(2)countByWindow(windowLength, slideInterval):返回一個滑動窗口計數流中的元素。
(3)reduceByWindow(func, windowLength, slideInterval):通過使用自定義函數整合滑動區間流
元素來創建一個新的單元素流。
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):當在一個(K,V)對
的 DStream 上調用此函數,會返回一個新(K,V)對的 DStream,此處通過對滑動窗口中批次數據使
用 reduce 函數來整合每個 key 的 value 值。Note:默認情況下,這個操作使用 Spark 的默認數量並
行任務(本地是 2),在集群模式中依據配置屬性(spark.default.parallelism)來做 grouping。你可以通
過設置可選參數 numTasks 來設置不同數量的 tasks。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):這個函
數是上述函數的更高效版本,每個窗口的 reduce 值都是通過用前一個窗的 reduce 值來遞增計算。
通過 reduce 進入到滑動窗口數據並”反向 reduce”離開窗口的舊數據來實現這個操作。一個例子
是隨着窗口滑動對 keys 的“加”“減”計數。通過前邊介紹可以想到,這個函數只適用於”可逆
的 reduce 函數”,也就是這些 reduce 函數有相應的”反 reduce”函數(以參數 invFunc 形式傳入)。
如前述函數,reduce 任務的數量通過可選參數來配置。注意:為了使用這個操作,檢查點必須可
用。
(6)countByValueAndWindow(windowLength,slideInterval, [numTasks]):對(K,V)對的 DStream
調用,返回(K,Long)對的新 DStream,其中每個 key 的值是其在滑動窗口中頻率。如上,可配置
reduce 任務數量。
reduceByWindow() 和 reduceByKeyAndWindow() 讓我們可以對每個窗口更高效地進行歸約
操作。它們接收一個歸約函數,在整個窗口上執行,比如 +。除此以外,它們還有一種特殊形式,
通過只考慮新進入窗口的數據和離開窗口的數據,讓 Spark 增量計算歸約結果。這種特殊形式需
要提供歸約函數的一個逆函數,比 如 + 對應的逆函數為 -。對於較大的窗口,提供逆函數可以
大大提高執行效率

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x, y) => x + y}, {(x, y) => x - y}, Seconds(30), Seconds(10)) //加上新進入窗口的批次中的元素 //移除離開窗口的老批次中的元素 //窗口時長// 滑動步長
countByWindow() 和 countByValueAndWindow() 作 為 對 數 據 進 行 計 數 操 作 的 簡 寫 。
countByWindow()返回一個表示每個窗口中元素個數的 DStream,而 countByValueAndWindow()返
回的 DStream 則包含窗口中每個值的個數。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
WordCount 第三版:3 秒一個批次,窗口 12 秒,滑步 6 秒。
package com.atguigu.streaming
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount { def main(args: Array[String]) { // 定義更新狀態方法,參數 values 為當前批次單詞頻度,state 為以往批次單詞頻度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".")
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("hadoop102", 9999)
// Split each line into words val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))
// Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
4.3 其他重要操作
4.3.1 Transform
Transform 原語允許 DStream 上執行任意的 RDD-to-RDD 函數。即使這些函數並沒有在 DStream
的 API 中暴露出來,通過該函數可以方便的擴展 Spark API。該函數每一批次調度一次。其實也
就是對 DStream 中的 RDD 應用轉換。
比如下面的例子,在進行單詞統計的時候,想要過濾掉 spam 的信息。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
4.3.2 Join
連接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以連接Stream-Stream,windows
stream to windows-stream、stream-dataset
Stream-Stream Joins
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }