Spark 3.2.1 Structured Streaming編程指南


一.概述

Structured Streaming是一個可擴展、容錯的流處理引擎,建立在Spark SQL引擎之上。開發者可以用離線批處理數據相同的表示來表示流計算的邏輯,並且保持其邏輯的一致性(流批一體)。Spark SQL引擎會處理好增量連續運行,並隨着流式數據的接收持續更新最終結果。開發者可以使用Dataset/DataFrame API ,使用Scala,Java,Python或者R的方式編程,表達 streaming 聚合,事件時間窗口,流批Join等。計算邏輯在Spark SQL引擎上執行,充分利用Spark SQL引擎的優勢。最后,系統通過Checkpoint及Write-Ahead Log保證端到端的exactly-once容錯機制。簡單來說,Structured Streaming提供高性能,可擴展,容錯,端到端exactly-once的流處理

,開發人員無需對流進行額外的考慮。

內部實現上,Structured Streaming 的query默認采用micro-batch處理引擎,將數據流當作一系列小的批任務處理,端到端延時最低能到100毫秒並且保證exactly=once容錯機制。在Spark2.3中,加入了一個名為Continuous的新的低延時處理模式,可以達到端到端延時1ms並且保證at-least-once。在兩種模式間切換時無需修改Dataset/DataFrame的處理邏輯。

在本指南中,會詳細介紹變成模型和API。大部分概念會使用默認的micro-batch處理模式來進行解釋,隨后再討論Continuous處理模式。接下來,我們先從一個簡單的Structured Streaming query--流式word count開始。

 

二.快速示例

假設你需要維護一個持續從TCP socket接收文本數據並word count的程序。看看如何使用Structured Streaming來表達。下面是Scala的例子,如果下載Spark,還可以直接運行該示例。讓我們通過示例一步步的理解其工作原理。

首先,我們需要import必須的類並創建一個local模式的SparkSession,這是所有與Spark相關的功能的起點。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
  
import spark.implicits._

接下來,創建一個Streaming DataFrame,它表示從本地主機9999端口接收的文本數據,並轉換DataFrame來計算wordcount。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

lines DataFrame表示包含流式文本數據的無界表。此表包含一列名為“value”的字符串,流式文本數據中的每一行都成為表中的一行。請注意,由於我們只是在設置轉換,還沒有開始轉換,所以目前還沒有收到任何數據。接下來,我們使用將DataFrame轉換為字符串數據集。作為[String],這樣我們就可以應用flatMap操作將每一行拆分為多個單詞。結果單詞數據集包含所有單詞。最后,我們通過按數據集中的唯一值分組並計數來定義wordCounts DataFrame。請注意,這是一個流DataFrame,表示流的wordcount。

我們現在已經設置了對流數據的query。剩下的就是開始接收數據並計算計數。為了做到這一點,我們將其設置為在每次更新時將完整的計數集(由outputMode(“complete”)指定)打印到控制台。然后使用start()啟動流計算。

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

執行此代碼后,流計算將在后台啟動。query對象是活動流式查詢的句柄,我們使用waitTermination()等待查詢的終止,以防止進程在查詢活動時退出。
要實際執行此示例代碼,您可以在自己的Spark應用程序中編譯代碼,或者在下載Spark后運行示例。我們正在展示后者。首先,您需要使用

$ nc -lk 9999

然后,在另一個終端中,可以使用

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

然后,在運行netcat服務器的終端中鍵入的任何行都將每秒計數並打印在屏幕上。如下所示:

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop

 

# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

 

三.編程模型

Structured Streaming 的關鍵思想是將實時數據流視為一個不斷追加的表。這種思想創造了一種新的流處理模型,它與批處理模型非常相似。您將把流計算表示為靜態表上的標准批處理查詢,Spark將其作為無界輸入表上的增量查詢運行。讓我們更詳細地了解這個模型。

 

3.1 基本概念

將輸入數據流視為“輸入表”。流中的每個數據項都像是一個新行被追加到輸入表中。

image

 

對輸入的查詢將生成“結果表”。每個觸發間隔(比如,每1秒)都會向輸入表追加新行,最終更新結果表。無論何時更新結果表,我們都希望將更改后的結果行寫入外部接收器。

image

“Output”定義為寫入外部存儲器的內容。可以在不同的模式下定義輸出:

Complete Mode-整個更新的結果表將寫入外部存儲器。由存儲連接器決定如何處理整個表的寫入。

