1.Spark Streaming是什么
Spark Streaming是在Spark上建立的可擴展的高吞吐量實時處理流數據的框架,數據可以是來自多種不同的源,例如kafka,Flume,Twitter,ZeroMQ或者TCP Socket等。在這個框架下,支持對流數據的各種運算,比如map,reduce,join等。處理過后的數據可以存儲到文件系統或數據庫。
利用Spark Streaming,你可以使用與批量加載數據相同的API來創建數據管道,並通過數據管道處理流式數據。此外,Spark Steaming的“micro-batching”方式提供相當好的彈性來應對某些原因造成的任務失敗。
2. Spark Streaming的基本原理
Spark Streaming對數據的處理方式主要采用的方法是對Stream數據進行時間切片,分成小的數據片段,通過類似批處理的方式處理數據片段。
Spark Streaming把實時輸入數據流以時間片Δt (如1秒)為單位切分成塊。Spark Streaming會把每塊數據作為一個RDD,並使用RDD操作處理每一小塊數據。
Spark Streaming將流式計算分解成一系列短小的批處理作業。Spark Streaming的輸入數據分成一段一段的數據(DStreaming),每一段數據都轉換成Spark中的RDD,然后將Spark Streaming中對DStream的操作變為針對Spark中對RDD的操作,將RDD經過操作變成中間結果保存在內存中。
3. DStream
上面提到了DStreaming,那么DStreaming到底是什么呢:
DStreaming相當於在Streaming的框架下對RDD進行封裝,表示的是我們處理的一個實時數據流。類似於RDD,DStream提供了轉換操作,窗口轉換操作和輸出操作三種操作方法。
4.Spark Streaming的優勢
Spark Streaming是一種構建在Spark上的實時計算框架,它擴展了Spark處理大規模流式數據的能力。
實時性:它能運行在100+的結點上,並達到秒級延遲。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會經過Spark的任務集的調度過程。其最小的Batch Size的選取在0.5~2秒鍾之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高的所有流式准實時計算場景。
高效和容錯的特性:對於流式計算來說,容錯性至關重要。在spark中每一個RDD都是一個不可變的分布式可重算的數據集,其記錄着確定性的操作,只要輸入數據是可容錯的,那么任意一個RDD的分區出錯或不可用,都是可以利用原始輸入數據通過轉換操作而重新算出的。而spark Streaming使用基於內存的Spark作為執行引擎, 其容錯性自然很好。
吞吐量:Spark Streaming能集成Spark的批處理和交互查詢,其吞吐量比Storm至少高2~5倍。並且它為實現復雜的算法提供了和批處理類似的簡單接口。
接下來用Spark Streaming連接TCP Socket來說明如何使用Spark Streaming:
1 創建StreamingContext對象
首先使用StreamingContext模塊,這個模塊的作用是提供所有的流數據處理的功能:
1 from pyspark import SparkContext 2 from pyspark.streaming import StreamingContext 3 4 sc = SparkContext("local[2]", "streamwordcount") 5 # 創建本地的SparkContext對象,包含2個執行線程 6 7 ssc = StreamingContext(sc, 2) 8 # 創建本地的StreamingContext對象,處理的時間片間隔時間,設置為2s
2 創建DStream對象
我們需要連接一個打開的 TCP 服務端口,從而獲取流數據,這里使用的源是TCP Socket,所以使用socketTextStream()函數:
lines = ssc.socketTextStream("localhost", 8888) # 創建DStream,指明數據源為socket:來自localhost本機的8888端口
3 對DStream進行操作
我們開始對lines進行處理,首先對當前2秒內獲取的數據進行分割並執行標准的MapReduce流程計算。
words = lines.flatMap(lambda line: line.split(" ")) # 使用flatMap和Split對2秒內收到的字符串進行分割
得到的words是一系列的單詞,再執行下面的操作:
pairs = words.map(lambda word: (word, 1)) # map操作將獨立的單詞映射到(word,1)元組 wordCounts = pairs.reduceByKey(lambda x, y: x + y) # reduceByKey操作對pairs執行reduce操作獲得(單詞,詞頻)元組
5 輸出數據
將處理后的數據輸出到一個文件中:
outputFile = "/home/feige/streaming/ss" # 輸出文件夾的前綴,Spark Streaming會自動使用當前時間戳來生成不同的文件夾名稱 wordCounts.saveAsTextFiles(outputFile) # 將結果輸出
6 啟動應用
要使程序在Spark Streaming上運行起來,需要執行Spark Streaming啟動的流程,調用start()
函數啟動,awaitTermination()
函數等待處理結束的信號。
ssc.start() # 啟動Spark Streaming應用 ssc.awaitTermination()
打開終端執行:
nc -lk 8888
nc的-l
參數表示創建一個監聽端口,等待新的連接。-k
參數表示當前連接結束后仍然保持監聽,必須與-l
參數同時使用。
執行完上面的命令后不關閉終端,我們將在這個終端中輸入一些處理的數據:
打開一個新的終端來執行我們的Spark Streaming應用:
這里是spark streaming執行的過程
現在我們來看看程序執行的效果,程序每隔2秒掃描一次監控窗口輸入的內容,我們查看一下:
結束語:
最近壓力比較大,瑣事諸多,相信這段時間過后一切都會好起來的,加油!!!