DStream轉換操作包括無狀態轉換和有狀態轉換。
無狀態轉換:每個批次的處理不依賴於之前批次的數據。
有狀態轉換:當前批次的處理需要使用之前批次的數據或者中間結果。有狀態轉換包括基於滑動窗口的轉換和追蹤狀態變化的轉換(updateStateByKey)。
DStream無狀態轉換操作包括:
map(func) :對源DStream的每個元素,采用func函數進行轉換,得到一個新的DStream;
* flatMap(func): 與map相似,但是每個輸入項可用被映射為0個或者多個輸出項;
* filter(func): 返回一個新的DStream,僅包含源DStream中滿足函數func的項;
* repartition(numPartitions): 通過創建更多或者更少的分區改變DStream的並行程度;
* union(otherStream): 返回一個新的DStream,包含源DStream和其他DStream的元素;
* count():統計源DStream中每個RDD的元素數量;
* reduce(func):利用函數func聚集源DStream中每個RDD的元素,返回一個包含單元素RDDs的新DStream;
* countByValue():應用於元素類型為K的DStream上,返回一個(K,V)鍵值對類型的新DStream,每個鍵的值是在原DStream的每個RDD中的出現次數;
* reduceByKey(func, [numTasks]):當在一個由(K,V)鍵值對組成的DStream上執行該操作時,返回一個新的由(K,V)鍵值對組成的DStream,每一個key的值均由給定的recuce函數(func)聚集起來;
* join(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, (V, W))鍵值對的新DStream;
* cogroup(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, Seq[V], Seq[W])的元組;
* transform(func):通過對源DStream的每個RDD應用RDD-to-RDD函數,創建一個新的DStream。支持在新的DStream中做任何RDD操作。
注意:
1. 想要使用countByValue ,reduceByKey等 需要使用 flatMap 如果用map處理數據:報如下的錯誤
Exception in thread "main" org.apache.spark.SparkException: Cannot use map-side combining with array keys.
flatMap:主要用於spark Streaming
map:到處用
區別:
以讀文件例子:map會為每一行返回一個對象,但flatMap會匯總每行返回的對象成為一個更大的對象(map-side的錯誤也能看得出)
2.
val sc=new SparkContext(conf) val lines2=sc.textFile("file:///**/1.txt") 2.1 lines2.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_) 2.2 lines2.flatMap(lines=>lines.split(" ")).countByValue() 2.1和2.2 結果一樣