官方文檔地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html
能實現可擴展,高吞吐,可容錯,的流式處理
從外接數據源接受數據流,處理數據流使用的是復雜的高度抽象的算法函數map reduce join window等
輸出的數據可以存儲到文件系統和數據庫甚至是直接展示在命令行
也可以應用ml 和graph processing在這些數據流上
spark streaming本質還是spark只是實現了所謂的微批量
spark streaming中連續數據流用DStream表示,DStream可以從輸入數據創建,也可以從其他的DStream轉化來
本質上DStream是一組RDD組成的序列
一個迅速上手的例子:
# coding: utf-8
# In[ ]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# In[ ]:
#創建兩個工作線程,將這兩個線程喂給StreamingContext,時間間隔是1秒
#這里有個錯誤Cannot run multiple SparkContexts at once
#參考:http://stackoverflow.com/questions/28259756/how-to-create-multiple-sparkcontexts-in-a-console
#先要嘗試關閉sc才能創建多個SparkContext
try:
sc.stop()
except:
pass
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
#sc.stop()
# In[ ]:
#創建一個DStream,從本機的這個端口取數據
lines = ssc.socketTextStream("localhost", 9999)
# In[ ]:
#lines中的數據記錄是一行行的文本,下面將文本切割成字
words = lines.flatMap(lambda line: line.split(" "))
#這里的flatMap是一對多DStream操作,生成一個新的DStream,就是變成了字流了
# In[ ]:
#下面數一數每一批次的字的個數
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# In[ ]:
# 打印DStream中的每個RDD的前十個元素
wordCounts.pprint()
# In[ ]:
ssc.start() # 開始計算
ssc.awaitTermination() #等待計算停止
# In[ ]:
#將這個文件的py腳本提交計算: ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
#在命令行輸入nc -lk 9999 然后模擬輸入字符串文本,那么在pyspark命令行會打印出每秒鍾輸入的數據的統計結果
基本概念
要想寫自己的streaming程序,首先要添加maven或者sbt的依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
對於外部輸入流的依賴現在不在核心api中了,需要單獨添加依賴。
初始化StreamingContext
可以從SparkContext對象中創建
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
appName是程序名字可以在UI中顯示
master是Spark,Mesos,YARN cluster URL 或者是 聲明的Local[*]字符串使得運行在本地模式
當運行在集群上的時候,不要寫死在代碼里面,而是要從spark-submit啟動,傳遞進去。
對於本地測試或者單元測試,可以傳遞local[*]
在context被定義之后,要做下面的事情:
1. 通過創建DStream去定義輸入資源
2. 通過對DStream的轉換和輸出操作定義流的計算
3. 使用streamingContext.start()開始接收數據並處理數據
4. 使用streamingContext.awaitTermination()等待處理過程停止(手動或者因為錯誤)
5. 可以使用streamingContext.stop()手動停止處理過程
要點:
1. 一旦一個context被啟動,不能再添加任何新的流進去了
2. 一旦context被停止,就不能重啟了
3. 在JVM中只能有一個StreamingContext被激活
4. 在streamingContext上使用stop()也會停止SparkContext(),要想單獨停止前者,設置stop()的可選的參數 stopSparkContext()參數為false
5. 一個sparkContext可以被重復使用,去創建多個StreamingContext,只要前一個StreamingContext被單獨停止,下一個就可以接着創建。
Discretized Streams(DStreams)
是Spark Streaming的基本的抽象,代表了一個連續的數據流
可以是從數據源接受的輸入數據流,也可以是轉換輸入數據流得到的數據流
一個DStream代表一串RDD,RDD是不可分割的基本數據單元抽象。
每一個在DStream中的RDD包含特定時間間隔的數據
如果時間是1秒的話,從0-1秒的很多RDD,與從1-2的RDD等,組成了DStream
任何對DStream的操作,都會被翻譯成對底層的RDD的操作,例如,將Lines轉換成words的操作
對DStream的操作,隱藏了很多細節,給開發者提供高度抽象的API
輸入DStreams和Receivers
每一個輸出DStream(除了file strem)都關聯一個Receiver對象,這個對象從數據源接受數據存儲在spark的內存中等待處理。
Spark Streaming支持兩種類型的內建數據源
1. Basic Sources:直接在StreamingContext API中可用的Sources,比如file systems和socket connections
2. Advances Sources:從外部工具類中調過來的例如Kafka,Flume,Kinesis等,需要鏈接一些外部依賴。
關鍵點:
1. 注意在本地運行SparkStreaming的時候,不要使用local或者local[1]作為主機的URL,因為這些都是意味着開一個線程,因為如果只是輸入一個數據源,那么這個單一的線程會用來運行receiver,那么沒有線程去處理接收到的數據了。所以本地運行的時候,參數local[n]中的n最好大於運行中的receiver。
2. 相應的在集群上運行的時候,分配的核心數要比接收者的數目多,否則的話系統能接收數據,但是不能處理。
Basic Sources 基礎數據源
在基礎例子中已經看到過ssc.socketTextStream(),下面看file streams
streamingContext.textFileStream(dataDirectory)
可以創建一個DStream
spark會監控這個路徑,處理在那個路徑中的任何文件
注意:
1. 路徑中的文件要有相同的數據格式
2. 文件必須通過自動專業或者重命名進入到這個路徑的
3. 一旦進入,這些文件不能被改變,所以如果文件被連續附加,那么新的數據不能被讀取的
針對簡單的文本文件,有個簡單的方法streamingContext.textFileStream(dataDirectory)
因為file stream不需要運行receiver所以不需要分配核心或者線程去處理
Python API不支持fileStream只是支持textFileStream
可以使用RDD的queue去創建DStream,使用streamingContext.queueStream(queueofRDDs)
Advanced Souces 高級數據源
As of Spark 2.0.0, Kafka, Kinesis and Flume are available in the Python API.
因為這些高級的數據源的支持比較復雜,需要依賴單獨的包,現在被轉移出了核心的API,所以不能再shell中使用,也就不能在shell中測試這些數據源。如果非要的話,需要下載對應的maven jar包,和對應的依賴,然后添加到classpath
Custom Sources自定義數據源
現在python還不支持,但是要想從自定義的數據源創建DStream,就要自己實現用戶定義的receiver,這可以接受自定義的數據,並且發送到spark中
Receiver Reliability 接收者的可靠性
按照可靠性可以把數據分為兩種,有的數據源例如Kafka和Flume運行傳送被回復的數據、
如果系統正確接受到這些要被確認的數據,可以保證不會因為某種失敗而導致數據丟失。這導致兩種類型的接收者。
1. 可靠的接收者:當數據被接受並存儲到spark之后,必須回復確認消息給可靠數據源。
2. 不可靠的接收者:不用回復確認,針對沒有確認機制的數據源,或者有確認機制但是不需要執行復雜確認機制的數據源。
Transformations on DStreams DStream的轉換
| map(fun) | 這個函數將輸入的DStream的每一個元素傳遞給func得到一個新的DStream |
| flatMap(func) | 同上,只是每個輸入可以map到多個輸出項 |
| filter(func) | 選擇func返回結果為true的DStream中的記錄組成新的DStream |
| reparitition(numPartitions) | 通過改變划分去改變DStream的並行水平 |
| union(otherStream) | 合並 |
| count() | 返回一個新的DStream,是原始的DStream中的每個RDD的元素的數目 |
| reduce(func) | 使用函數func聚合原始數據匯總的每個RDD得到一個新的單一元素RDD組成的DStream |
| countByValue() | 調用類型K的DStream時候返回一個新的DStream有(K,long)對,其中long是k在每個RDD中出現的頻率 |
| reduceByKey(func,[numTasks]) | 將(k,v)中的v按照k使用func進行聚合 |
| join(otherStream,[numTasks]) | (k,v)(k,w)得到(k,(v,w)) |
| cogroup(otherStream,[numTasks]) | (k,v)(k,w)得到(k,Seq[V],Seq[W]) |
| tansform(func) | 作用在每個RDD上得到新的RDD組成的DStream |
| updateStateByKey(func) | 每個鍵都是通過將給定的函數作用在其值上得到的新的DStream |
下面是對某些轉換的詳細的討論
UpdateStateByKey Operation
允許當使用新的信息連續更新的時候,維護任意的狀態
1. 定義狀態,這個狀態可以是任意的數據類型
2. 定義狀態的更新函數
不管有沒有數據,spark都會更新狀態,如果更新函數返回為none那么鍵值對就會被消除
假設想要維持一個運行時數目,那么運行時數目就是一個狀態,是個整數,下面是一個更新函數
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
假設使用前面的paris DStream包含(word,1)對
runningCounts = pairs.updateStateByKey(updateFunction)
轉換操作:
tranform操作允許任意的RDD-to-RDD函數應用到DStream,下面是一個例子:
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
窗口操作:
允許我們應用transformation到一個滑動窗口的數據上
上面的例子說明了每個窗口操作要聲明下面的兩個參數
windows length:窗口的長度,上面的例子是3
sliding interval:窗口被執行的時間間隔,例子中的書2
上面的兩個參數都應該是元素DStream批間隔(上面的間隔是1)的整數倍
下面是一個窗口操作的例子,假設我們想生成過去的30秒的數據的wordcounts,每10秒鍾一次
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
下面是一個
常見的窗口操作的描述,所有的操作都傳遞兩個參數,窗口的長度和時間間隔
| window(長度,間隔) | 原來的DStream按照新的指定窗口進行切分返回新的DStream |
| countByWindow(長度,間隔) | 返回滑動窗口的元素個數 |
| reduceByWindow(func, windowLength, slideInterval) |
讀原來的DStream數據進行聚合得到新的DStream |
| reduceByKeyAndWindow(func,長度,間隔,[numtasks]) | (k,v)中的k被函數合並得到新的DStream |
| reduceByKeyAndWindow(func,invFunc,長度,間隔,[numtasks]) | 比上面的更高效,對窗口內的數據增量聚合和逐步移去得到聚合后新的DStream |
| countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 根據窗口計算每個元素的頻次 |
Join Operations
下面是簡單的流的join
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin
下面是基於窗口的流的Join
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
sream-dataset的join
流和數據集的join操作是使用lambda表達式實現的
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
DStream的輸出操作
| print() |
前十個元素打印出來 |
| saveAsTextFiles(prefix, [suffix]) |
將DStream中的內容以文本方式保存成文件,每次批處理間隔內產生的文件按照prefix-TIME_IN_MS[.suffix]命名 |
| saveAsObjectFiles(prefix, [suffix]) |
將DStream中的內容按對象序列化並且以SequenceFile格式保存,每次批處理間隔文件按照上面的命名 |
| saveAsHadoopFiles(prefix, [suffix]) |
將DStream中的內容按對象序列化並且以hadoop格式保存,每次批處理間隔文件按照上面的命名 |
| foreachRDD(func) |
對每個RDD應用這個函數,將RDD保存在外部文件中 |
Design Patterns for using foreachRDD
foreachRDD的設計模式
dstream.foreachRDD非常強大,但是容易出錯
將數據寫到外部系統需要創建一個連接對象,使用這個對象例如Tcp Connection發送數據到遠程的系統
開發者可能會錯誤的連接到Spark Driver,然后試圖在worker中使用將數據保存到RDD中
例如:
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
這是錯誤的,因為這要求連接對象序列化並且從driver發送到worker
這樣的連接對象很少能跨機器轉讓
正確的做法是在worker中創建連接對象
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
通常的,創建一個對象需要時間和資源的管理費用,因此,為每個記錄創建和摧毀連接對象可能會帶來不必要的管理費用,這可能會顯著降低系統的吞吐量,一個更好的解決方案是使用rdd.foreachPartition去創造唯一連接對象,並且用這個對象發送所有的RDD。
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
最終的優化是,跨RDD或者批次,重用連接對象
程序員可以維護一個連接對象的靜態的池。
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
