根據Spark官方文檔中的描述,在Spark Streaming應用中,一個DStream對象可以調用多種操作,主要分為以下幾類
- Transformations
- Window Operations
- Join Operations
- Output Operations
一、Transformations
1、map(func)
2、flatMap(func)
map操作需要傳入一個函數當做參數,具體調用形式為
[]
純文本查看 復制代碼
|
1
|
val
b
=
a.map(func)
|
主要作用是,對DStream對象a,將func函數作用到a中的每一個元素上並生成新的元素,得到的DStream對象b中包含這些新的元素。
下面示例代碼的作用是,在接收到的一行消息后面拼接一個”_NEW”字符串
[Scala]
純文本查看 復制代碼
|
1
|
val
linesNew
=
lines.map(lines
=
> lines +
"_NEW"
)
|
程序運行結果如下:
<ignore_js_op>
注意與接下來的flatMap操作進行比較。
2、flatMap(func)
類似於上面的map操作,具體調用形式為
[Scala]
純文本查看 復制代碼
|
1
|
val
b
=
a.flatMap(func)
|
主要作用是,對DStream對象a,將func函數作用到a中的每一個元素上並生成0個或多個新的元素,得到的DStream對象b中包含這些新的元素。
下面示例代碼的作用是,在接收到的一行消息lines后,將lines根據空格進行分割,分割成若干個單詞
[Scala]
純文本查看 復制代碼
|
1
|
val
words
=
lines.flatMap(
_
.split(
" "
))
|
結果如下:
<ignore_js_op>
3、 filter(func)
filter傳入一個func函數,具體調用形式為
[Scala]
純文本查看 復制代碼
|
1
|
val
b
=
a.filter(func)
|
對DStream a中的每一個元素,應用func方法進行計算,如果func函數返回結果為true,則保留該元素,否則丟棄該元素,返回一個新的DStream b。
下面示例代碼中,對words進行判斷,去除hello這個單詞。
結果如下:
<ignore_js_op>
4、union(otherStream)
這個操作將兩個DStream進行合並,生成一個包含着兩個DStream中所有元素的新DStream對象。
下面代碼,首先將輸入的每一個單詞后面分別拼接“_one”和“_two”,最后將這兩個DStream合並成一個新的DStream
[Scala]
純文本查看 復制代碼
|
1
2
3
4
5
6
7
|
val
wordsOne
=
words.map(
_
+
"_one"
)
val
wordsTwo
=
words.map(
_
+
"_two"
)
val
unionWords
=
wordsOne.union(wordsTwo)
wordsOne.print()
wordsTwo.print()
unionWords.print()
|
運行結果如下:
<ignore_js_op>
5、count()
統計DStream中每個RDD包含的元素的個數,得到一個新的DStream,這個DStream中只包含一個元素,這個元素是對應語句單詞統計數值。
以下代碼,統計每一行中的單詞數
[Scala]
純文本查看 復制代碼
|
1
|
val
wordsCount
=
words.count()
|
運行結果如下,一行輸入4個單詞,打印的結果也為4。
<ignore_js_op>
6、reduce(func)
返回一個包含一個元素的DStream,傳入的func方法會作用在調用者的每一個元素上,將其中的元素順次的兩兩進行計算。
下面的代碼,將每一個單詞用 "-"符號進行拼接
[Scala]
純文本查看 復制代碼
|
1
|
val
reduceWords
=
words.reduce(
_
+
"-"
+
_
)
|
運行結果如下:
<ignore_js_op>
7、countByValue()
某個DStream中的元素類型為K,調用這個方法后,返回的DStream的元素為(K, Long)對,后面這個Long值是原DStream中每個RDD元素key出現的頻率。
以下代碼統計words中不同單詞的個數
[Scala]
純文本查看 復制代碼
|
1
|
val
countByValueWords
=
words.countByValue()
|
結果如下:
<ignore_js_op>
8、reduceByKey(func, [numTasks])
調用這個操作的DStream是以(K, V)的形式出現,返回一個新的元素格式為(K, V)的DStream。返回結果中,K為原來的K,V是由K經過傳入func計算得到的。還可以傳入一個並行計算的參數,在local模式下,默認為2。在其他模式下,默認值由參數
spark.default.parallelism確定。
下面代碼將words轉化成(word, 1)的形式,再以單詞為key,個數為value,進行word count。
[Scala]
純文本查看 復制代碼
|
1
2
|
val
pairs
=
words.map(word
=
> (word ,
1
))
val
wordCounts
=
pairs.reduceByKey(
_
+
_
)
|
結果如下,
<ignore_js_op>
9、join(otherStream, [numTasks])
由一個DStream對象調用該方法,元素內容為
(k, V),傳入另一個DStream對象,元素內容為(k, W),返回的DStream中包含的內容是
(k, (V, W))。這個方法也可以傳入一個並行計算的參數,該參數與reduceByKey中是相同的。
下面代碼中,首先將words轉化成 (word, (word + "_one"))和 (word, (word + "_two"))的形式,再以word為key,將后面的value合並到一起。
[Scala]
純文本查看 復制代碼
|
1
2
3
|
val
wordsOne
=
words.map(word
=
> (word , word +
"_one"
))
val
wordsTwo
=
words.map(word
=
> (word , word +
"_two"
))
val
joinWords
=
wordsOne.join(wordsTwo)
|
運行結果如下:
<ignore_js_op>
10、cogroup(otherStream, [numTasks])
由一個DStream對象調用該方法,元素內容為(k, V),傳入另一個DStream對象,元素內容為(k, W),返回的DStream中包含的內容是
(k, (Seq[V], Seq[W]))。這個方法也可以傳入一個並行計算的參數,該參數與reduceByKey中是相同的。
下面代碼首先將words轉化成 (word, (word + "_one"))和 (word, (word + "_two"))的形式,再以word為key,將后面的value合並到一起。
結果如下:
<ignore_js_op>
11、transform(func)
12、updateStateByKey(func)
二、Window Operations
我覺得用一個成語,管中窺豹,基本上就能夠很形象的解釋什么是窗口函數了。DStream數據流就是那只豹子,窗口就是那個管,以一個固定的速率平移,就能夠每次看到豹的一部分。
窗口函數,就是在DStream流上,以一個可配置的長度為窗口,以一個可配置的速率向前移動窗口,根據窗口函數的具體內容,分別對當前窗口中的這一波數據采取某個對應的操作算子。需要注意的是窗口長度,和窗口移動速率需要是batch time的整數倍。接下來演示Spark Streaming中提供的主要窗口函數。
1、window(windowLength, slideInterval)
該操作由一個DStream對象調用,傳入一個窗口長度參數,一個窗口移動速率參數,然后將當前時刻當前長度窗口中的元素取出形成一個新的DStream。
下面的代碼以長度為3,移動速率為1截取源DStream中的元素形成新的DStream。
[Scala]
純文本查看 復制代碼
|
1
|
val
windowWords
=
words.window(Seconds(
3
), Seconds(
1
))
|
運行結果如下:
<ignore_js_op>
基本上每秒輸入一個字母,然后取出當前時刻3秒這個長度中的所有元素,打印出來。從上面的截圖中可以看到,下一秒時已經看不到a了,再下一秒,已經看不到b和c了。表示a, b, c已經不在當前的窗口中。
2、 countByWindow(windowLength,slideInterval)
返回指定長度窗口中的元素個數。
代碼如下,統計當前3秒長度的時間窗口的DStream中元素的個數:
[Scala]
純文本查看 復制代碼
|
1
|
val
windowWords
=
words.countByWindow(Seconds(
3
), Seconds(
1
))
|
結果如下:
<ignore_js_op>
3、 reduceByWindow(func, windowLength,slideInterval)
類似於上面的reduce操作,只不過這里不再是對整個調用DStream進行reduce操作,而是在調用DStream上首先取窗口函數的元素形成新的DStream,然后在窗口元素形成的DStream上進行reduce。
代碼如下:
[Scala]
純文本查看 復制代碼
|
1
|
val
windowWords
=
words.reduceByWindow(
_
+
"-"
+
_
, Seconds(
3
) , Seconds(
1
))
|
結果如下:
<ignore_js_op>
4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
調用該操作的DStream中的元素格式為(k, v),整個操作類似於前面的reduceByKey,只不過對應的數據源不同,reduceByKeyAndWindow的數據源是基於該DStream的窗口長度中的所有數據。該操作也有一個可選的並發數參數。
下面代碼中,將當前長度為3的時間窗口中的所有數據元素根據key進行合並,統計當前3秒中內不同單詞出現的次數。
[Scala]
純文本查看 復制代碼
|
1
|
val
windowWords
=
pairs.reduceByKeyAndWindow((a
:
Int , b
:
Int)
=
> (a + b) , Seconds(
3
) , Seconds(
1
))
|
結果如下:
<ignore_js_op>
5、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])
這個窗口操作和上一個的區別是多傳入一個函數invFunc。前面的func作用和上一個reduceByKeyAndWindow相同,后面的invFunc是用於處理流出rdd的。
在下面這個例子中,如果把3秒的時間窗口當成一個池塘,池塘每一秒都會有魚游進或者游出,那么第一個函數表示每由進來一條魚,就在該類魚的數量上累加。而第二個函數是,每由出去一條魚,就將該魚的總數減去一。
[Scala]
純文本查看 復制代碼
|
1
|
val
windowWords
=
pairs.reduceByKeyAndWindow((a
:
Int, b
:
Int )
=
> (a + b) , (a
:
Int, b
:
Int)
=
> (a - b) , Seconds(
3
), Seconds(
1
))
|
下面是演示結果,最終的結果是該3秒長度的窗口中歷史上出現過的所有不同單詞個數都為0。
<ignore_js_op>
一段時間不輸入任何信息,看一下最終結果
<ignore_js_op>
6、 countByValueAndWindow(windowLength,slideInterval, [numTasks])
類似於前面的countByValue操作,調用該操作的DStream數據格式為(K, v),返回的DStream格式為(K, Long)。統計當前時間窗口中元素值相同的元素的個數。
代碼如下
[Scala]
純文本查看 復制代碼
|
1
|
val
windowWords
=
words.countByValueAndWindow(Seconds(
3
), Seconds(
1
))[/align]
|
結果如下
<ignore_js_op>
三、Join Operations
Join主要可分為兩種,
1、DStream對象之間的Join
這種join一般應用於窗口函數形成的DStream對象之間,具體可以參考第一部分中的join操作,除了簡單的join之外,還有leftOuterJoin, rightOuterJoin和fullOuterJoin。
2、DStream和dataset之間的join
這一種join,可以參考前面transform操作中的示例。
四、Output Operations
在Spark Streaming中,DStream的輸出操作才是DStream上所有transformations的真正觸發計算點,這個類似於RDD中的action操作。經過輸出操作DStream中的數據才能與外部進行交互,比如將數據寫入文件系統、數據庫,或其他應用中。
1、print()
print操作會將DStream每一個batch中的前10個元素在driver節點打印出來。
看下面這個示例,一行輸入超過10個單詞,然后將這行語句分割成單個單詞的DStream。
[Scala]
純文本查看 復制代碼
|
1
2
|
val
words
=
lines.flatMap(
_
.split(
" "
))
words.print()
|
看看print后的效果。
<ignore_js_op>
2、saveAsTextFiles(prefix, [suffix])
這個操作可以將DStream中的內容保存為text文件,每個batch的數據單獨保存為一個文夾,文件夾名前綴參數必須傳入,文件夾名后綴參數可選,最終文件夾名稱的完整形式為
prefix-TIME_IN_MS[.suffix]
比如下面這一行代碼
[Scala]
純文本查看 復制代碼
|
1
|
lines.saveAsTextFiles(
"satf"
,
".txt"
)[/align][align
=
left]
|
看一下執行結果,在當前項目路徑下,每秒鍾生成一個文件夾,打開的兩個窗口中的內容分別是nc窗口中的輸入。
<ignore_js_op>
另外,如果前綴中包含文件完整路徑,則該text文件夾會建在指定路徑下,如下圖所示
<ignore_js_op>
3、saveAsObjectFiles(prefix, [suffix])
這個操作和前面一個類似,只不過這里將DStream中的內容保存為SequenceFile文件類型,這個文件中保存的數據都是經過序列化后的Java對象。
實驗略過,可參考前面一個操作。
4、saveAsHadoopFiles(prefix, [suffix])
實驗略過,可參考前面一個操作。
這個操作和前兩個類似,將DStream每一batch中的內容保存到HDFS上,同樣可以指定文件的前綴和后綴。
5、foreachRDD(func)
