1. 流處理的場景
我們在定義流處理時,會認為它處理的是對無止境的數據集的增量處理。不過對於這個定義來說,很難去與一些實際場景關聯起來。在我們討論流處理的優點與缺點時,先介紹一下流處理的常用場景。
- 通知與警報:可能流應用最明顯的例子就是通知(notification)與警報(alerting)。
- 實時報道:許多公司會使用流系統來跑一個實時的、讓每個員工都可以看到的dashboard。例如,一個報告實時游客的dashboard
- 增量ETL:其中一個最常見的流應用就是:減少各個公司在抽取數據到一個數據倉庫中時都必須忍受的延遲。Spark 批處理經常用於ETL的工作負載,將原始數據轉換成一個結構化的格式,例如Parquet,使得后續的query更高效。使用structured streaming,這些jobs可以與新數據在秒級時間內結合,讓下游用戶可以更快地query這些數據。在這個使用場景下,非常關鍵的一點是:數據在處理時必須滿足exactly-once,並且在一種fault-tolerant 形式下。我們不希望任何數據在被放入數據倉庫前丟失,也不希望任何數據被放入兩次。還需要滿足的一點是,流系統在對數據倉庫執行update時,必須是事物地,這樣才不會導致運行其上的query只訪問到部分寫入的數據。
- 實時更新數據:例如有一個dashboard給用戶query數據,流系統可以保持數據的最新版本。一般在這種場景下需要流應用能對一個key-value store(或是其他系統)執行增量更新,作為同步。一般還需要這些更新是事務的,如ETL里一樣,防止應用崩潰。
- 實時決策:例如信用卡欺詐,需要基於歷史的狀態,來判斷信用卡的transaction 是否涉嫌欺詐
- 在線機器學習:類似於實時決策,不過實時決策是hard-code 規則,而在線機器學習是使用持續的數據更新模型,系統更為復雜。一般涉還涉及到多個用戶、與static數據集的join、與機器學習庫集成等等。
2. Continues Vs Micro-Batch Execution
Continues 是一次處理一個record。
優點是:提供盡可能低的latency(如果整體input rate 相對較低的話)
缺點是:吞吐(throughput)不夠高,因為在處理每個record時,會引入額外的overhead(例如調用操作系統發送網絡包到下游)
限制是:operators的拓撲一般是固定的,在runtime時不能被拿走,除非停止整個系統,否則會引入負載不均衡的問題。
Micro-batch系統是等待聚集一個小的batch之后(例如500ms的數據),再使用分布式的tasks對每個batch並行處理。
優點是:
- 對每個節點,都能達到高吞吐,並且還能執行批處理中的優化(例如向量化處理),並且不會引入額外的per-record overhead。所以它可以使用更少的節點處理相同速率的數據。
- 還能使用動態的load balance 技術,處理變化的負載(例如增加節點或減少節點)。
缺點是:下游應用會有更高的延時(latency),因為需要等待一個micro-batch的數據聚集。
在實際應用中,如果一個流應用是非常大規模的、甚至需要做分布式計算的,則傾向於高吞吐優先。
在選擇兩種模式時,主要的選擇因素是:latency以及total costs of operation(TCO)。Micro-batch一般可以完美的deliver latencies 100ms 到 1s左右(取決於 application)。在達到這種延遲時,一般也需要更少的節點達到與continues模式同樣的吞吐,所以它的運作成本也相對更低(包括更低的維護成本,因為節點更少)。若是需要更低的延時,則應考慮continues processing system;或者使用micro-batch系統與一個fast serving layer 結合,以提供低延時的queries(例如,將數據輸出到MySQL或者Cassandra,它們可以為客戶端提供milliseconds級別的服務)
3. Structured Streaming基礎
Structured Streaming 是基於Spark SQL 引擎而構建的流處理框架,它直接使用了已有的Spark structured API(DataFrames,Datasets,以及SQL)。
Structured Streaming的核心思想是:將流數據視為一個數據持續追加的表。然后job會定期檢查新的輸入數據,處理,更新一些內部狀態(存儲在一個state store中),並更新它的結果。它的API很重要的一個特點是:我們並不需要更改代碼來分別batch或是stream處理,僅需要指定query是運行為batch還是streaming模式即可。在Structured Streaming內部會自動決定如何增量處理我們的query,例如:在新數據到 達時,高效地更新它的結果,並且會議fault-tolerant的方式運行。
簡單地說,Structured Streaming就是”your DataFrame,but streaming“。
4. 核心概念
1. Transformation 與 Action:Transformation 與 Spark中的常見transformation 差不多,而對於Action,在Structured Streaming中僅有一個action,就是starting a stream,它會開始持續運行並輸出結果。
2. Input Sources
Structured Streaming 支持多種輸入源,對於Spark 2.2 版本來說,支持Kafka、位於分布式文件系統(例如HDFS、S3)上的文件(Spark僅持續讀取一個文件夾中的新文件)、socket source(用於測試)
3. Sinks
Sinks 指定的是流輸出的目的地。Sinks和處理引擎同時也負責可靠地追蹤數據完整的處理過程。下面是Spark 2.2 支持的output sinks:
- Kafaka
- 幾乎任何文件格式
- 一個foreach sink,用於運行測試計算輸出
- Console sink(用於測試)
- Memory sink(用於debug)
4. 輸出模式(output modes)
為Structured Streaming job 定義了sink后,僅完成了一半的輸出配置。我們還需要定義Spark如何向sink寫入數據。例如,我們是想要追加新數據?還是根據輸入的數據流更新已有數據(例如更新一個給定web page 的點擊量)?我們需要每次都完整地重寫結果集嗎(例如,使用計算出來的完整點擊量,覆蓋之前的文件內容)?諸如此類,我們需要定義一個輸出模式(output mode),類似於static Structured APIs 中的output modes。
支持的輸出模式有:
- Append:僅追加新紀錄到output sink中
- Update:更新有變動的records
- Complete:重寫整個輸出
這里很重要的一點是,對於特定queries、以及特定sinks,僅支持特定的output modes。例如,假設我們的job僅在stream上執行一個map操作。輸出的數據則會隨着輸入的數據不斷到來而趨近與無窮,此時使用Complete 則是不合理的,因為它每次都會將所有數據寫入到一個新文件中。而若是我們的job執行的是一個聚合操作,僅聚合數據到一個有限數量的keys中,則Complete和Update就是合適的,但是Append就不合適,因為有些key的值需要隨時間更新。
5. 觸發器(Triggers)
Output modes 定義的是輸入如何輸出,而triggers定義的是數據何時輸出,也就是說,Structured Streaming什么時候應該檢查新輸入數據並更新它的結果。默認情況下,Structured Streaming會在完成處理了上一組輸入數據后,立即look for 新的輸入records。不過,若是sink是一組文件,則這個行為會導致寫入大量的小的輸出文件。所以,Spark 也支持基於處理時間來觸發trigger(僅以固定時間頻率look for 新數據)
6. Event-Time Processing
Structured Streaming 也支持event-time processing(也就是基於record內自帶的時間戳處理亂序數據)
7. Event-Time 數據
Event-time 表示的是嵌入到數據中的time字段。也就是說,在處理時並非按照record到達的時間,而是record生成的時間,因為records可能會由於網絡等原因造成亂序到達。在Structured Streaming中表示event-time非常簡單,因為這個系統是將輸入數據視為一個table,所以event-time就是table中的一個字段而已。所以應用可以執行grouping、aggregating、以及windowing,使用標准SQL操作符即可。不過在Structured Streaming 內部,它在知道了一個column是一個event-time 字段后,會執行一些特別的actions,包括優化query執行、或是決定什么時候可以放心的忽略一個time window有關的state。大部分這些actions由watermarks控制
8. Watermarks
Watermarks是流系統里的一個功能,可以允許我們指定how late they expect to see data in event time。例如,在一個處理手機終端日志的系統中,我們可能會預期日志能達到最高30分鍾的延遲到達(由於上傳中的delay)。對於支持event time 的系統,包括Structured Streaming,一般會允許設置watermarks,用來限制它們需要remember old data的時長。Watermarks 也可以用於控制什么時候為一個特定的event time window輸出一個結果(例如等待直到它的watermark過去后)。