Spark結構式流編程指南
概覽
Structured Streaming 是一個可拓展,容錯的,基於Spark SQL執行引擎的流處理引擎。使用小量的靜態數據模擬流處理。伴隨流數據的到來,Spark SQL引擎會逐漸連續處理數據並且更新結果到最終的Table中。你可以在Spark SQL上引擎上使用DataSet/DataFrame API處理流數據的聚集,事件窗口,和流與批次的連接操作等。最后Structured Streaming 系統快速,穩定,端到端的恰好一次保證,支持容錯的處理。
小樣例
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
編程模型
結構化流的關鍵思想是將實時數據流視為一個連續附加的表
基本概念
將輸入的數據當成一個輸入的表格,每一個數據當成輸入表的一個新行。
"Output"是寫入到外部存儲的寫方式,寫入方式有不同的模式:
- Complete模式: 將整個更新表寫入到外部存儲,寫入整個表的方式由存儲連接器決定。
- Append模式:只有自上次觸發后在結果表中附加的新行將被寫入外部存儲器。這僅適用於結果表中的現有行不會更改的查詢。
- Update模式:只有自上次觸發后在結果表中更新的行將被寫入外部存儲器(在Spark 2.0中尚不可用)。注意,這與完全模式不同,因為此模式不輸出未更改的行。
處理事件時間和延遲數據
事件時間是嵌入在數據本身中的時間。對於許多應用程序,您可能希望在此事件時間操作。例如,如果要獲取IoT設備每分鍾生成的事件數,則可能需要使用生成數據的時間(即數據中的事件時間),而不是Spark接收的時間他們。此事件時間在此模型中非常自然地表示 - 來自設備的每個事件都是表中的一行,事件時間是該行中的一個列值。這允許基於窗口的聚合(例如每分鍾的事件數)僅僅是偶數時間列上的特殊類型的分組和聚合 - 每個時間窗口是一個組,並且每一行可以屬於多個窗口/組。因此,可以在靜態數據集(例如,來自收集的設備事件日志)以及數據流上一致地定義這種基於事件時間窗的聚合查詢,使得用戶的生活更容易。
此外,該模型自然地處理基於其事件時間比預期到達的數據。由於Spark正在更新結果表,因此當存在延遲數據時,它可以完全控制更新舊聚合,以及清除舊聚合以限制中間狀態數據的大小。由於Spark 2.1,我們支持水印,允許用戶指定后期數據的閾值,並允許引擎相應地清除舊的狀態。稍后將在“窗口操作”部分中對此進行詳細說明。
容錯語義
提供端到端的一次性語義是結構化流的設計背后的關鍵目標之一。為了實現這一點,我們設計了結構化流源,接收器和執行引擎,以可靠地跟蹤處理的確切進展,以便它可以通過重新啟動和/或重新處理來處理任何類型的故障。假定每個流源具有偏移量(類似於Kafka偏移量或Kinesis序列號)以跟蹤流中的讀取位置。引擎使用檢查點和預寫日志來記錄每個觸發器中正在處理的數據的偏移范圍。流接收器被設計為用於處理再處理的冪等。結合使用可重放源和冪等宿,結構化流可以確保在任何故障下的端到端的一次性語義。
使用DataFrame和DataSet API
從Spark 2.0開始,DataFrames和Datasets可以表示靜態,有界數據,以及流式,無界數據。與靜態DataSets/ DataFrames類似,您可以使用公共入口點SparkSession(Scala / Java / Python文檔)從流源創建流DataFrames /DataSets,並對它們應用與靜態DataFrames / Datasets相同的操作。如果您不熟悉Datasets / DataFrames,強烈建議您使用DataFrame / Dataset編程指南熟悉它們。
創建數據框流和數據集流
Streaming DataFrames可以通過SparkSession.readStream()返回的DataStreamReader接口(Scala / Java / Python docs)創建。類似於用於創建靜態DataFrame的讀取接口,您可以指定源 - 數據格式,模式,選項等的詳細信息。
數據源
在Spark 2.0,有幾個內置的數據源:
- 文件源:將寫入目錄中的文件讀取為數據流。支持的文件格式有text,csv,json,parquet。請參閱DataStreamReader界面的文檔以獲取更新的列表,以及每種文件格式支持的選項。注意,文件必須原子地放置在給定目錄中,在大多數文件系統中,可以通過文件移動操作來實現。
- Kafka源:從kafka拉取數據,支持kafka broker versions 0.10.0 or higher.從kafka集成指南獲取更多信息。
- Socket源(測試用):從套接字連接讀取UTF8文本數據。監聽服務器套接字在驅動程序。注意,這應該僅用於測試,因為這不提供端到端容錯保證
這些示例生成無類型的流式DataFrames,這意味着在編譯時不檢查DataFrame的模式,僅在提交查詢時在運行時檢查。一些操作,如map,flatMap等,需要在編譯時知道類型。要做到這些,你可以使用與靜態DataFrame相同的方法將這些無類型的流DataFrames轉換為類型化流數據集。有關更多詳細信息,請參閱SQL編程指南。此外,有關支持的流媒體源的更多詳細信息將在文檔中稍后討論。
數據框/數據集流的模式推理和分區
默認情況下,基於文件的源的結構化流要求您指定模式,而不是依靠Spark自動推斷。此限制確保即使在發生故障的情況下,一致的模式也將用於流式查詢。對於臨時用例,可以通過將spark.sql.streaming.schemaInference設置為true來重新啟用模式推斷。
當名為/ key = value /的子目錄存在時,發生分區發現,並且列表將自動遞歸到這些目錄中。如果這些列出現在用戶提供的模式中,它們將由Spark根據正在讀取的文件的路徑填充。當查詢開始時,組成分區方案的目錄必須存在,並且必須保持靜態。例如,可以添加/ data / year = 2016 / when / data / year = 2015 /存在,但是更改分區列是無效的(即通過創建目錄/ data / date = 2016-04-17 /)。
流式DataFrames/Datasets上的操作
您可以對流式DataFrames /數據集應用各種操作 - 從無類型,類似SQL的操作(例如select,where,groupBy)到類型化的RDD類操作(例如map,filter,flatMap)。有關更多詳細信息,請參閱SQL編程指南。讓我們來看看一些你可以使用的示例操作。
基本操作 - 選擇,投影,聚合
case class DeviceData(device: String, type: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("type").count() // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(_.type).agg(typed.avg(_.signal)) // using typed API
事件時間上的窗口操作
滑動事件時間窗口上的聚合通過結構化流直接進行。理解基於窗口的聚合的關鍵思想與分組聚合非常相似。在分組聚合中,為用戶指定的分組列中的每個唯一值維護聚合值(例如計數)。在基於窗口的聚合的情況下,對於行的事件時間落入的每個窗口維持聚合值。讓我們用插圖來理解這一點。
想象一下,我們的快速示例被修改,流現在包含行以及生成行的時間。我們不想運行字數,而是計算10分鍾內的字數,每5分鍾更新一次。也就是說,在10分鍾窗口12:00-12:10,12:05-12:15,12:10-12:20等之間接收的詞中的字數。注意,12:00 -12:10意味着數據在12:00之后但在12:10之前到達。現在,考慮在12:07收到的一個字。這個單詞應該增加對應於兩個窗口12:00 - 12:10和12:05 - 12:15的計數。因此,計數將通過分組鍵(即字)和窗口(可以從事件時間計算)來索引。
結果表將如下所示:
由於此窗口類似於分組,因此在代碼中,可以使用groupBy()和window()操作來表示窗口化聚合。您可以在Scala / Java / Python中查看以下示例的完整代碼。
處理延遲數據和水位線
現在考慮如果事件中的一個到達應用程序的遲到會發生什么。例如,例如,在12:04(即事件時間)生成的詞可以由應用在12:11接收到。應用程序應使用時間12:04而不是12:11來更新窗口12:00 - 12:10的舊計數。這在我們的基於窗口的分組中自然地發生 - 結構化流可以長時間地保持部分聚合的中間狀態,使得晚期數據可以正確地更新舊窗口的聚集,如下所示。
但是,要運行此查詢的天數,系統必須綁定其累積的中間內存中狀態的數量。這意味着系統需要知道何時可以從內存中狀態刪除舊聚合,因為應用程序將不再接收該聚合的延遲數據。為了實現這一點,在Spark 2.1中,我們引入了水印,讓我們的引擎自動跟蹤數據中的當前事件時間,並嘗試相應地清理舊的狀態。您可以通過指定事件時間列和根據事件時間預計數據延遲的閾值來定義查詢的水印。對於在時間T開始的特定窗口,引擎將保持狀態並允許后期數據更新狀態,直到(由引擎看到的最大事件時間 - 后期閾值> T)。換句話說,閾值內的晚數據將被聚合,但晚於閾值的數據將被丟棄。讓我們用一個例子來理解這個。我們可以使用withWatermark()在上面的例子中輕松定義水印,如下所示。
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
在這個例子中,我們定義查詢的水印對列“timestamp”的值,並且還定義“10分鍾”作為允許數據超時的閾值。如果此查詢在Append輸出模式(稍后在“輸出模式”部分中討論)中運行,則引擎將從列“timestamp”跟蹤當前事件時間,並在最終確定窗口計數和添加之前等待事件時間的額外“10分鍾”他們到結果表。這是一個例證。
如圖所示,由引擎跟蹤的最大事件時間是藍色虛線,並且在每個觸發的開始處設置為(最大事件時間 - '10分鍾')的水印是紅色線。例如,當引擎觀察數據(12:14,狗),它將下一個觸發器的水印設置為12:04。對於窗口12:00 - 12:10,部分計數保持為內部狀態,而系統正在等待延遲數據。在系統發現數據(即(12:21,owl))使得水印超過12:10之后,部分計數被最終確定並附加到表。此計數將不會進一步更改,因為所有超過12:10的“太晚”數據將被忽略。
請注意,在追加輸出模式下,系統必須等待“延遲閾值”時間才能輸出窗口的聚合。如果數據可能很晚,(例如1天),並且您希望部分計數而不等待一天,這可能不是理想的。將來,我們將添加更新輸出模式,這將允許每次更新聚合寫入到每個觸發器。
用於清除聚合狀態的水印的條件重要的是要注意,水印應當滿足以下條件以清除聚合查詢中的狀態(從Spark 2.1開始,將來會改變)。
- 輸出模式必須為追加。完成模式要求保留所有聚合數據,因此不能使用水印來刪除中間狀態。有關每種輸出模式的語義的詳細說明,請參見“輸出模式”部分。
- 聚合必須具有事件時列,或事件時列上的窗口。
- withWatermark必須在與聚合中使用的時間戳列相同的列上調用。例如,df.withWatermark(“time”,“1 min”)。groupBy(“time2”)。count()在Append輸出模式下無效,因為水印是在與聚合列不同的列上定義的。
- 其中在要使用水印細節的聚合之前必須調用withWatermark。例如,df.groupBy(“time”).count().withWatermark(“time”,“1 min”)在Append輸出模式中無效。
Join操作
流DataFrames可以與靜態DataFrames連接以創建新的流DataFrames。這里有幾個例子。
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
不支持的操作
但是,請注意,所有適用於靜態DataFrames /數據集的操作在流式DataFrames /數據集中不受支持。雖然這些不支持的操作中的一些將在未來的Spark版本中得到支持,但還有一些基本上難以有效地在流數據上實現。例如,輸入流數據集不支持排序,因為它需要跟蹤流中接收的所有數據。因此,這在根本上難以有效地執行。從Spark 2.0開始,一些不受支持的操作如下:
- 在流數據集上還不支持多個流聚集(即,流DF上的聚合鏈)。
- 在流數據集上不支持限制和獲取前N行。
- 不支持對流數據集進行不同操作。
- 排序操作僅在聚合后在完整輸出模式下支持流數據集。
- 條件支持流式傳輸和靜態數據集之間的外連接。
- 不支持帶有流數據集的完全外連接
- 不支持左外部連接與右側的流數據集
- 不支持左側的流數據集的右外部聯接
- 尚不支持兩個流數據集之間的任何類型的連接。
此外,還有一些Dataset方法不能用於流數據集。它們是將立即運行查詢並返回結果的操作,這對流數據集沒有意義。相反,這些功能可以通過顯式地啟動流查詢來完成(參見下一部分)。
- count() - 無法從流數據集返回單個計數。
相反,使用ds.groupBy.count()返回包含運行計數的流數據集。 - foreach() - 而是使用ds.writeStream.foreach(...)(參見下一節)。
- show() - 而是使用控制台接收器(請參閱下一節)。
如果您嘗試任何這些操作,您將看到一個AnalysisException如“操作XYZ不支持與流DataFrames /數據集”。
啟動流式查詢
一旦定義了最終結果DataFrame / Dataset,剩下的就是啟動流計算。為此,您必須使用通過Dataset.writeStream()返回的DataStreamWriter(Scala / Java / Python文檔)。您必須在此界面中指定以下一個或多個。
- 輸出接收器的詳細信息:數據格式,位置等
- 輸出模式:指定寫入輸出接收器的內容。
- 查詢名稱:(可選)指定查詢的唯一名稱以進行標識。
- 觸發間隔:可選擇指定觸發間隔。如果未指定,系統將在上一個處理完成后立即檢查新數據的可用性。如果由於先前處理尚未完成而錯過觸發時間,則系統將嘗試在下一觸發點處觸發,而不是在處理完成之后立即觸發。
- 檢查點位置:對於可以保證端到端容錯的某些輸出接收器,請指定系統將寫入所有檢查點信息的位置。這應該是HDFS兼容的容錯文件系統中的目錄。檢查點的語義將在下一節中更詳細地討論。
輸出模式
有幾種類型的輸出模式:
- 附加模式(默認) - 這是默認模式,其中只有自上次觸發后添加到結果表中的新行將輸出到接收器。這僅支持那些添加到結果表中的行從不會更改的查詢。因此,該模式保證每行只輸出一次(假設容錯宿)。例如,只有select,where,map,flatMap,filter,join等的查詢將支持Append模式。
- 完成模式 - 每次觸發后,整個結果表將輸出到接收器。聚合查詢支持此選項。
- 更新模式 - (在Spark 2.1中不可用)只有結果表中自上次觸發后更新的行才會輸出到接收器。更多信息將在未來版本中添加。
不同類型的流查詢支持不同的輸出模式。這里是兼容性矩陣:
查詢類型 | 支持的輸出模式 | 注 | |
---|---|---|---|
無聚合的查詢 | 支持完整模式 | 因為不可能保留結果表中的所有數據。 | |
帶有聚合的聚合 | 聚合在帶水印的事件時間聚合 | 附加,完全 | 附加模式使用水印來刪除舊的聚合狀態。但是窗口化聚合的輸出被延遲了在withWatermark() 中指定的晚期閾值,如模式語義,在結束表之后,只有在結束表(在水印被交叉之后)才能將行添加一次。有關詳細信息,請參閱延遲數據部分。完成模式不刪除舊的聚合狀態,因為從定義該模式保留結果表中的所有數據。 |
其他聚合 | 完全 | 不支持完全附加模式,因為聚合可以更新,因此違反了此模式的語義。完成模式不刪除舊的聚合狀態,因為從定義該模式保留結果表中的所有數據。 |
輸出接收器
有幾種類型的內置輸出接收器:
- 文件接收器 - 將輸出存儲到目錄。
- Foreach sink - 對輸出中的記錄運行任意計算。有關詳細信息,請參閱后面的部分。
- 控制台接收器(用於調試) - 每次有觸發器時將輸出打印到控制台/ stdout。這應該用於低數據量上的調試目的,因為每次觸發后,整個輸出被收集並存儲在驅動程序的內存中。
- 內存接收器(用於調試) - 輸出作為內存表存儲在內存中。支持附加和完成輸出模式。這應該用於低數據量上的調試目的,因為每次觸發后,整個輸出被收集並存儲在驅動程序的內存中。
下面是所有接收器的表格和相應的設置:
接收器 | 支持的輸出模式 | 用法 | 容錯 | 備注 |
---|---|---|---|---|
文件接收器 | Append | writeStream.format("parquet").start() |
Yes | 支持對分區表的寫入。按時間分區可能有用。 |
Foreach 接收器 | 所有模式 | writeStream.foreach(...).start() |
取決於ForeachWriter實現 | 更多細節在下一節 |
控制台接收器 | Append, Complete | writeStream.format("console").start() |
No | |
內存接收器 | Append, Complete | writeStream.format("memory").queryName("table").start() |
No | 將輸出數據保存為表,用於交互式查詢。表名是查詢名稱。 |
最后,你必須調用start()才能真正開始執行查詢。這返回一個StreamingQuery對象,它是連續運行的執行的句柄。您可以使用此對象來管理查詢,我們將在下一小節中討論。現在,讓我們通過幾個例子來理解這一切。
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
// Print new data to console
noAggDF
.writeStream()
.format("console")
.start();
// Write new data to Parquet files
noAggDF
.writeStream()
.parquet("path/to/destination/directory")
.start();
// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();
// Print updated aggregations to console
aggDF
.writeStream()
.outputMode("complete")
.format("console")
.start();
// Have all the aggregates in an in-memory table
aggDF
.writeStream()
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start();
spark.sql("select * from aggregates").show(); // interactively query in-memory table
使用foreach
foreach操作允許對輸出數據計算任意操作。從Spark 2.1開始,這只適用於Scala和Java。要使用這個,你必須實現接口ForeachWriter(Scala / Java docs),它有一個方法,當觸發后產生一系列行作為輸出時被調用。請注意以下要點。
編寫器必須是可序列化的,因- 為它將被序列化並發送到執行器以供執行。
- 所有三個方法,打開,處理和關閉將被調用的執行者。
- 只有當調用open方法時,寫程序必須執行所有的初始化(例如打開連接,啟動事務等)。請注意,如果在創建對象時在類中有任何初始化,那么該初始化將在驅動程序中進行(因為這是創建實例的地方),這可能不是您想要的。
- 版本和分區是open中的兩個參數,它們唯一地表示需要被推出的一組行。版本是一個單調增加的id,隨着每個觸發器增加。partition是表示輸出的分區的id,因為輸出是分布式的,並且將在多個執行器上處理。
- open可以使用版本和分區來選擇是否需要寫行序列。因此,它可以返回true(繼續寫入)或false(不需要寫入)。如果返回false,那么將不會在任何行上調用進程。例如,在部分故障之后,失敗觸發器的一些輸出分區可能已經被提交到數據庫。基於存儲在數據庫中的元數據,寫者可以識別已經提交的分區,因此返回false以跳過再次提交它們。
- 每當調用open時,也將調用close(除非JVM由於某些錯誤而退出)。即使open返回false,也是如此。如果在處理和寫入數據時出現任何錯誤,將使用錯誤調用close。您有責任清除在開放中創建的狀態(例如連接,事務等),以便沒有資源泄漏。
管理流式查詢
啟動查詢時創建的StreamingQuery對象可用於監視和管理查詢。
StreamingQuery query = df.writeStream().format("console").start(); // get the query object
query.id(); // get the unique identifier of the running query
query.name(); // get the name of the auto-generated or user-specified name
query.explain(); // print detailed explanations of the query
query.stop(); // stop the query
query.awaitTermination(); // block until query is terminated, with stop() or with error
query.exception(); // the exception if the query has been terminated with error
query.sourceStatus(); // progress information about data has been read from the input sources
query.sinkStatus(); // progress information about data written to the output sink
您可以在單個SparkSession中啟動任意數量的查詢。他們將同時運行共享集群資源。您可以使用sparkSession.streams()獲取可用於管理當前活動查詢的StreamingQueryManager(Scala / Java / Python文檔)。
SparkSession spark = ...
spark.streams().active(); // get the list of currently active streaming queries
spark.streams().get(id); // get a query object by its unique id
spark.streams().awaitAnyTermination(); // block until any one of them terminates
監視流查詢
有兩個API用於以交互式和異步方式監視和調試活動的查詢。
交互式API
您可以使用streamingQuery.lastProgress()和streamingQuery.status()直接獲取活動查詢的當前狀態和指標。 lastProgress()在Scala和Java中返回一個StreamingQueryProgress對象,在Python中返回一個具有相同字段的字典。它具有關於在流的最后觸發中所進行的進展的所有信息 - 什么數據被處理,什么是處理速率,等待時間等。還有streamingQuery.recentProgress,它返回最后幾個進度的數組。
此外,streamingQuery.status()在Scala和Java中返回StreamingQueryStatus對象,在Python中返回具有相同字段的字典。它提供有關查詢立即執行的操作的信息 - 是觸發器活動,正在處理數據等。這里有幾個例子。
StreamingQuery query = ...
System.out.println(query.lastProgress());
/* Will print something like the following.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
System.out.println(query.status());
/* Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
異步API
您還可以通過附加StreamingQueryListener(Scala / Java docs)異步監視與SparkSession相關聯的所有查詢。使用sparkSession.streams.attachListener()附加自定義StreamingQueryListener對象后,當查詢啟動和停止以及活動查詢中有進度時,您將獲得回調。這里是一個例子
SparkSession spark = ...
spark.streams.addListener(new StreamingQueryListener() {
@Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
@Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
@Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});
使用檢查點從故障中恢復
在故障或故意關閉的情況下,您可以恢復先前查詢的先前進度和狀態,並繼續在其停止的地方。這是通過使用檢查點和預寫日志來完成的。您可以配置具有檢查點位置的查詢,並且查詢將保存所有進度信息(即每個觸發器中處理的偏移范圍)和正在運行的聚合(例如快速示例中的字計數)到檢查點位置。此檢查點位置必須是HDFS兼容文件系統中的路徑,並且可以在啟動查詢時在DataStreamWriter中設置為選項。
aggDF
.writeStream()
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start();