1、簡介
Spark Streaming處理的數據流圖:
Spark Streaming在內部的處理機制是,接收實時流的數據,並根據一定的時間間隔拆分成一批批的數據,然后通過Spark Engine處理這些批數據,最終得到處理后的一批批結果數據。
對應的批數據,在Spark內核對應一個RDD實例,因此,對應流數據的DStream可以看成是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分成一批一批后,通過一個先進先出的隊列,然后 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,然后進行處理,這是一個典型的生產者消費者模型。
1.2 術語定義
l離散流(discretized stream)或DStream:Spark Streaming對內部持續的實時數據流的抽象描述,即我們處理的一個實時數據流,在Spark Streaming中對應於一個DStream 實例。
l批數據(batch data):這是化整為零的第一步,將實時流數據以時間片為單位進行分批,將流處理轉化為時間片數據的批處理。隨着持續時間的推移,這些處理結果就形成了對應的結果數據流了。
l時間片或批處理時間間隔( batch interval):人為地對流數據進行定量的標准,以時間片作為我們拆分流數據的依據。一個時間片的數據對應一個RDD實例。
l窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,
l滑動時間間隔:前一個窗口到后一個窗口所經過的時間長度。必須是批處理時間間隔的倍數
lInput DStream :一個input DStream是一個特殊的DStream,將Spark Streaming連接到一個外部數據源來讀取數據。
2、運行原理
2.1 Streaming架構
SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,可以對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等復雜操作,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。
l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。
圖Spark Streaming構架
2.2 編程模型
DStream(Discretized Stream)作為Spark Streaming的基礎抽象,它代表持續性的數據流。這些數據流既可以通過外部輸入源賴獲取,也可以通過現有的Dstream的transformation操作來獲得。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每個RDD都包含了自己特定時間間隔內的數據流。如圖7-3所示。
圖7-3 DStream中在時間軸下生成離散的RDD序列
對DStream中數據的各種操作也是映射到內部的RDD上來進行的,如圖7-4所示,對Dtream的操作可以通過RDD的transformation生成新的DStream。這里的執行引擎是Spark。
2.2.1 如何使用Spark Streaming
""" Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds. Usage: direct_kafka_wordcount.py <broker_list> <topic> To run this on your local machine, you need to setup Kafka and create a producer first, see http://kafka.apache.org/documentation.html#quickstart
and then run the example `$ bin/spark-submit --jars \ external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \ examples/src/main/python/streaming/direct_kafka_wordcount.py \ localhost:9092 test` """ from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr) exit(-1) sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") ssc = StreamingContext(sc, 2) brokers, topic = sys.argv[1:] kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
#這里kafka產生的是一個map, key是null, value是實際發送的數據,所以取x[1] lines = kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()
1.創建StreamingContext對象 同Spark初始化需要創建SparkContext對象一樣,使用Spark Streaming就需要創建StreamingContext對象。創建StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱。Spark Streaming需要指定處理數據的時間間隔,如上例所示的2s,那么Spark Streaming會以2s為時間窗口進行數據處理。此參數需要根據用戶的需求和集群的處理能力進行適當的設置;
2.創建InputDStream Spark Streaming需要指明數據源。如socketTextStream,Spark Streaming以socket連接作為數據源讀取數據。當然Spark Streaming支持多種不同的數據源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等數據源;
3.操作DStream 對於從數據源得到的DStream,用戶可以在其基礎上進行各種操作,如上例所示的操作就是一個典型的WordCount執行流程:對於當前時間窗口內從數據源得到的數據首先進行分割,然后利用Map和ReduceByKey方法進行計算,當然最后還有使用print()方法輸出結果;
4.啟動Spark Streaming 之前所作的所有步驟只是創建了執行流程,程序沒有真正連接上數據源,也沒有對數據進行任何操作,只是設定好了所有的執行計划,當ssc.start()啟動后程序才真正進行所有預期的操作。
至此對於Spark Streaming的如何使用有了一個大概的印象,在后面的章節我們會通過源代碼深入探究一下Spark Streaming的執行流程
2.2.3 DStream的操作
與RDD類似,DStream也提供了自己的一系列操作方法,這些操作可以分成三類:普通的轉換操作、窗口轉換操作和輸出操作。
2.2.3.1 普通的轉換操作
普通的轉換操作如下表所示:
轉換 |
描述 |
map(func) |
源 DStream的每個元素通過函數func返回一個新的DStream。 |
flatMap(func) |
類似與map操作,不同的是每個輸入元素可以被映射出0或者更多的輸出元素。 |
filter(func) |
在源DSTREAM上選擇Func函數返回僅為true的元素,最終返回一個新的DSTREAM 。 |
repartition(numPartitions) |
通過輸入的參數numPartitions的值來改變DStream的分區大小。 |
union(otherStream) |
返回一個包含源DStream與其他 DStream的元素合並后的新DSTREAM。 |
count() |
對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。 |
reduce(func) |
使用函數func(有兩個參數並返回一個結果)將源DStream 中每個RDD的元素進行聚 合操作,返回一個內部所包含的RDD只有一個元素的新DStream。 |
countByValue() |
計算DStream中每個RDD內的元素出現的頻次並返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現的頻次。 |
reduceByKey(func, [numTasks]) |
當一個類型為(K,V)鍵值對的DStream被調用的時候,返回類型為類型為(K,V)鍵值對的新 DStream,其中每個鍵的值V都是使用聚合函數func匯總。注意:默認情況下,使用 Spark的默認並行度提交任務(本地模式下並行度為2,集群模式下位8),可以通過配置numTasks設置不同的並行任務數。 |
join(otherStream, [numTasks]) |
當被調用類型分別為(K,V)和(K,W)鍵值對的2個DStream時,返回類型為(K,(V,W))鍵值對的一個新 DSTREAM。 |
cogroup(otherStream, [numTasks]) |
當被調用的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。 |
transform(func) |
通過對源DStream的每RDD應用RDD-to-RDD函數返回一個新的DStream,這可以用來在DStream做任意RDD操作。 |
updateStateByKey(func) |
返回一個新狀態的DStream,其中每個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函數func后的更新。這個方法可以被用來維持每個鍵的任何狀態數據。 |
在上面列出的這些操作中,transform()方法和updateStateByKey()方法值得我們深入的探討一下:
l transform(func)操作
該transform操作(轉換操作)連同其其類似的 transformWith操作允許DStream 上應用任意RDD-to-RDD函數。它可以被應用於未在DStream API 中暴露任何的RDD操作。例如,在每批次的數據流與另一數據集的連接功能不直接暴露在DStream API 中,但可以輕松地使用transform操作來做到這一點,這使得DStream的功能非常強大。
l updateStateByKey操作
該 updateStateByKey 操作可以讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :
(1) 定義狀態 - 狀態可以是任意的數據類型。
(2) 定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。
讓我們用一個例子來說明,假設你要進行文本數據流中單詞計數。在這里,正在運行的計數是狀態而且它是一個整數。我們定義了更新功能如下:
詳細案例參考:
http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#transformations-on-dstreams
此函數應用於含有鍵值對的DStream中(如前面的示例中,在DStream中含有(word,1)鍵值對)。它會針對里面的每個元素(如wordCount中的word)調用一下更新函數,newValues是最新的值,runningCount是之前的值。
2.2.3.2 窗口轉換操作
Spark Streaming 還提供了窗口的計算,它允許你通過滑動窗口對數據進行轉換,窗口轉換操作如下:
轉換 |
描述 |
window(windowLength, slideInterval) |
返回一個基於源DStream的窗口批次計算后得到新的DStream。 |
countByWindow(windowLength,slideInterval) |
返回基於滑動窗口的DStream中的元素的數量。 |
reduceByWindow(func, windowLength,slideInterval) |
基於滑動窗口對源DStream中的元素進行聚合操作,得到一個新的DStream。 |
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) |
基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操作,得到一個新的DStream。 |
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) |
一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合並移去最早的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那么我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法可以復用中間三秒的統計量,提高統計的效率。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) |
基於滑動窗口計算源DStream中每個RDD內每個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue一樣,reduce任務的數量可以通過一個可選參數進行配置。
|
2.2.3.3 輸出操作
Spark Streaming允許DStream的數據被輸出到外部系統,如數據庫或文件系統。由於輸出操作實際上使transformation操作后的數據可以通過外部系統被使用,同時輸出操作觸發所有DStream的transformation操作的實際執行(類似於RDD操作)。以下表列出了目前主要的輸出操作:
轉換 |
描述 |
print() |
在Driver中打印出DStream中數據的前10個元素。 |
saveAsTextFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存為文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix, [suffix]) |
將DStream中的內容按對象序列化並且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存為Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) |
最基本的輸出操作,將func函數應用於DStream中的RDD上,這個操作會輸出數據到外部系統,比如保存RDD到文件或者網絡數據庫等。需要注意的是func函數是在運行該streaming應用的Driver進程里執行的。 |
dstream.foreachRDD是一個非常強大的輸出操作,它允將許數據輸出到外部系統。詳細案例請參考:
http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#output-operations-on-dstreams
3、spark整合kafka
用spark streaming流式處理kafka中的數據,第一步當然是先把數據接收過來,轉換為spark streaming中的數據結構Dstream。接收數據的方式有兩種:1.利用Receiver接收數據,2.直接從kafka讀取數據。
基於Receiver的方式
這種方式利用接收器(Receiver)來接收kafka中的數據,其最基本是使用Kafka高階用戶API接口。對於所有的接收器,從kafka接收來的數據會存儲在spark的executor中,之后spark streaming提交的job會處理這些數據。如下圖:
還有幾個需要注意的點:
- 在Receiver的方式中,ssc中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理數據上的並行度。
- 對於不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來並行接收數據,之后可以利用union來統一成一個Dstream。
- 如果我們啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
構造函數為KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
對於所有的receivers接收到的數據將會保存在spark executors中,然后通過Spark Streaming啟動job來處理這些數據,默認會丟失,可啟用WAL日志,該日志存儲在HDFS上
直接讀取方式
在spark1.3之后,引入了Direct方式。不同於Receiver的方式,Direct方式沒有receiver這一層,其會周期性的獲取Kafka中每個topic的每個partition中的最新offsets,之后根據設定的maxRatePerPartition偏移量范圍來處理每個batch。其形式如下圖:
這種方法相較於Receiver方式的優勢在於:
- 簡化的並行:在Receiver的方式中我們提到創建多個Receiver之后利用union來合並成一個Dstream的方式提高數據傳輸並行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的並行讀取Kafka數據,會創建和kafka分區一樣的rdd個數。
- 高效:在Receiver的方式中,為了達到0數據丟失需要將數據存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數據,第一次是被kafka復制,另一次是寫到wal中,浪費!而第二種方式不存在這個問題,只要我們Kafka的數據保留時間足夠長,我們都能夠從Kafka進行數據恢復。
- 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取數據的方式,但由於Spark Streaming消費的數據和Zookeeper中記錄的offset不同步,這種方式偶爾會造成數據重復消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了zk和ssc偏移量不一致的問題。缺點是無法使用基於zookeeper的kafka監控工具。
以上主要是對官方文檔[1]的一個簡單翻譯,詳細內容大家可以直接看下官方文檔這里不再贅述。
http://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html
不同於Receiver的方式,是從Zookeeper中讀取offset值,那么自然zookeeper就保存了當前消費的offset值,那么如果重新啟動開始消費就會接着上一次offset值繼續消費。
而在Direct的方式中,我們是直接從kafka來讀數據,那么offset需要自己記錄,可以利用checkpoint、數據庫或文件記錄或者回寫到zookeeper中進行記錄。這里我們給出利用Kafka底層API接口,將offset及時同步到zookeeper中的通用類,我將其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一個通用類,而KafkaCluster是kafka源碼中的一個類,由於包名權限的原因我把它單獨提出來,ComsumerMain簡單展示了通用類的使用方法,在每次創建KafkaStream時,都會先從zooker中查看上次的消費記錄offsets,而每個batch處理完成后,會同步offsets到zookeeper中。
refer:http://blog.csdn.net/zhong_han_jun/article/details/50814038
參考:
Spark入門實戰系列--7.Spark Streaming(上)--實時流計算Spark Streaming原理介紹
Spark Streaming 與 Kafka 集成分析
http://blog.selfup.cn/1665.html#comments
Spark踩坑記——Spark Streaming+Kafka