spark教程(16)-Streaming 之 DStream 詳解


DStream 其實是 RDD 的序列,它的語法與 RDD 類似,分為 transformation(轉換) 和 output(輸出) 兩種操作;

DStream 的轉換操作分為 無狀態轉換 和 有狀態轉換,且 tansformation 也是惰性的;

DStream 的輸出操作請參考 我的博客 Streaming

 

無狀態轉換

轉換操作只作用於單個 RDD,即單個數據流的 batch;

例如,每次根據采集到的數據流統計單詞個數,第一次采集到的是  a 2個 b 1個,第二次采集到的是 a 1個 b 1個,那么第二次的輸出也是 a 1 b 1,並不與前面的累計

 

DStream 的轉換操作最終還是會轉化成 RDD 的轉換操作,這個轉化由 spark streaming 完成

本文只介紹部分 API,詳細請參考源碼 /usr/lib/spark/python/pyspark/streaming/dstream.py

 

基本轉換

為了解釋下面的操作,假定輸入都一樣,如下

hellp spark
hello hadoop hive

 

map(self, f, preservesPartitioning=False):生成一個新的 DStream,不解釋了

[u'hellp', u'spark']
[u'hello', u'hadoop', u'hive']

flatMap(self, f, preservesPartitioning=False):把一個 func 作用於 DStream 的所有元素,並將生成的結果展開

hellp
spark
hello
hadoop
hive

filter(self, f):不解釋了

repartition(self, numPartitions):把 DStream 中每個 RDD 進行分區,用於提高並發

union(self, other):在一個 RDD 上再加一個 RDD

 

聚合轉換

count(self):

reduce(self, func):

countByValue(self):返回一個由鍵值對型 RDD 構成的 DStream

# 輸入 
a
a
b
# 輸出
(u'a', 2)
(u'b', 1)

 

鍵值對轉換

cogroup(self, other, numPartitions=None):把兩個 kv 構成的 DStream 根據 key 進行合並,注意是取 key 的並集

sc = SparkContext()

ssc = StreamingContext(sc, 30)
line1 = ssc.socketTextStream('192.168.10.11', 9999)
out1 = line1.map(lambda x: (len(x), x)).groupByKey()    # value 沒有進行任何操作
out1.pprint()

line2 = ssc.socketTextStream('192.168.10.11', 9998)
out2 = line2.map(lambda x: (len(x), x)).groupByKey()
out2.pprint()

out3 = out1.cogroup(out2)
out3.pprint()

ssc.start()
ssc.awaitTermination()

很遺憾,合並之后,value 是個對象集,不可見

 

join(self, other, numPartitions=None):把兩個 kv 構成的 DStream 連接起來

如 (K,V) 構成的 DStream 和 (K,W) 構成的 DStream 連接后是 (K, (V,W))

同樣有 左連接、右連接、全連接

out3 = out1.join(out2)      # 內連接   二者都有
out3 = out1.leftOuterJoin(out2) # 左連接   左邊有,找對應右邊的
out3 = out1.rightOuterJoin(out2)    # 右連接  右邊有,找對應左邊的
out3 = out1.fullOuterJoin(out2)     # 全連接   笛卡爾內積

 

reduceByKey(self, func, numPartitions=None):

groupByKey(self, numPartitions=None):將構成 DStream 的 RDD 中的元素進行 分組 

 

還有很多,不一一解釋了,記住本質,這些雖然是 DStream 的 API,但是這些 API 其實是作用於 構成 DStream 的 RDD 上的 

 

transform:這個比較特殊

transform(self, func):
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of this DStream.

        `func` can have one argument of `rdd`, or have two arguments of
        (`time`, `rdd`)
        """

輸入一個 函數,這個函數作用於 構成 DStream 的每個 RDD ,並返回新的 RDD ,重新構成一個 DStream

transform 用於 機器學習 或者 圖計算 時優勢明顯

 

示例

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 20)
line1 = ssc.socketTextStream('192.168.10.11', 9999)
out1 = line1.map(lambda x: (len(x), x))
### 輸入為
# a
# aaa
# aa
out1.pprint()
# (1, u'a')
# (3, u'aaa')
# (2, u'aa')        這是一個 RDD,被 transform 的 func 作用

out2 = out1.transform(lambda x: x.sortByKey())
out2.pprint()
# (1, u'a')
# (2, u'aa')
# (3, u'aaa')

out3 = out1.transform(lambda x: x.mapValues(lambda m: ''))  
out3.pprint()

ssc.start()
ssc.awaitTermination()

 

有狀態轉換

轉換操作作用於之前所有的 RDD 上

例如,每次根據采集到的數據流統計單詞個數,第一次采集到的是  a 2個 b 1個,第二次采集到的是 a 1個 b 1個,那么第二次的輸出也是 a 3 b 2,累計之前的

 

updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):更新當前 RDD 的狀態,不好解釋,用例子幫助理解

需要設置檢查點, 用於保存狀態

示例

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

sc = SparkContext()
ssc = StreamingContext(sc, 20)
ssc.checkpoint('/usr/lib/spark')        # 必須有檢查點

line1 = ssc.socketTextStream('192.168.10.11', 9999)
out1 = line1.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)    # 對當前 RDD 的處理

runningCounts = out1.updateStateByKey(updateFunction)   # 更新狀態
runningCounts.pprint()

ssc.start()
ssc.awaitTermination()

示例操作

 

由於統計全局,所以需要checkpoint數據會占用較大的存儲。而且效率也不高。所以很多時候不建議使用updateStateByKey

 

窗口操作

updateStateByKey 也是一種窗口,只是窗口大小不固定;

這里的窗口就是指滑窗,跟 均值濾波里面的滑窗意思一樣;

這里的滑窗內的元素是 RDD;

滑窗有 窗口尺寸 和 滑動步長 兩個概念

滑窗也是有狀態的轉換

這里的 尺寸 和 步長 都是用時間來描述;

尺寸 是 采集周期的 N 倍,步長 也是 采集周期的 N 倍

 

window(self, windowDuration, slideDuration=None):windowDuration 為 窗口時長,slideDuration 為步長

示例

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 20)
line1 = ssc.socketTextStream('192.168.10.11', 9999)

## 先處理再滑窗
out1 = line1.map(lambda x: (len(x), x))
out2 = out1.window(40)
# out2 = out2.groupByKey()    # 滑窗后可繼續處理
out2.pprint()

ssc.start()
ssc.awaitTermination()

示例操作

可以看到有重復計算的內容

 

reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):當 slideDuration < windowDuration 時,計算是有重復的,那么我們可以不用重新獲取或者計算,而是通過獲取舊信息來更新新的信息,這樣可以提高效率

 

函數解釋

window 計算方式如下===

win1 = time1 + time2 + time3

win2 = time3 + time4 + time5

顯然 time3 被重復獲取並計算

 

reduceByWindow 計算方式如下====

win1 = time1 + time2 + time3

win2 = win1 + time4 + time5 -time1 -time2

 

reduceFunc 是對新產生的數據(time4,time5) 進行計算;

invReduceFunc 是對之前的舊數據(time1,time3) 進行計算;

 

示例

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def func1(x, y):
    return x+y

def func2(x, y):
    return -x-y

sc = SparkContext(appName="windowStream", master="local[*]")
# 第二個參數指統計多長時間的數據
ssc = StreamingContext(sc, 10)
ssc.checkpoint("/tmp/window")
lines = ssc.socketTextStream('192.168.10.11', 9999)
# 第一個參數執行指定函數, 第二個參數是窗口長度, 第三個參數是滑動間隔
lines = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1))
dstream = lines.reduceByWindow(func1, func2, 20, 10)
dstream.pprint()

ssc.start()
ssc.awaitTermination()

 

reduceByWindow 和 window 可實現相同效果;

reduceByWindow 的底層是 reduceByKeyAndWindow,用法也完全相同,reduceByKeyAndWindow 的效率更高

需要設置檢查點, 用於保存狀態

 

reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None,
numPartitions=None, filterFunc=None):與 reduceByWindow 的區別是 它的輸入需要 kv 對

dstream = lines.reduceByKeyAndWindow(func1, func2, 20, 10)

 

countByWindow(self, windowDuration, slideDuration):計數

需要設置檢查點, 用於保存狀態

示例

sc = SparkContext()
ssc = StreamingContext(sc, 20)
line1 = ssc.socketTextStream('192.168.10.11', 9999)

ssc.checkpoint('/usr/lib/spark')

## 先處理再滑窗
out1 = line1.map(lambda x: x)
out2 = out1.countByWindow(40, 20)
# out2 = out2.groupByKey()    # 滑窗后可繼續處理
out2.pprint()

ssc.start()
ssc.awaitTermination()

 

countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):

groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):

 

不一一解釋了

 

總結

1. 凡是帶 key 的都需要輸入 kv 對

2. 凡是需要記錄上個 狀態的 都需要設置檢查點

 

 

參考資料:

《Spark大數據分析核心概念技術及實踐OCR-2017》 電子書

 https://www.cnblogs.com/libin2015/p/6841177.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM