Spark Structured Streaming(二)實戰


5. 實戰Structured Streaming

5.1. Static版本

先讀一份static 數據:

val static = spark.read.json("s3://xxx/data/activity-data/")

static.printSchema
root
 |-- Arrival_Time: long (nullable = true
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

 

看一下部分內容:

5.2. Streaming 版本

下面我們使用同樣的數據集創建一個streaming 版本,在流的情況下,它會一個接一個地讀取每個輸入文件。

Streaming DataFrames與static DataFrame基本一致,基本上,所有static DF中的transformation都可以應用在Streaming DF中。不過,一個小小的區別是:Structured Streaming並不允許我們執行schema 推斷(inference),除非顯式地啟用此功能。可以通過設置spark.sql.streaming.schemaInference 為 true來開啟schema inference。在這個例子中,我們會讀取一個文件的schema(當然我們已知了這些文件具有有效的schema),並從static DF中pass一個dataSchema對象到streaming DF中。不過在實際中一般要避免這么做,因為數據可能是會改變的。

val dataSchema = static.schema

val streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).json("s3://tang-spark/data/activity-data")

(這里我們指定了maxFilesPerTrigger為1,僅是為了測試方便,讓stream一次讀一個文件,但是一般在生產環境中不會配置這么低。)

與其他Spark API 一樣,streaming DataFrame 的創建以及執行是lazy的。現在我們可以為streaming DF指定一個transformation,最后調用一個action開始執行這個流。在這個例子中,我們會展示一個簡單的transformation —— 根據字段gt,進行group 並 count:

val activityCounts = streaming.groupBy("gt").count()

若是使用的單機模式,也可以設置一個更小的spark.sql.shuffle.partitions 值(默認為200,Configures the number of partitions to use when shuffling data for joins or aggregations.),以避免創建過多的shuffle partitions:

spark.conf.set("spark.sql.shuffle.partitions", 5)

現在我們已經設置好了transformation,下一步僅需要指定一個action就可以開始執行query了。根據上文所述,我們還需要指定一個輸出的destination,或是針對這個query結果的output sink。對於這個簡單的例子,我們會使用memory sink,它會維護一個在內存中的一個table of the results。

在指定sink時,我們還需要定義Spark如何輸出它的數據,在這個例子中,我們會使用complete 的輸出模式。這個模式會在每次trigger后,重寫所有的keys以及它們的counts值:

val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()

在我們運行上面的代碼前,我們也要加上這條代碼:

activityQuery.awaitTermination()

在這條代碼開始運行后,streaming 計算將會在后台運行。而這個query object即是active streaming query 對應的handle,而我們必須使用activityQuery.awaitTermination() 等待query的結束,以防止driver進程在query仍是active時退出。在生產環境中必須要加上這條語句,否則stream無法執行。

 

開始執行:

[Stage 8:==========================>                             (94 + 4) / 200]

 

可以看到有 200 個task,因為使用的是默認的 spark.sql.shuffle.partitions 值。

 

可以使用 spark.streams.active 列出active的stream:

> spark.streams.active

res24: Array[org.apache.spark.sql.streaming.StreamingQuery] = Array(org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@612c6083)

它也會為每個stream分配一個UUID,所以如果有必要的話,可以迭代這個list,並選出上面的那個流。不過在這個例子中,我們已經將這個流assign給一個變量了,所以沒有必要。

現在這個流正在運行,我們可以通過query這個in-memory table來檢查結果,這個表維護了當前streaming aggregation的輸出。這個table將被稱為 activity_counts,與流的名稱一樣。在檢查當前output table 的輸出時,僅需要query此表即可。我們會寫一個循環,每秒打出這個streaming query 的結果:

for (i <- 1 to 5){
     spark.sql("select * from activity_counts").show()
     Thread.sleep(1000)
   }

結果類似於:

 

整體語句為:

val static = spark.read.json("s3://tang-spark/data/activity-data/")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).json("s3://tang-spark/data/activity-data")
val activityCounts = streaming.groupBy("gt").count()
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()

for (i <- 1 to 5){
     spark.sql("select * from activity_counts").show()
     Thread.sleep(1000)
   }

6. Streams 中的 Transformations

Streaming中的transformations 基本涵蓋了static DF 中的 transformations,例如select、filter等簡單的transformations。不過仍是有些限制,例如在Spark 2.2 版本中,用戶無法sort一個沒有aggregated的stream,並且在沒有使用Stateful Processing時,無法執行多層的aggregation,等等。這些限制可能會隨着版本更替有所解決,對此最好參考Spark的官方文檔,查看更新情況。

 

6.1. Selections and Filtering

在Structured Streaming中,支持所有的select 與 filter。下面是一個簡單的例子,由於我們並不會隨着時間更新key,所以會使用Append output mode,將新數據append到輸出表中:

import org.apache.spark.sql.functions.expr

val simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))
  .where("stairs")
  .where("gt is not null")
  .select("gt", "model", "arrival_time", "creation_time")
  .writeStream
  .queryName("simple_transform")
  .format("memory")
  .outputMode("append")
  .start()

 

下面查詢一下,先可以看看有哪些表:

 

然后查詢 simple_transform 表:

spark.sql("select * from simple_transform").show()

 

6.2. Aggregations

Structured Streaming 對 aggregation操作的支持非常好。我們可以指定任意聚合操作。例如,我們可以使用一個比較有特點的聚合(例如Cube),在sensor的phone model 、activity、以及average x,y,z acceleration 字段上聚合Cube:

val deviceModelStats = streaming.cube("gt", "model").avg()
  .drop("avg(Arrival_time)")
  .drop("avg(Creation_Time)")
  .drop("avg(Index)")
  .writeStream.queryName("device_counts").format("memory").outputMode("complete")
  .start()

spark.sql("select * from device_counts").show()

 

相信大家已經發現了,這里queryName(“device_counts”) 也就是我們最后生成的表名。

 

6.3. Joins

在Spark 2.2 版本以后,Structured Streaming支持streaming DataFrames 與 static DataFrames進行join。Spark 2.3 會增加多個streams 的join 功能。

val historicalAgg = static.groupBy("gt", "model").avg()
val deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")
  .cube("gt", "model").avg()
  .join(historicalAgg, Seq("gt", "model"))
  .writeStream.queryName("device_counts_join").format("memory").outputMode("complete")
  .start()

spark.sql("select * from device_counts_join").show()

 

在Spark 2.2 中,full outer join、left join(stream在右)、right join(stream在左)暫不支持。Structured Streaming 也還沒有支持stream-to-stream 的join,不過這個也是一個正在開發中的功能。

 

7. Input 與 Output

這節我們會深入討論,在Structured Streaming 中,sources、sinks以及output modes 是如何工作的。特別地,我們會討論數據流是怎樣、何時、從哪,進入以及流出系統。

下面討論的source 與 sink 都是端到端相同的情況一起討論的,當然端到端也可以是不一樣的(例如source 是kafka,sink 是file)

 

7.1. 數據在哪讀寫(Sources 與 Sinks)

Structured Streaming 支持多個production sources 與 sinks(files 以及Kafka),以及一些debug 工具(例如memory table sink)。我們之前已經介紹過這些,現在進一步討論。

 

File Source 與 Sink

可能能想到的最簡單的source就是file source,基本上我們能想到的所有文件類型都適用,如Parquet、text、JSON、CSV等。

在Spark static file source 與Structured Streaming file source 中,它們唯一的區別就是:在streaming 中,我們可以控制每次trigger中讀入的文件數,通過maxFilesPerTriger選項即可。

需要注意的是,任何添加到Streaming input 目錄中的文件,都必須是原子的。否則,在你寫入完成前,Spark會partially 處理文件。在如本地文件系統或是HDFS這種允許partial write 文件系統中,最好是在外部文件夾寫完文件后,再移動到 input 目錄中。在Amazon S3 中,對象一般僅在fully writes 后才可被訪問。

 

Kafka Source 與 Sink

Kafka 是一個分布式的publish-and-subscribe 系統,它可以像一個消息隊列一樣,讓我們可以publish records 到流中。可以將Kafka 認作為一個分布式的buffer,它讓我們在某個類別中(topic)存儲流中的records。

 

讀Kafka Source

在讀kafka 時,需要選擇一種選項:assign,subscribe,或是subscribePattern。在讀Kafka時,只能選擇其中一種選項。

  • Assign :是一種細粒度的配置模式,不僅需要指定topic,還需要指定你想要讀的topic 的partitions。這個通過一個JSON string來配置,例如{“topicA”: [0, 1], “topicB”: [2, 4]}。
  • Subscribe與SubscribePattern:是用於subscribe 一個或多個topics 的模式,可以指定一個list(subscribe),或是一個pattern(subscribePattern)

然后,我們還需要指定kafka.bootstrap.servers,除此之外,還有其他幾個options需要指定:

  • StartingOffsets 與 endingOffsets:query開始時的初始點,可以是earliest,也就是最開始的offsets;latest,也就是最新的offsets;或者是一個JSON string,為每個TopicPartition指定的初始offset。在JSON中,-2 作為offset可以用於指向earliest,-1指向latest。例如,一個JSON定義為 {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}。
  • failOnDataLoss:如果有數據丟失(例如topics 被刪除、或者offsets 超出范圍),是否要fail掉query。默認是true,可以根據任務需求進行修改。
  • maxOffsetsPerTrigger:對於給定的一個trigger,讀入多少個offsets

當然,還有其他配置選項,如Kafka consumer timeouts,fetch retries,以及 intervals 等。

下面是一個例子:

// subscribe to 1 topic
val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "b-1.xxxxxx:9092,b-2.xxxxxx:9092")
  .option("subscribe", "topic1")
  .load()

// subscribe to multiple topics
val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "b-1.xxxxxx:9092,b-2.xxxxxxx:9092")
  .option("subscribe", "topic1, topic2")
  .load()

// subscribe to a pattern of topics
val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "b-1.xxxxxx:9092,b-2.xxxxxxxx:9092")
  .option("subscribePattern", "topic.*")
  .load()

 

這里若是彈出以下報錯:

org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

 

則根據Spark 官網添加jar 包:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5

或:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 ...

 

在Kafka Source 中的每條row都會有以下schema:

  • key: binary
  • value: binary
  • topic: string
  • partition: int
  • offset: long
  • timestamp: long

 

在Kafka中的每條message都會以某種方式序列化。可以使用Spark原生的方法或是UDF將message 處理為一個更具結構化的形式,常見的模式有JSON或是Avro,去讀寫Kafka。

 

寫Kafka Sink

寫 Kafka Sink與讀Kafka Sink 類似,僅有幾個參數不一樣。我們仍需要指定Kafka bootstrap servers,而其他需要提供的選項是topic以及列名,有兩種方式提供,如下所示,下面兩個寫法是等價的:

