DStream 其實是 RDD 的序列,它的語法與 RDD 類似,分為 transformation(轉換) 和 output(輸出) 兩種操作;
DStream 的轉換操作分為 無狀態轉換 和 有狀態轉換,且 tansformation 也是惰性的;
DStream 的輸出操作請參考 我的博客 Streaming
無狀態轉換
轉換操作只作用於單個 RDD,即單個數據流的 batch;
例如,每次根據采集到的數據流統計單詞個數,第一次采集到的是 a 2個 b 1個,第二次采集到的是 a 1個 b 1個,那么第二次的輸出也是 a 1 b 1,並不與前面的累計
DStream 的轉換操作最終還是會轉化成 RDD 的轉換操作,這個轉化由 spark streaming 完成
本文只介紹部分 API,詳細請參考源碼 /usr/lib/spark/python/pyspark/streaming/dstream.py
基本轉換
為了解釋下面的操作,假定輸入都一樣,如下
hellp spark
hello hadoop hive
map(self, f, preservesPartitioning=False):生成一個新的 DStream,不解釋了
[u'hellp', u'spark'] [u'hello', u'hadoop', u'hive']
flatMap(self, f, preservesPartitioning=False):把一個 func 作用於 DStream 的所有元素,並將生成的結果展開
hellp
spark
hello
hadoop
hive
filter(self, f):不解釋了
repartition(self, numPartitions):把 DStream 中每個 RDD 進行分區,用於提高並發
union(self, other):在一個 RDD 上再加一個 RDD
聚合轉換
count(self):
reduce(self, func):
countByValue(self):返回一個由鍵值對型 RDD 構成的 DStream
# 輸入 a a b # 輸出 (u'a', 2) (u'b', 1)
鍵值對轉換
cogroup(self, other, numPartitions=None):把兩個 kv 構成的 DStream 根據 key 進行合並,注意是取 key 的並集
sc = SparkContext() ssc = StreamingContext(sc, 30) line1 = ssc.socketTextStream('192.168.10.11', 9999) out1 = line1.map(lambda x: (len(x), x)).groupByKey() # value 沒有進行任何操作 out1.pprint() line2 = ssc.socketTextStream('192.168.10.11', 9998) out2 = line2.map(lambda x: (len(x), x)).groupByKey() out2.pprint() out3 = out1.cogroup(out2) out3.pprint() ssc.start() ssc.awaitTermination()
很遺憾,合並之后,value 是個對象集,不可見
join(self, other, numPartitions=None):把兩個 kv 構成的 DStream 連接起來
如 (K,V) 構成的 DStream 和 (K,W) 構成的 DStream 連接后是 (K, (V,W))
同樣有 左連接、右連接、全連接
out3 = out1.join(out2) # 內連接 二者都有 out3 = out1.leftOuterJoin(out2) # 左連接 左邊有,找對應右邊的 out3 = out1.rightOuterJoin(out2) # 右連接 右邊有,找對應左邊的 out3 = out1.fullOuterJoin(out2) # 全連接 笛卡爾內積
reduceByKey(self, func, numPartitions=None):
groupByKey(self, numPartitions=None):將構成 DStream 的 RDD 中的元素進行 分組
還有很多,不一一解釋了,記住本質,這些雖然是 DStream 的 API,但是這些 API 其實是作用於 構成 DStream 的 RDD 上的
transform:這個比較特殊
transform(self, func): """ Return a new DStream in which each RDD is generated by applying a function on each RDD of this DStream. `func` can have one argument of `rdd`, or have two arguments of (`time`, `rdd`) """
輸入一個 函數,這個函數作用於 構成 DStream 的每個 RDD ,並返回新的 RDD ,重新構成一個 DStream
transform 用於 機器學習 或者 圖計算 時優勢明顯
示例
from pyspark import SparkContext from pyspark.streaming import StreamingContext sc = SparkContext() ssc = StreamingContext(sc, 20) line1 = ssc.socketTextStream('192.168.10.11', 9999) out1 = line1.map(lambda x: (len(x), x)) ### 輸入為 # a # aaa # aa out1.pprint() # (1, u'a') # (3, u'aaa') # (2, u'aa') 這是一個 RDD,被 transform 的 func 作用 out2 = out1.transform(lambda x: x.sortByKey()) out2.pprint() # (1, u'a') # (2, u'aa') # (3, u'aaa') out3 = out1.transform(lambda x: x.mapValues(lambda m: '')) out3.pprint() ssc.start() ssc.awaitTermination()
有狀態轉換
轉換操作作用於之前所有的 RDD 上
例如,每次根據采集到的數據流統計單詞個數,第一次采集到的是 a 2個 b 1個,第二次采集到的是 a 1個 b 1個,那么第二次的輸出也是 a 3 b 2,累計之前的
updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):更新當前 RDD 的狀態,不好解釋,用例子幫助理解
需要設置檢查點, 用於保存狀態
示例
from pyspark import SparkContext from pyspark.streaming import StreamingContext def updateFunction(newValues, runningCount): if runningCount is None: runningCount = 0 return sum(newValues, runningCount) # add the new values with the previous running count to get the new count sc = SparkContext() ssc = StreamingContext(sc, 20) ssc.checkpoint('/usr/lib/spark') # 必須有檢查點 line1 = ssc.socketTextStream('192.168.10.11', 9999) out1 = line1.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) # 對當前 RDD 的處理 runningCounts = out1.updateStateByKey(updateFunction) # 更新狀態 runningCounts.pprint() ssc.start() ssc.awaitTermination()
示例操作

由於統計全局,所以需要checkpoint數據會占用較大的存儲。而且效率也不高。所以很多時候不建議使用updateStateByKey
窗口操作
updateStateByKey 也是一種窗口,只是窗口大小不固定;
這里的窗口就是指滑窗,跟 均值濾波里面的滑窗意思一樣;
這里的滑窗內的元素是 RDD;
滑窗有 窗口尺寸 和 滑動步長 兩個概念

滑窗也是有狀態的轉換
這里的 尺寸 和 步長 都是用時間來描述;
尺寸 是 采集周期的 N 倍,步長 也是 采集周期的 N 倍;
window(self, windowDuration, slideDuration=None):windowDuration 為 窗口時長,slideDuration 為步長
示例
from pyspark import SparkContext from pyspark.streaming import StreamingContext sc = SparkContext() ssc = StreamingContext(sc, 20) line1 = ssc.socketTextStream('192.168.10.11', 9999) ## 先處理再滑窗 out1 = line1.map(lambda x: (len(x), x)) out2 = out1.window(40) # out2 = out2.groupByKey() # 滑窗后可繼續處理 out2.pprint() ssc.start() ssc.awaitTermination()
示例操作

可以看到有重復計算的內容
reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):當 slideDuration < windowDuration 時,計算是有重復的,那么我們可以不用重新獲取或者計算,而是通過獲取舊信息來更新新的信息,這樣可以提高效率
函數解釋

window 計算方式如下===
win1 = time1 + time2 + time3
win2 = time3 + time4 + time5
顯然 time3 被重復獲取並計算
reduceByWindow 計算方式如下====
win1 = time1 + time2 + time3
win2 = win1 + time4 + time5 -time1 -time2
reduceFunc 是對新產生的數據(time4,time5) 進行計算;
invReduceFunc 是對之前的舊數據(time1,time3) 進行計算;
示例
from pyspark import SparkContext from pyspark.streaming import StreamingContext def func1(x, y): return x+y def func2(x, y): return -x-y sc = SparkContext(appName="windowStream", master="local[*]") # 第二個參數指統計多長時間的數據 ssc = StreamingContext(sc, 10) ssc.checkpoint("/tmp/window") lines = ssc.socketTextStream('192.168.10.11', 9999) # 第一個參數執行指定函數, 第二個參數是窗口長度, 第三個參數是滑動間隔 lines = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)) dstream = lines.reduceByWindow(func1, func2, 20, 10) dstream.pprint() ssc.start() ssc.awaitTermination()
reduceByWindow 和 window 可實現相同效果;
reduceByWindow 的底層是 reduceByKeyAndWindow,用法也完全相同,reduceByKeyAndWindow 的效率更高
需要設置檢查點, 用於保存狀態
reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None,
numPartitions=None, filterFunc=None):與 reduceByWindow 的區別是 它的輸入需要 kv 對
dstream = lines.reduceByKeyAndWindow(func1, func2, 20, 10)
countByWindow(self, windowDuration, slideDuration):計數
需要設置檢查點, 用於保存狀態
示例
sc = SparkContext() ssc = StreamingContext(sc, 20) line1 = ssc.socketTextStream('192.168.10.11', 9999) ssc.checkpoint('/usr/lib/spark') ## 先處理再滑窗 out1 = line1.map(lambda x: x) out2 = out1.countByWindow(40, 20) # out2 = out2.groupByKey() # 滑窗后可繼續處理 out2.pprint() ssc.start() ssc.awaitTermination()
countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
不一一解釋了
總結
1. 凡是帶 key 的都需要輸入 kv 對
2. 凡是需要記錄上個 狀態的 都需要設置檢查點
參考資料:
《Spark大數據分析核心概念技術及實踐OCR-2017》 電子書
https://www.cnblogs.com/libin2015/p/6841177.html
