5. 實戰Structured Streaming
5.1. Static版本
先讀一份static 數據:
val static = spark.read.json("s3://xxx/data/activity-data/") static.printSchema root |-- Arrival_Time: long (nullable = true |-- Creation_Time: long (nullable = true) |-- Device: string (nullable = true) |-- Index: long (nullable = true) |-- Model: string (nullable = true) |-- User: string (nullable = true) |-- gt: string (nullable = true) |-- x: double (nullable = true) |-- y: double (nullable = true) |-- z: double (nullable = true)
看一下部分內容:
5.2. Streaming 版本
下面我們使用同樣的數據集創建一個streaming 版本,在流的情況下,它會一個接一個地讀取每個輸入文件。
Streaming DataFrames與static DataFrame基本一致,基本上,所有static DF中的transformation都可以應用在Streaming DF中。不過,一個小小的區別是:Structured Streaming並不允許我們執行schema 推斷(inference),除非顯式地啟用此功能。可以通過設置spark.sql.streaming.schemaInference 為 true來開啟schema inference。在這個例子中,我們會讀取一個文件的schema(當然我們已知了這些文件具有有效的schema),並從static DF中pass一個dataSchema對象到streaming DF中。不過在實際中一般要避免這么做,因為數據可能是會改變的。
val dataSchema = static.schema val streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).json("s3://tang-spark/data/activity-data")
(這里我們指定了maxFilesPerTrigger為1,僅是為了測試方便,讓stream一次讀一個文件,但是一般在生產環境中不會配置這么低。)
與其他Spark API 一樣,streaming DataFrame 的創建以及執行是lazy的。現在我們可以為streaming DF指定一個transformation,最后調用一個action開始執行這個流。在這個例子中,我們會展示一個簡單的transformation —— 根據字段gt,進行group 並 count:
val activityCounts = streaming.groupBy("gt").count()
若是使用的單機模式,也可以設置一個更小的spark.sql.shuffle.partitions 值(默認為200,Configures the number of partitions to use when shuffling data for joins or aggregations.),以避免創建過多的shuffle partitions:
spark.conf.set("spark.sql.shuffle.partitions", 5)
現在我們已經設置好了transformation,下一步僅需要指定一個action就可以開始執行query了。根據上文所述,我們還需要指定一個輸出的destination,或是針對這個query結果的output sink。對於這個簡單的例子,我們會使用memory sink,它會維護一個在內存中的一個table of the results。
在指定sink時,我們還需要定義Spark如何輸出它的數據,在這個例子中,我們會使用complete 的輸出模式。這個模式會在每次trigger后,重寫所有的keys以及它們的counts值:
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
在我們運行上面的代碼前,我們也要加上這條代碼:
activityQuery.awaitTermination()
在這條代碼開始運行后,streaming 計算將會在后台運行。而這個query object即是active streaming query 對應的handle,而我們必須使用activityQuery.awaitTermination() 等待query的結束,以防止driver進程在query仍是active時退出。在生產環境中必須要加上這條語句,否則stream無法執行。
開始執行:
[Stage 8:==========================> (94 + 4) / 200]
可以看到有 200 個task,因為使用的是默認的 spark.sql.shuffle.partitions 值。
可以使用 spark.streams.active 列出active的stream:
> spark.streams.active res24: Array[org.apache.spark.sql.streaming.StreamingQuery] = Array(org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@612c6083)
它也會為每個stream分配一個UUID,所以如果有必要的話,可以迭代這個list,並選出上面的那個流。不過在這個例子中,我們已經將這個流assign給一個變量了,所以沒有必要。
現在這個流正在運行,我們可以通過query這個in-memory table來檢查結果,這個表維護了當前streaming aggregation的輸出。這個table將被稱為 activity_counts,與流的名稱一樣。在檢查當前output table 的輸出時,僅需要query此表即可。我們會寫一個循環,每秒打出這個streaming query 的結果:
for (i <- 1 to 5){ spark.sql("select * from activity_counts").show() Thread.sleep(1000) }
結果類似於:
整體語句為:
val static = spark.read.json("s3://tang-spark/data/activity-data/") val dataSchema = static.schema val streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).json("s3://tang-spark/data/activity-data") val activityCounts = streaming.groupBy("gt").count() val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start() for (i <- 1 to 5){ spark.sql("select * from activity_counts").show() Thread.sleep(1000) }
6. Streams 中的 Transformations
Streaming中的transformations 基本涵蓋了static DF 中的 transformations,例如select、filter等簡單的transformations。不過仍是有些限制,例如在Spark 2.2 版本中,用戶無法sort一個沒有aggregated的stream,並且在沒有使用Stateful Processing時,無法執行多層的aggregation,等等。這些限制可能會隨着版本更替有所解決,對此最好參考Spark的官方文檔,查看更新情況。
6.1. Selections and Filtering
在Structured Streaming中,支持所有的select 與 filter。下面是一個簡單的例子,由於我們並不會隨着時間更新key,所以會使用Append output mode,將新數據append到輸出表中:
import org.apache.spark.sql.functions.expr val simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'")) .where("stairs") .where("gt is not null") .select("gt", "model", "arrival_time", "creation_time") .writeStream .queryName("simple_transform") .format("memory") .outputMode("append") .start()
下面查詢一下,先可以看看有哪些表:
然后查詢 simple_transform 表:
spark.sql("select * from simple_transform").show()
6.2. Aggregations
Structured Streaming 對 aggregation操作的支持非常好。我們可以指定任意聚合操作。例如,我們可以使用一個比較有特點的聚合(例如Cube),在sensor的phone model 、activity、以及average x,y,z acceleration 字段上聚合Cube:
val deviceModelStats = streaming.cube("gt", "model").avg() .drop("avg(Arrival_time)") .drop("avg(Creation_Time)") .drop("avg(Index)") .writeStream.queryName("device_counts").format("memory").outputMode("complete") .start() spark.sql("select * from device_counts").show()
相信大家已經發現了,這里queryName(“device_counts”) 也就是我們最后生成的表名。
6.3. Joins
在Spark 2.2 版本以后,Structured Streaming支持streaming DataFrames 與 static DataFrames進行join。Spark 2.3 會增加多個streams 的join 功能。
val historicalAgg = static.groupBy("gt", "model").avg() val deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index") .cube("gt", "model").avg() .join(historicalAgg, Seq("gt", "model")) .writeStream.queryName("device_counts_join").format("memory").outputMode("complete") .start() spark.sql("select * from device_counts_join").show()
在Spark 2.2 中,full outer join、left join(stream在右)、right join(stream在左)暫不支持。Structured Streaming 也還沒有支持stream-to-stream 的join,不過這個也是一個正在開發中的功能。
7. Input 與 Output
這節我們會深入討論,在Structured Streaming 中,sources、sinks以及output modes 是如何工作的。特別地,我們會討論數據流是怎樣、何時、從哪,進入以及流出系統。
下面討論的source 與 sink 都是端到端相同的情況一起討論的,當然端到端也可以是不一樣的(例如source 是kafka,sink 是file)
7.1. 數據在哪讀寫(Sources 與 Sinks)
Structured Streaming 支持多個production sources 與 sinks(files 以及Kafka),以及一些debug 工具(例如memory table sink)。我們之前已經介紹過這些,現在進一步討論。
File Source 與 Sink
可能能想到的最簡單的source就是file source,基本上我們能想到的所有文件類型都適用,如Parquet、text、JSON、CSV等。
在Spark static file source 與Structured Streaming file source 中,它們唯一的區別就是:在streaming 中,我們可以控制每次trigger中讀入的文件數,通過maxFilesPerTriger選項即可。
需要注意的是,任何添加到Streaming input 目錄中的文件,都必須是原子的。否則,在你寫入完成前,Spark會partially 處理文件。在如本地文件系統或是HDFS這種允許partial write 文件系統中,最好是在外部文件夾寫完文件后,再移動到 input 目錄中。在Amazon S3 中,對象一般僅在fully writes 后才可被訪問。
Kafka Source 與 Sink
Kafka 是一個分布式的publish-and-subscribe 系統,它可以像一個消息隊列一樣,讓我們可以publish records 到流中。可以將Kafka 認作為一個分布式的buffer,它讓我們在某個類別中(topic)存儲流中的records。
讀Kafka Source
在讀kafka 時,需要選擇一種選項:assign,subscribe,或是subscribePattern。在讀Kafka時,只能選擇其中一種選項。
- Assign :是一種細粒度的配置模式,不僅需要指定topic,還需要指定你想要讀的topic 的partitions。這個通過一個JSON string來配置,例如{“topicA”: [0, 1], “topicB”: [2, 4]}。
- Subscribe與SubscribePattern:是用於subscribe 一個或多個topics 的模式,可以指定一個list(subscribe),或是一個pattern(subscribePattern)
然后,我們還需要指定kafka.bootstrap.servers,除此之外,還有其他幾個options需要指定:
- StartingOffsets 與 endingOffsets:query開始時的初始點,可以是earliest,也就是最開始的offsets;latest,也就是最新的offsets;或者是一個JSON string,為每個TopicPartition指定的初始offset。在JSON中,-2 作為offset可以用於指向earliest,-1指向latest。例如,一個JSON定義為 {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}。
- failOnDataLoss:如果有數據丟失(例如topics 被刪除、或者offsets 超出范圍),是否要fail掉query。默認是true,可以根據任務需求進行修改。
- maxOffsetsPerTrigger:對於給定的一個trigger,讀入多少個offsets
當然,還有其他配置選項,如Kafka consumer timeouts,fetch retries,以及 intervals 等。
下面是一個例子:
// subscribe to 1 topic val ds1 = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "b-1.xxxxxx:9092,b-2.xxxxxx:9092") .option("subscribe", "topic1") .load() // subscribe to multiple topics val ds1 = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "b-1.xxxxxx:9092,b-2.xxxxxxx:9092") .option("subscribe", "topic1, topic2") .load() // subscribe to a pattern of topics val ds1 = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "b-1.xxxxxx:9092,b-2.xxxxxxxx:9092") .option("subscribePattern", "topic.*") .load()
這里若是彈出以下報錯:
org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
則根據Spark 官網添加jar 包:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
或:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 ...
在Kafka Source 中的每條row都會有以下schema:
- key: binary
- value: binary
- topic: string
- partition: int
- offset: long
- timestamp: long
在Kafka中的每條message都會以某種方式序列化。可以使用Spark原生的方法或是UDF將message 處理為一個更具結構化的形式,常見的模式有JSON或是Avro,去讀寫Kafka。
寫Kafka Sink
寫 Kafka Sink與讀Kafka Sink 類似,僅有幾個參數不一樣。我們仍需要指定Kafka bootstrap servers,而其他需要提供的選項是topic以及列名,有兩種方式提供,如下所示,下面兩個寫法是等價的:
// specify topic and columns together ds1.selectExpr("topic", "CAST(key as STRING)", "CAST(value AS STRING)") .writeStream.format("kafka") .option("checkpointLocation", "/kafka/checkpoint/dir") .option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092") .start() // specify column and topic seperately ds1.selectExpr("CAST(key as STRING)", "CAST(value AS STRING)") .writeStream.format("kafka") .option("checkpointLocation", "/kafka/checkpoint/dir") .option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092") .option("topic", "topic1") .start()
foreach Sink
foreach sink 類似於Dataset API 中的 foreachPartitions。這個操作允許operations可以在per-partition 的基礎下進行並行地計算。在使用foreach sink時,必須實現ForeachWriter 接口,可以在Scala/Java 文檔中查看,它包含3個方法:open、process 和 close。在trigger觸發了一組生成的rows后,會執行相對應的這三個方法。
總的來說,這個是一個自定義的方法,可以實現自己想要的業務邏輯。
針對測試的Sources與Sinks
這些主要用於做測試,一般不會在生產環境中使用,因為它們無法提供端到端的fault tolerance。
Socket source
通過TCP sockets發送的數據,指定host和port 來讀數據,如:
val socketDF = spark.readStream.format("socket")
.option("host", "localhost").option("port", 9999).load()
在客戶端可以執行:
nc -lk 9999
Console sink
可以將輸出吸入到console,支持append和complete的輸出模式:
activityCounts.format("console").write()
Memory sink
類似於console sink,區別是會將數據collect到driver中,然后以in-memory table的形式提供交互式query:
activityCounts.writeStream.format("memory").queryName("my_device_table")
7.2. 數據如何輸出(Output Modes)
在Structured Streaming 中,output modes的概念與static DF 中的一樣。
Append Mode
Append mode 是默認的行為,也是最簡單最容易理解的。在一個新row添加到result table時,它們會基於指定的trigger,輸出到sink。在fault-tolerant sink的條件下,這個模式可以確保每行row僅輸出一次(並且僅此一次)。當我們將append mode 與event-time 和watermarks 結合使用時,僅有最終的結果會輸出到sink。
Complete Mode
Complete mode 會輸出result table 的整個state到配置的output sink 中。在一些stateful 數據的場景下,例如所有的rows都會隨時間改變,或是配置的sink 並不支持行級的updates 時,complete mode 非常有用。如果query 並不包含aggregation,則這個等同於 append mode
Update Mode
Update mode 類似於 complete mode,區別是:僅有與上一次不同的rows 會被寫入到sink 中。不過,sink必須支持行級的 updates,才能支持這個模式。如果query 不包含任何 aggregation,則這個模式等同於 append mode。
什么場景選擇各個mode?
Structured Streaming 對於每個output mode,都會結合query的信息進行限制。例如,假設我們的query僅是做一個map操作,那么Structured Streaming不會允許complete mode,因為這樣會需要記錄所有的輸入records,並重寫整個output table。下面是一些小手冊,關於什么時候使用各個output mode。
什么時候數據輸出(Triggers)
在控制什么時候將數據輸出到sink時,我們會設置一個trigger。默認情況下,Structured Streaming會在上一個trigger完成處理后,啟動數據。我們可以使用triggers來確保不會有太多的updates壓垮output sink,或是控制output中的文件大小。當前,僅有一種定期觸發的trigger類型,基於的是processing time;以及一次性的trigger,用於手動一次執行一個processing step。在未來會加入更多的triggers。
Processing Time Trigger
對於processing time trigger,我們僅需要指定一個string 類型的duration時間即可(或是在Scalat中的Duration 類型、Java中的TimeUnit類型),例如:
import org.apache.spark.sql.streaming.Trigger activityCounts.writeStream .trigger(Trigger.ProcessingTime("100 seconds")) .format("memory") .outputMode("complete") .start()
ProcessingTime trigger 會等到給定duration的時間再輸出數據。例如,假設給定的duration是1分鍾,則trigger會在12:00,12:01,12:02 觸發,依次類推。如果由於上一個處理沒完成,導致了一個trigger time miss,則Spark會等到下一個trigger 點(也就是下一分鍾),而不是在上一個處理完成后立即終止。
Once Trigger
我們也可以僅讓一個streaming job 執行一次,指定once trigger即可。這個聽起來有點奇怪,不過在開發與生產中,這個trigger都非常有用。對於開發,可以用於測試。而對於生產,Once trigger可以讓我們以一個非常低頻的方式執行任務(例如,向數據源導入數據是一個不確定的時間周期)。由於Structured Streaming 仍可以追溯所有已經處理過的文件,以及計算后的state,它比自身再寫一個追溯batch job 的處理要簡單地多,並且可以節省較多的資源(因為流程序定期運行的話,會一直消耗計算資源)。使用方式如下:
import org.apache.spark.sql.streaming.Trigger activityCounts.writeStream.trigger(Trigger.Once()) .format("console") .outputMode("complete") .start()
8. Streaming Dataset API
最后一點需要提到的是,在Structured Streaming 中,不僅只限於對stream使用DataFrame API。還可以使用Datasets執行同樣的計算,但是用的是type-safe 的方式。我們可以將一個streaming DataFrame轉換為一個Dataset。