// specify topic and columns together
ds1.selectExpr("topic", "CAST(key as STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .option("checkpointLocation", "/kafka/checkpoint/dir")
  .option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092")
  .start()

// specify column and topic seperately
ds1.selectExpr("CAST(key as STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .option("checkpointLocation", "/kafka/checkpoint/dir")
  .option("kafka.bootstrap.servers", "b-1.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.tang-kafka-plain.hnjunx.c4.kafka.cn-north-1.amazonaws.com.cn:9092")
  .option("topic", "topic1")
  .start()

foreach Sink

foreach sink 類似於Dataset API 中的 foreachPartitions。這個操作允許operations可以在per-partition 的基礎下進行並行地計算。在使用foreach sink時,必須實現ForeachWriter 接口,可以在Scala/Java 文檔中查看,它包含3個方法:open、process 和 close。在trigger觸發了一組生成的rows后,會執行相對應的這三個方法。

總的來說,這個是一個自定義的方法,可以實現自己想要的業務邏輯。

 

針對測試的Sources與Sinks

這些主要用於做測試,一般不會在生產環境中使用,因為它們無法提供端到端的fault tolerance。

 

Socket source

通過TCP sockets發送的數據,指定host和port 來讀數據,如:

val socketDF = spark.readStream.format("socket")

  .option("host", "localhost").option("port", 9999).load()

 

在客戶端可以執行:

nc -lk 9999

 

Console sink

可以將輸出吸入到console,支持append和complete的輸出模式:

activityCounts.format("console").write()

 

Memory sink

類似於console sink,區別是會將數據collect到driver中,然后以in-memory table的形式提供交互式query:

activityCounts.writeStream.format("memory").queryName("my_device_table")

 

7.2. 數據如何輸出(Output Modes)

在Structured Streaming 中,output modes的概念與static DF 中的一樣。

 

Append Mode

Append mode 是默認的行為,也是最簡單最容易理解的。在一個新row添加到result table時,它們會基於指定的trigger,輸出到sink。在fault-tolerant sink的條件下,這個模式可以確保每行row僅輸出一次(並且僅此一次)。當我們將append mode 與event-time 和watermarks 結合使用時,僅有最終的結果會輸出到sink。

 

Complete Mode

Complete mode 會輸出result table 的整個state到配置的output sink 中。在一些stateful 數據的場景下,例如所有的rows都會隨時間改變,或是配置的sink 並不支持行級的updates 時,complete mode 非常有用。如果query 並不包含aggregation,則這個等同於 append mode

 

Update Mode

Update mode 類似於 complete mode,區別是:僅有與上一次不同的rows 會被寫入到sink 中。不過,sink必須支持行級的 updates,才能支持這個模式。如果query 不包含任何 aggregation,則這個模式等同於 append mode。

 

什么場景選擇各個mode?

Structured Streaming 對於每個output mode,都會結合query的信息進行限制。例如,假設我們的query僅是做一個map操作,那么Structured Streaming不會允許complete mode,因為這樣會需要記錄所有的輸入records,並重寫整個output table。下面是一些小手冊,關於什么時候使用各個output mode。

 

什么時候數據輸出(Triggers)

在控制什么時候將數據輸出到sink時,我們會設置一個trigger。默認情況下,Structured Streaming會在上一個trigger完成處理后,啟動數據。我們可以使用triggers來確保不會有太多的updates壓垮output sink,或是控制output中的文件大小。當前,僅有一種定期觸發的trigger類型,基於的是processing time;以及一次性的trigger,用於手動一次執行一個processing step。在未來會加入更多的triggers。

 

Processing Time Trigger

對於processing time trigger,我們僅需要指定一個string 類型的duration時間即可(或是在Scalat中的Duration 類型、Java中的TimeUnit類型),例如:

import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream
  .trigger(Trigger.ProcessingTime("100 seconds"))
  .format("memory")
  .outputMode("complete")
  .start()

ProcessingTime trigger 會等到給定duration的時間再輸出數據。例如,假設給定的duration是1分鍾,則trigger會在12:00,12:01,12:02 觸發,依次類推。如果由於上一個處理沒完成,導致了一個trigger time miss,則Spark會等到下一個trigger 點(也就是下一分鍾),而不是在上一個處理完成后立即終止。

 

Once Trigger

我們也可以僅讓一個streaming job 執行一次,指定once trigger即可。這個聽起來有點奇怪,不過在開發與生產中,這個trigger都非常有用。對於開發,可以用於測試。而對於生產,Once trigger可以讓我們以一個非常低頻的方式執行任務(例如,向數據源導入數據是一個不確定的時間周期)。由於Structured Streaming 仍可以追溯所有已經處理過的文件,以及計算后的state,它比自身再寫一個追溯batch job 的處理要簡單地多,並且可以節省較多的資源(因為流程序定期運行的話,會一直消耗計算資源)。使用方式如下:

import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.Once())
  .format("console")
  .outputMode("complete")
  .start()

8. Streaming Dataset API

最后一點需要提到的是,在Structured Streaming 中,不僅只限於對stream使用DataFrame API。還可以使用Datasets執行同樣的計算,但是用的是type-safe 的方式。我們可以將一個streaming DataFrame轉換為一個Dataset。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM