Examples of Basic Operations in Spark Streaming


According to the official documentation, these operations fall roughly into the following groups:

Transformations
Window Operations
Join Operations
Output Operations

 

Some basic background first:

A DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, Spark's abstraction of an immutable, distributed dataset. Each RDD in a DStream contains data from a certain time interval.
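To make the "series of RDDs" point concrete, here is a minimal sketch (assuming the same local socket source used throughout this post): with a 5-second batch interval, every batch arrives as exactly one RDD, which foreachRDD lets you inspect directly.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsRDDs {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("DStreamAsRDDs")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    // Each 5-second batch interval produces exactly one RDD; report its size.
    lines.foreachRDD { rdd =>
      println(s"batch RDD: ${rdd.getNumPartitions} partitions, ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}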

 

 

Transformations

1) map(func): applies the function func to every element of the source DStream and returns a new DStream containing the resulting elements.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Map {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    // Prefix every received line with "map_"; one output element per input element.
    val mapLines = lines.map(word => "map_" + word)

    mapLines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

2) flatMap(func): applies the function func to every element and produces 0 or more output elements per input element (for example, the split below yields zero or more words per line), returning a new DStream containing the resulting elements.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlatMap {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    // Split each line into words and flatten the results into a single DStream of words.
    val fmapLines = lines.flatMap(_.split(" "))

    fmapLines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Note on map vs. flatMap: map applies the function to each element of the RDD one by one and maps it to exactly one element of the new RDD, while flatMap applies a function to each element and builds the new RDD from all the contents of the returned iterators. flatMap is commonly used for splitting lines into words, as shown in the sketch below.
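A minimal sketch contrasting the two on the same socket stream (the object and variable names here are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MapVsFlatMap {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("MapVsFlatMap")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    // map: exactly one output element per input line (a List of words per line).
    val perLine = lines.map(_.split(" ").toList)
    // flatMap: the same split, but flattened, so each output element is a single word.
    val words = lines.flatMap(_.split(" "))

    perLine.print() // e.g. List(hello, world)
    words.print()   // e.g. hello, then world, as separate elements

    ssc.start()
    ssc.awaitTermination()
  }
}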

 

 

 

 

3) filter(func): evaluates func on every element of the DStream; elements for which func returns true are kept and the rest are dropped, producing a new DStream.

Code

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Filter {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    // Keep only the words that are not equal to "hello".
    val filterLines = lines.flatMap(_.split(" "))
      .filter(word => !StringUtils.equals(word, "hello"))

    filterLines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

4) repartition(numPartitions): changes the level of parallelism in the DStream by creating more or fewer partitions. Try it yourself; a minimal sketch is shown below.
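A minimal sketch (the partition count 4 is arbitrary); the repartitioned DStream carries the same data, just spread across a different number of partitions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Repartition {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val repartitioned = lines.repartition(4)

    // Compare the partition count of each batch before and after repartitioning.
    lines.foreachRDD(rdd => println(s"before: ${rdd.getNumPartitions} partitions"))
    repartitioned.foreachRDD(rdd => println(s"after:  ${rdd.getNumPartitions} partitions"))

    ssc.start()
    ssc.awaitTermination()
  }
}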

5) union(otherStream): returns a new DStream containing the union of the elements in the source DStream and otherDStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Union {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val union1 = lines.map(word => "union1_" + word)
    val union2 = lines.map(word => "union2_" + word)
    // union1_2 contains the elements of both union1 and union2.
    val union1_2 = union1.union(union2)

    union1.print()
    union2.print()
    union1_2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

6) count(): returns a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Count {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val mapLines = lines.map(_.split(" "))      // one Array[String] per line
    val fmapLines = lines.flatMap(_.split(" ")) // one element per word

    mapLines.count().print()  // number of lines in the batch
    fmapLines.count().print() // number of words in the batch

    ssc.start()
    ssc.awaitTermination()
  }
}

7) reduce(func): returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using func, which takes two arguments and returns one; the elements are combined pairwise.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Reduce {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    // Concatenate all words of the batch into a single element separated by "*".
    val result = fmapLines.reduce(_ + "*" + _)
    // Equivalent: fmapLines.reduce((a, b) => a + "*" + b)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

8) countByValue(): when called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CountByValue {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    // (word, frequency) pairs for each batch.
    val countByValue = fmapLines.countByValue()

    countByValue.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

9) reduceByKey(func, [numTasks]): when called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func.

Note: by default this uses Spark's default number of parallel tasks for the grouping (2 in local mode; in cluster mode the number is determined by the config property spark.default.parallelism). You can pass an optional numTasks argument to set a different number, as shown after the example below.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReduceByKey {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    // Turn each word into a (word, 1) pair, then sum the counts per word.
    val tuple = fmapLines.map(word => (word, 1))
    val reduceByKey = tuple.reduceByKey(_ + _)

    reduceByKey.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
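As mentioned in the note above, the number of reduce tasks can be set explicitly via the optional second argument; for example, replacing the reduceByKey line in the example above with the following (the value 4 is arbitrary here):

    // Same per-word sum, but explicitly requesting 4 reduce tasks instead of the default.
    val reduceByKey = tuple.reduceByKey((a: Int, b: Int) => a + b, 4)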

 

 

 

10) join(otherStream, [numTasks]): when called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs. numTasks sets the parallelism and is optional.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Join {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val words = lines.flatMap(_.split(" "))
    val join1 = words.map(word => (word, "join1_" + word))
    val join2 = words.map(word => (word, "join2_" + word))
    // (word, (join1 value, join2 value)) for keys present in both DStreams.
    val join1_2 = join1.join(join2)

    join1.print()
    join2.print()
    join1_2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

11) cogroup(otherStream, [numTasks]): when called on DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples. numTasks sets the parallelism and is optional.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Cogroup {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val words = lines.flatMap(_.split(" "))
    val cogroup1 = words.map(word => (word, "cogroup1_" + word))
    val cogroup2 = words.map(word => (word, "cogroup2_" + word))
    // For each key, all values from cogroup1 and all values from cogroup2 are grouped together.
    val cogroup1_2 = cogroup1.cogroup(cogroup2)

    cogroup1.print()
    cogroup2.print()
    cogroup1_2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

12) transform(func): returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to perform arbitrary RDD operations on the DStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Transform {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    // Apply an ordinary RDD operation (flatMap) to every RDD of the DStream.
    val words = lines.transform(rdd => {
      rdd.flatMap(_.split(" "))
    })

    words.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

13) updateStateByKey(func): returns a new "state" DStream where the state of each key is updated by applying the given function to the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateStateByKey {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    // Note: updateStateByKey requires a checkpoint directory to be configured.
    ssc.checkpoint("D:\\spark\\checkpoint")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Update the running count of a key.
   *
   * @param newValues    the new values for the key in the current batch
   * @param runningCount the previous state (running count) of the key, if any
   * @return the updated state for the key
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val current = newValues.sum
    val pre = runningCount.getOrElse(0)
    Some(current + pre)
  }
}

Window Operations

window: perform a computation over a sliding window of data
window length: the duration of the window
sliding interval: the interval at which the window operation is performed
Both parameters must be multiples of the batch interval of the source DStream, otherwise an error is thrown.

1) window(windowLength, slideInterval): returns a new DStream computed from the elements that fall within the current window over the source DStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Window {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    // Every 5 seconds, compute over the last 10 seconds of data.
    val window = fmapLines.window(Seconds(10), Seconds(5))

    window.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

2) countByWindow(windowLength, slideInterval): like count, but applied to the windowed DStream; it returns a sliding count of the elements in the stream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CountByWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    // Every 5 seconds, count the elements in the last 10 seconds of the stream.
    val countByWindow = fmapLines.countByWindow(Seconds(10), Seconds(5))

    countByWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3) reduceByWindow(func, windowLength, slideInterval): like reduce, but applied to the windowed DStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReduceByWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    // Every 5 seconds, concatenate all words seen in the last 10 seconds with "*".
    val reduceByWindow = fmapLines.reduceByWindow(_ + "*" + _, Seconds(10), Seconds(5))

    reduceByWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

4) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): like reduceByKey, but applied to the windowed DStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReduceByKeyAndWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val tuple = fmapLines.map(word => (word, 1))
    // Every 5 seconds, sum the counts per word over the last 10 seconds.
    val reduceByKeyAndWindow = tuple.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(5))

    reduceByKeyAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

5) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): compared with the version above, this also takes an "inverse reduce" function invFunc. Think of a train station: people come in and people go out; everyone entering counts +1 and everyone leaving counts -1, so only the data entering and leaving the window needs to be processed instead of recomputing the whole window each time.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReduceByKeyAndWindow2 {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // The inverse-reduce variant requires checkpointing.
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val tuple = fmapLines.map(word => (word, 1))
    // Counts entering the window are added, counts leaving the window are subtracted.
    val reduceByKeyAndWindow = tuple.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(10), Seconds(5))

    reduceByKeyAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

6) countByValueAndWindow(windowLength, slideInterval, [numTasks]): like countByValue, but applied to the windowed DStream.

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CountByValueAndWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    // Every 5 seconds, the frequency of each word over the last 10 seconds.
    val countByValueAndWindow = fmapLines.countByValueAndWindow(Seconds(10), Seconds(5))

    countByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Join Operations

 

1) Stream-stream joins

Simply call join, leftOuterJoin, rightOuterJoin, or fullOuterJoin; a minimal sketch follows below.
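A sketch in the same style as the Join example above (the key/value naming is illustrative); the last line shows that joins also compose with window():

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamStreamJoin {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    // Two (K, V) DStreams built from the same socket source.
    val words = ssc.socketTextStream("192.168.31.30", 9999).flatMap(_.split(" "))
    val stream1 = words.map(word => (word, "s1_" + word))
    val stream2 = words.map(word => (word, "s2_" + word))

    stream1.join(stream2).print()           // (K, (V, W))
    stream1.leftOuterJoin(stream2).print()  // (K, (V, Option[W]))
    stream1.rightOuterJoin(stream2).print() // (K, (Option[V], W))
    stream1.fullOuterJoin(stream2).print()  // (K, (Option[V], Option[W]))

    // Join the last 20 seconds of stream1 with the last 10 seconds of stream2.
    stream1.window(Seconds(20)).join(stream2.window(Seconds(10))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}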

2) Stream-dataset joins

val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

By calling transform we can join the windowed stream with a dataset (an ordinary RDD).

Output Operations

 

Output Operation: Meaning

print(): Prints the first ten elements of every batch of data in the DStream, on the driver node running the streaming application. Useful for development and debugging.

saveAsTextFiles(prefix, [suffix]): Saves this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsObjectFiles(prefix, [suffix]): Saves this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsHadoopFiles(prefix, [suffix]): Saves this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Not available in the Python API.

foreachRDD(func): The most generic output operator, which applies the function func to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files or writing it over the network to a database. Note that func is executed in the driver process running the streaming application, and will usually have RDD actions in it that force the computation of the streaming RDDs.
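For instance, a minimal sketch of saveAsTextFiles (the output prefix D:\\spark\\out\\words is just an illustrative local path):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SaveAsTextFiles {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val words = lines.flatMap(_.split(" "))

    // Writes one output directory per batch, named "words-<TIME_IN_MS>.txt".
    words.saveAsTextFiles("D:\\spark\\out\\words", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}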

1) foreachRDD(func): using it correctly and efficiently

Here connection represents a connection to an external system (createNewConnection() below is a placeholder, not a real API).

Code

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Create one connection per partition (not per record), use it, then close it.
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

Even more efficient, reusing connections through a pool:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
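ConnectionPool is not a Spark API; it is any lazily initialized, reusable pool you supply yourself. A minimal hypothetical sketch (Connection and its send method are placeholders for a real client such as a JDBC connection or a message-queue producer):

import scala.collection.mutable

// Hypothetical connection type standing in for a real client.
trait Connection {
  def send(record: String): Unit
  def close(): Unit
}

object ConnectionPool {
  // Created lazily on each executor the first time getConnection() is called there.
  private val pool = mutable.Queue[Connection]()

  def getConnection(): Connection = synchronized {
    if (pool.nonEmpty) pool.dequeue() else newConnection()
  }

  def returnConnection(conn: Connection): Unit = synchronized {
    pool.enqueue(conn)
  }

  private def newConnection(): Connection = new Connection {
    def send(record: String): Unit = println(s"send: $record") // stand-in for real I/O
    def close(): Unit = ()
  }
}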