Append Mode-只有自上次觸發器以來追加到結果表中的新行才會寫入外部存儲器。這僅適用於結果表中的現有行預計不會更改的查詢。

Update Mode-只有自上次觸發后在結果表中更新的行才會寫入外部存儲器(自Spark 2.1.1起可用)。請注意,這與Complete Mode的不同之處在於,此模式僅輸出自上次觸發以來已更改的行。如果查詢不包含聚合,則相當於追加模式。

請注意,每種模式都適用於某些類型的查詢。這將在后面詳細討論。

ote that the query on streaming lines DataFrame to generate wordCounts is exactly the same as it would be a static DataFrame.

為了說明該模型的使用,讓我們結合上面的快速示例來理解該模型。第一行DataFrame是輸入表,最后一行wordCounts DataFrame是結果表。請注意,streaming DataFrame上lines到wordCounts的query與static DataFrame完全相同。但是,當該查詢啟動時,Spark將持續檢查來自socket連接的新數據。如果有新數據,Spark將運行“增量”查詢,將以前運行的計數與新數據結合起來,以計算更新的計數,如下所示。

image

 

需要注意的是,Structured Streaming並不能物化整個表。它從流數據源讀取最新的可用數據,增量處理以更新結果,然后丟棄源數據。它只保留更新結果所需的最小中間狀態數據(如前面示例中的中間計數)。

此模型與許多其他流處理引擎有顯著不同。許多流媒體系統要求用戶自己維護正在運行的聚合,因此必須考慮容錯性和數據一致性(至少一次、最多一次或恰好一次)。在該模型中,Spark負責在有新數據時更新結果表,從而避免用戶對其進行推理。作為一個例子,讓我們看看這個模型如何處理基於事件時間的處理和延遲到達的數據。

 

3.2 處理事件時間和延遲數據

事件時間是數據本身產生的時間,包含在數據內容中。對於許多應用程序,您可能希望在事件時間上進行操作。例如,如果希望獲得每分鍾由物聯網設備生成的事件數,那么最好是能使用生成數據的時間(即數據中的事件時間),而不是Spark接收數據的時間。這個事件時間在這個模型中非常自然地表示出來——來自設備的每個事件都是表中的一行,而事件時間是行中的一列值。這允許基於窗口的聚合(例如,每分鍾的事件數)只是事件時間列上的一種特殊類型的分組和聚合——每個時間窗口是一個組,每一行可以屬於多個窗口/組。因此,可以在離線數據(例如已經收集好的設備事件日志)以及實時數據流上一致地定義這種基於事件時間窗口的聚合查詢,這種流批一體的處理方式能夠幫助開發人員減輕大量的重復開發工作。
此外,該模型天然的支持根據事件時間處理比預期晚到的數據。由於Spark自己更新結果表,因此它可以完全控制在有延遲數據時更新舊聚合,以及清理舊聚合以限制中間狀態數據的大小。從Spark 2.1開始,我們就支持Watermark,它允許用戶指定延遲數據的閾值,並允許引擎相應地清除舊狀態。稍后將在“窗口操作”一節中詳細解釋這些操作。

 

3.3 容錯語義

提供端到端的exactly-once語義是Structured Streaming設計背后的關鍵目標之一。為了實現這一點,我們設計了Structured Streaming sources, sinks 和execution engine,以可靠地跟蹤處理的確切進度,從而可以通過重新啟動和/或重新處理來處理任何類型的故障。假設每個流媒體源都有偏移量(類似於Kafka偏移量或Kinesis序列號)來跟蹤流中的讀取位置。引擎使用檢查點和WAL來記錄每個Trigger中正在處理的數據的偏移范圍。Sinks設計為冪等重復處理。總之,使用可重放Source和冪等Sink,Structured Streaming 可以確保在任何故障情況下端到端只執行一次語義。

 

4. DataSet和DataFrame API

4.1 創建Streaming DataFrame和Streaming Dataset

Streaming DataFrame可以通過SparkSession.readStream()返回的DataStreamReader接口創建。與創建靜態DataFrame的讀取接口類似,可以指定source的詳細信息——data format, schema, options等。

 

4.1.1 Input Source

內置的Input Source如下:

File source-讀取寫入到目錄中的文件作為數據流。文件會按照文件修改時間的順序進行處理。如果設置了latestFirst,則順序將顛倒。支持的文件格式有text、CSV、JSON、ORC和Parquet。請參閱DataStreamReader的文檔,以獲取最新的列表,以及每種文件格式支持的option。請注意,文件必須以原子方式放置在給定的目錄中,在大多數文件系統中,可以通過文件move操作來實現。

Kafka source-從卡夫卡讀取數據。它與Kafka broker版本0.10.0或更高版本兼容。有關更多詳細信息,請參閱《Kafka 集成指南》。

Socket source(用於測試)-從socket連接讀取UTF8文本數據。socket接收器位於Driver端。請注意,這只應用於測試,因為它不提供端到端的容錯保證。

Rate source(用於測試)-以每秒指定的行數生成數據,每個輸出行包含時間戳和值。其中,timestamp是包含消息分派時間的時間戳類型,value是包含消息計數的Long類型,從0開始作為第一行。此源用於測試和基准測試。

 

有些數據源是不容錯的,因為它們不能保證在發生故障后可以使用checkpoint偏移量重放數據。請參閱前面關於容錯語義的部分。以下是Spark中所有Source的詳細信息。

File source(容錯)參數列表:

path:輸入目錄的路徑,對所有文件格式通用。
maxFilesPerTrigger:每個Trigger中包含的最大新文件數(默認值:無最大值)
latestFirst:是否首先處理最新的新文件,在有大量積壓文件時很有用(默認值:false)
filenameOnly:是否僅基於文件名而非完整路徑檢查新文件(默認值:false)。設置為“true”時,以下文件將被視為同一文件,因為它們的文件名“dataset.txt”是相同的:
"file:///dataset.txt"
“s3://a/dataset.txt”
“s3n://a/b/dataset.txt”
“s3a://a/b/c/dataset.txt”
maxFileAge:目錄中文件的最長期限,超過該期限將被忽略。對於第一批,所有文件都將被視為有效。如果latestFirst設置為'true',並設置了maxFilesPerTrigger,則此參數將被忽略,因為可能會忽略有效且應處理的舊文件。最大期限是根據最新文件的時間戳而不是當前系統的時間戳來指定的。(默認值:1周)
cleanSource:處理后清理已完成文件的選項。
可用選項有"archive", "delete", "off"。如果未提供該選項,則默認值為“off”。
當提供“archive”時,還必須提供附加選項sourceArchiveDir。“sourceArchiveDir”的值在深度上不能與source的通配符(根目錄中的目錄數)匹配,其中深度是兩個path上的最小深度。這將確保存檔文件永遠不會作為新的源文件。
例如,假設使用/hello?/spark/*作為 source 通配符,“/hello1/spark/archive/dir”不能用作“sourceArchiveDir”的值,如/hello?/spark/*和'/hello1/spark/archive'將匹配。/archived/here可以,因為它不匹配。
Spark將根據源文件自身的路徑移動源文件。例如,如果源文件的路徑是/a/b/dataset.txt,存檔目錄的路徑為/archive/here,文件將被移動到/archive/here/a/b/dataset.txt。
注1:存檔(通過移動)或刪除已完成的文件都會在每個微批處理中引入開銷(降低速度,即使它發生在單獨的線程中),因此在啟用此選項之前,您需要了解文件系統中每個操作的成本。另一方面,啟用此選項的好處是將降低list源文件這一高消耗操作的成本。
可以使用spark配置已完成文件清理器中使用的線程數。具體配置是spark.sql.streaming.fileSource.cleaner.numThreads(默認值:1)。
注2:啟用此選項時,確保該source僅被一處輸入使用,且未被其他輸出使用。
注3:刪除和移動操作都是盡可能的操作。未能刪除或移動文件不會導致流式查詢失敗。Spark在某些情況下可能無法清理某些源文件,例如,應用程序沒有正常關閉,有太多文件排隊清理。
有關特定於文件格式的選項,請參閱DataStreamReader中的相關方法。例如,有關“parquet”格式的選項,請參閱DataStreamReader.parquet()。
此外,還有一些session配置會影響某些文件格式。有關更多詳細信息,請參閱《SQL編程指南》。例如,“parquet”見parquet配置

 

Socket Source(非容錯):

支持的配置有host和port,均為必填。

 

Rate Source(容錯):

rowsPerSecond(例如100,默認值:1):每秒應該生成多少行。
rampUpTime(例如5s,默認值:0s):在生成速度變為rowsPerSecond之前,需要多長時間才能加速。使用小數點將被截斷為整數秒。
numPartitions(例如10,默認值:Spark的default parallelism):生成的行的分區號。
source將盡最大努力達到rowsPerSecond,但可能會受到資源限制,可以調整numPartitions以幫助達到所需的速度。

 

Kafka Source(容錯):

參考《Kafka Integration Guide

 

其他Source:

Pulsar:參考https://github.com/streamnative/pulsar-spark

 

示例:

val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

 

該示例生成untyped的Streaming DataFrame,這意味着DataFrame的Schema不會在編譯時被檢查,只有在提交查詢時才會在運行時被檢查。有些操作,如map、flatMap等,需要在編譯時就知道類型。為此,可以使用與Static DataFrame相同的方法將這些untyped Streaming DataFrame轉換為typed Streaming DataFrame。有關更多詳細信息,請參閱《SQL編程指南》。此外,本文后面將討論有關受支持的Streaming Source的更多詳細信息。
從Spark 3.1開始,還可以使用DataStreamReader.table()從表中創建Streaming DataFrame。

 

4.1.2 Schema推理和Partition

默認情況下,在Structured Streaming中,File Source的需要手動指定Schema,而不是依靠Spark自動推斷。此限制確保流式處理使用一致的Schema,即使在失敗的情況下也是如此。對於Ad-hoc的場景,可以通過設置spark.sql.streaming.schemaInference=true來啟用自動推斷。
如果存在名為/key=value/的子目錄則會觸發分區發現,並且list操作將自動遞歸到這些目錄中,。如果這些列出現在程序指定的shema中,Spark將根據正在讀取的文件的路徑來填充它們。當處理開始時,組成分區Schema的目錄必須存在,並且必須保持靜態。例如,當/data/year=2015/存在時可以添加/data/year=2016/,但更改分區列(即創建目錄/data/date=2016-04-17/)是無效的。

 

 

4.2 操作 Streaming DataFrame/Dataset

在Streaming DataFrame/Dataset上可以進行各種操作,從untyped的類SQL操作(如select、where、groupBy)到typed的RDD類操作(例如map、filter、flatMap)。有關更多詳細信息,請參閱《SQL編程指南》。讓我們來看幾個可以使用的示例操作。

4.2.1 基本操作-Selection, Projection, Aggregation

Streaming支持對DataFrame/Dataset的大多數常見操作。本節后面將討論不受支持的幾個操作。

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

 

還可以將Streaming DataFrame/Dataset注冊為臨時視圖,然后對其應用SQL命令。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF

可以通過下面方式判斷DataFrame是否是Streaming類型的:

df.isStreaming

調試過程中可能需要檢查查詢計划,因為Spark會在解釋處理Streaming Dataset的SQL語句時注入帶狀態的操作。一旦帶狀態的操作被注入到查詢計划中,您可能需要使用有狀態操作中的注意事項來檢查查詢。(例如輸出模式、水印、狀態存儲大小維護等)

 

4.2.2 基於事件時間的窗口函數

滑動事件時間窗口上的聚合對於Structured Streaming來說非常簡單,與分組聚合非常相似。在分組聚合中,為用戶指定的分組列中的每個唯一值維護聚合值(例如計數)。如果是基於窗口的聚合,則為事件時間所在的每個窗口維護聚合值。讓我們通過一個例子來理解這一點。

修改一下快速示例,流現在包含行以及生成行的時間。我們希望以10分鍾為窗口統計一次單詞,並且每5分鍾更新一次。也就是說,在12:00-12:10、12:05-12:15、12:10-12:20等10分鍾窗口之間接收數據的wordcount。請注意,12:00-12:10表示在12:00之后但在12:10之前到達的數據。現在,考慮一個在12:07收到的單詞。這個詞應該增加對應於兩個窗口12:00-12:10和12:05-12:15的計數。因此,計數將同時由分組鍵(即單詞)和窗口(可根據事件時間計算)索引。

如下是結果集的情況:

image

window操作和group非常類似,因此在代碼中可以通過groupBy和window兩個函數進行表達:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

 

4.2.2.1 處理滯后數據和水印

現在考慮一下如下場景,如果其中一個事件遲到了會發生什么。例如,應用程序可以在12:11接收12:04(即事件時間)生成的單詞。應用程序應該使用時間12:04而不是12:11來更新窗口12:00-12:10的舊計數。這種場景在基於窗口的分組中自然而然的就被處理了——Structured Streaming可以在很長一段時間內保持部分聚合的中間狀態,以便后期數據可以正確地更新舊窗口的聚合,如下所示。

image

然而,考慮到系統會持續運行很多天,系統必須限制累積的中間內存狀態的數量。這意味着系統需要知道何時可以從內存狀態中刪除舊聚合,因為應用程序將不再接收該聚合的延遲數據。為了實現這一點,我們在Spark 2.1中引入了水印,它讓引擎自動跟蹤數據中的當前事件時間,並嘗試相應地清除舊狀態。您可以通過指定事件時間列和閾值來定義查詢的水印,該閾值根據事件時間來確定數據的預期延遲時間。對於在時間T結束的特定窗口,引擎將一直保留狀態,並允許延遲數據更新狀態,直到(引擎收到的最大事件時間-延遲閾值>T)。換句話說,閾值內的延遲數據將被聚合,但閾值之后的數據將開始被刪除(有關確切的保證,請參閱本節后面的內容)。讓我們用一個例子來理解這一點。在前面的示例中,我們可以使用withWatermark()輕松定義水印,如下所示。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

 

在本例中,我們將根據列“timestamp”的值定義水印,並將“10分鍾”定義為允許數據延遲的閾值。如果在update輸出模式下運行此查詢(稍后在“輸出模式”一節中討論),引擎將不斷更新結果表中窗口的計數,直到窗口比水印早,水印比“timestamp”列中的當前事件時間滯后10分鍾。如下圖:

image

 

如圖所示,引擎跟蹤的最大事件時間是藍色虛線,每個觸發器開始處設置為(最大事件時間-“10分鍾”)的水印是紅線。例如,當引擎收到數據(12:14,dog)時,它會將下一個觸發器的水印設置為12:04。此水印允許引擎在額外10分鍾內保持中間狀態,以便對延遲數據進行計數。例如,數據(12:09,cat)是無序且延遲的,它在windows 12:00-12:10和12:05-12:15中出現。由於它仍在觸發器中的水印12:04之前,引擎仍將中間計數保持為狀態,並正確更新相關窗口的計數。但是,當水印更新到12:11時,窗口的中間狀態(12:00-12:10)被清除,所有后續數據(例如(12:04,donkey))被認為“太晚”,因此被忽略。請注意,在每次觸發后,更新的計數(即紫色行)將被寫入Sink作為觸發輸出,這由update的輸出模式決定。

某些Sink(例如文件)可能不支持Update模式所需的細粒度的更新。為了使用它們,我們還支持Append模式,在這種模式下,只有最后的計數被寫入Sink。這一點如下所示。

請注意,在非流數據集上使用withWatermark是無效的。由於水印不應以任何方式影響任何批處理查詢,我們將直接忽略它。

 

image

 

 

與之前的Update模式類似,引擎為每個窗口保持中間計數。但是,部分計數不會更新到結果表,也不會寫入接收器。引擎等待“10分鍾”來計算延遲的數據,然后刪除窗口<水印的中間狀態,並將最終計數附加到結果表/接收器。例如,只有在水印更新到12:11之后,窗口12:00-12:10的最終計數才會附加到結果表中。

 

4.2.2.2 三類時間窗口

Spark支持三種時間窗口:滾動(固定)、滑動和會話。

image

滾動窗口是一系列固定大小、不重疊且連續的時間間隔。一個輸入只能綁定到一個窗口。
從“固定大小”的角度來看,滑動窗口與翻滾窗口類似,但如果滑動的持續時間小於窗口的持續時間,窗口可以重疊,在這種情況下,輸入可以綁定到多個窗口。
翻滾和滑動窗口使用窗口功能,這已在上述示例中描述。
與前兩種類型相比,會話窗口具有不同的特性。會話窗口具有窗口長度的動態大小,具體取決於輸入。會話窗口從一個輸入開始,如果在間隔時間內收到以下輸入,則會自行擴展。對於固定的間隔持續時間,當在收到最新輸入后間隔持續時間內沒有收到輸入時,會話窗口關閉。
會話窗口使用session_window 函數。該函數的用法與窗口函數類似。

import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window($"timestamp", "5 minutes"),
        $"userId")
    .count()

 

除了靜態值,我們還可以提供一個表達式,根據輸入行動態指定間隙持續時間。請注意,間隙持續時間為負或為零的行將從聚合中篩選出來。

使用動態間隙持續時間,會話窗口的關閉不再依賴於最新的輸入。會話窗口的范圍是所有事件范圍的並集,這些范圍由事件開始時間和任務執行期間計算的間隔持續時間決定。

import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
  .when($"userId" === "user2", "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        Column(sessionWindow),
        $"userId")
    .count()

 

請注意,在流式任務中使用會話窗口時有一些限制,如下所示:

不支持“Update”作為輸出模式。

在分組鍵中,除了session_窗口外,還應至少有一列。這一限制在批式任務中沒有。

 

4.2.2.3 水印清除聚合狀態的條件

需要注意的是,必須滿足以下條件,水印才能清除聚合查詢中的狀態(從Spark 2.1.1開始,可能會在將來更改)。

輸出模式必須為追加或更新。完整模式要求保留所有聚合數據,因此不能使用水印刪除中間狀態。有關每個輸出模式的語義的詳細解釋,請參見“輸出模式”部分。

聚合必須具有事件時間列或事件時間列上的窗口。

withWatermark必須在與聚合中使用的時間戳列相同的列上調用。例如,df.withWatermark("time", "1 min").groupBy("time2").count() 在追加輸出模式下無效,因為水印是在聚合列以外的列上定義的。

必須在聚合之前調用withWatermark,才能使用水印。例如,df.groupBy("time").count().withWatermark("time", "1 min")在追加輸出模式下無效。

 

4.2.2.4 水印聚合的語義保證

水印延遲(使用withWatermark設置)為“2小時”,可確保引擎不會丟棄延遲小於2小時的任何數據。換言之,在此之前處理的最新數據(就事件時間而言)落后2小時以內的任何數據都保證被聚合。

然而,保證僅在一個方向上是嚴格的。延遲超過2小時的數據不保證被刪除;它可能會聚合,也可能不會聚合。數據越延遲,引擎處理數據的可能性就越小。

 

4.2.3 Join操作(留空)

 

4.2.4 流式重復數據消除

 

您可以使用事件中的唯一標識符消除數據流中的重復記錄。這與使用唯一標識符列進行靜態重復數據消除完全相同。查詢將存儲來自以前記錄的必要數量的數據,以便可以過濾重復記錄。與聚合類似,您可以使用帶或不帶水印的重復數據消除。
使用水印-如果對重復記錄的到達時間有上限,則可以在事件時間列上定義水印,並使用guid和事件時間列進行重復數據消除。查詢將使用水印從過去的記錄中刪除舊的狀態數據,這些記錄預計不會再獲得任何副本。這限制了查詢必須維護的狀態量。
沒有水印——由於重復記錄的到達時間沒有界限,因此查詢將所有過去記錄的數據存儲為狀態。

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")

 

4.2.5 處理多個水印(留空)

 

 

 

4.2.6 任意狀態化的操作

許多用例需要比聚合更高級的有狀態操作。例如,在許多用例中,您必須從事件的數據流中跟蹤session。要執行這種session跟蹤,必須將任意類型的數據保存為狀態,並使用每個觸發器中的數據流事件對狀態執行任意操作。自Spark 2.2以來,可以使用操作mapGroupsWithState和更強大的操作flatMapGroupsWithState來實現這一點。這兩種操作都允許在分組的數據集上應用用戶定義的代碼來更新用戶定義的狀態。有關更多具體細節,請查看API文檔(Scala/Java)和示例(Scala/Java)。
盡管Spark無法檢查並強制執行它,但狀態函數應該根據輸出模式的語義來實現。例如,在更新模式下,Spark不希望狀態函數發出的行比當前水印加上允許的延遲記錄延遲早,而在追加模式下,狀態函數可以發出這些行。

(在session_window出來之前(Spark3.2)flatMapGroupsWithState是最常使用的替代方案,但常被開發人員詬病,Flink早已支持session_window)

 

4.2.7 不支持的操作

-流式數據集不支持多個流式聚合(即流式DF上的聚合鏈)。
-流式數據集不支持limit和獲取前N行。
-不支持對流式數據集執行distinct的操作。
-在流式數據集上聚合后不支持重復數據消除操作。
-只有在完全輸出模式,並聚合后,流式數據集才支持排序操作。
-流數據集上不支持幾種類型的外部聯接。有關更多詳細信息,請參閱“連接操作”部分中的支持矩陣。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM