5.結構化流的輸出
一旦定義好了streaming DataFrame/Dataset的最終結果,剩下的就是一些計算輸出了.為此,必須使用 DataStreamWriter
通過 Dataset.writeStream()
返回.此時必須以下一個或多個
輸出落地 的詳細信息: Data format, location 等等
輸出模式(Output mode)
查詢名稱(Query name) 可選,指定用於標識的查詢的唯一名稱
觸發器間隔(Trigger interval):可選地,指定觸發間隔。如果沒有指定,系統將檢查盡快獲得新數據前處理完成。如果一個觸發時間錯過了,因為前面的處理還沒有完成,則系統將觸發立即處理
檢查點位置(Checkpoint location) 對於可以保證 end-to-end fault-tolerance (端對端容錯)能力的某些 output sinks ,請指定系統將寫入所有 checkpoint (檢查點)信息的位置。
這應該是與 HDFS 兼容的容錯文件系統中的目錄.
5.1 輸出模式
追加(append)(默認) 這是默認的模式.在上次觸發后,只有新行添加到結果表(result-table)才會觸發。
這是只支持那些查詢,行添加到結果表永遠不會改變,因為這種模式保證每一行將只輸出一次。例如,只有選擇查詢,地圖,flatMap,過濾,加入等將支持附加模式
完全(complete) 每次觸發時都會將整個結果表(result-table)輸出.例如聚合結果等
更新(update) 只有結果表在上次被觸發后被更新才會觸發
不同的查詢支持不同的輸出模式,具體見下:
查詢類型 | 支持模式 | 描述 | |
帶聚合的查詢 | 聚合帶事件時間的水印 | Append, Update, Complete | Append 使用水印來刪除舊的聚集狀態 窗口將在水印最終閾值的時候聚合.所以結果將在水印過期,最終結果確定的情況下添加進結果表 Update 使用水印來刪除舊的聚集狀態 Complete 不會使用水印來刪除舊的聚集狀態 |
不帶水印的聚合 | Complete, Update | 由於沒有使用水印,舊的聚集狀態不會被刪除.不支持append,因為對整數據的聚合結果會不斷更新,不符合append的語義 | |
mapGroupsWithState | Update | ||
flatMapGroupsWithState | 追加操作模式 | Append | flatMapGroupsWithState 之后允許 Aggregations (聚合) |
更新操作模式 | Update | flatMapGroupsWithState 之后不允許 Aggregations (聚合)。 |
|
帶join的查詢 | Append | join不支持Update,Complete模式 | |
其它查詢 | Append, Update | 不支持Complete.因為非聚合數據的結果表,全部保存進結果表時不允許的 |
5.2 輸出落地
5.2.1 落地文件
落地文件,是適合長期保存.這里的文件是指HDFS等任何Hadoop支持的文件系統.
落地文件,支持自動分區.並且支持有且僅有一次的容錯
val query = streamingSelectDF .writeStream .format("parquet") .option("path", "文件保存路徑") .partitionBy("zip", "day") //分區字段 .trigger(ProcessingTime("25 seconds")) .start()
5.2.2 落地Kafka
kafka專章描述,具體見后
5.2.3 落地控制台(debug使用)
輸出被直接打印到控台或者stdout
日志
5.2.4 落地Memory接收器(debug使用)
5.2.5 foreach
foreach落地,是用戶自定義輸出的一種方式.foreach需要用戶實現 ForeachWriter 類.實際處理取決於用戶如何實現.(foreach只能使用在Scala/Java中)
它將在觸發器(trigger)之后生成結果行時調用一個用戶實現.
注意: foreach的實現載體是多個executor中的某一個
5.2.5.1 實現
open: writer 初始化時執行(例如打開連接,開啟事務等).它具有兩個入參: version=>每個觸發器單調遞增的ID partitionId =>分區ID.
open將返回一個布爾值.當為false時,后續將不會產生任何調用.用戶可以根據自己的邏輯,用這個返回值指出本次輸出后續是否還有必要執行
proccess:根據open的返回值決定是否需要執行.
close:無論open返回什么值,close必然會執行.這里適合用戶做一些資源回收操作
class WriterToMysql extends ForeachWriter[Row] { private var connection: Connection = null; override def open(partitionId: Long, version: Long): Boolean = { Class.forName("com.mysql.jdbc.Driver") connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC&characterEncoding=UTF-8&useSSL=false", "root", "12abAB") return true } override def process(value: Row): Unit = { val stmt = connection.createStatement() try { stmt.execute( s""" |replace into `test`.`structured_streaming_window_watermark_app_test` |(`window`,`word`,`count`) |VALUES |('${value.get(0).toString}','${value.get(1)}',${value.getLong(2)}); """.stripMargin) } finally { stmt.close() } } override def close(errorOrNull: Throwable): Unit = { try { if (connection != null) { connection.close() } } finally {} } } //使用 val query = windowWordCounts.writeStream.outputMode("update") .foreach(new WriterToMysql()) .option("truncate", "false") .start();
5.2.6 輸出落地可以承載的輸出模式
接收器 | 支持的輸出模式 | 可選項 | 容錯機制 | 備注 |
文件 | Append | path | 支持容錯(有且僅有一次) | 支持分區 |
kafka | Append, Update, Complete | 詳見kafka專章 | 支持容錯(最少一次) | |
foreach | Append, Update, Complete | 取決於foreach的實現 | ||
控制台 | Append, Update, Complete | numRows:每次打印的行數(默認20),truncate:輸出太長時截斷(默認true) | 不支持容錯 | |
內存 | Append, Complete | 不支持容錯,但重啟時重建整張結果表 |
5.3 觸發器
Spark2.3 已經具有以下四種觸發器
默認觸發器
如果不設置任何觸發器,將默認使用此觸發器.觸發規則是:上一個微批處理結束,啟動下一個微批處理
時間間隔觸發(Fixed interval micro-batches)
設置了觸發間隔的觸發器.
如果上一個微批處理在觸發間隔前結束,會等待間隔時間結束后再觸發.
如果在觸發間隔前還沒有處理結束.后續暫時不會觸發,會等待上個微批處理結束后再立即啟動
.trigger(Trigger.ProcessingTime("2 seconds"))
只觸發一次(One-time micro-batch)
觸發一次處理全部數據,然后自然停止.適合用在關閉集群時數據的最后處理和清理
.trigger(Trigger.Once())
連續觸發器(Continuous with fixed checkpoint interval)
連續處理是Spark2.3才提供的觸發器.它是以一個低延遲,連續處理模式(沒有間隔概念)的模式進行處理.
.trigger(Trigger.Continuous("1 second"))
不過很可惜,Spark2.3中連續處理依然只是一個實驗版本,在此只做一個簡單介紹.
連續處理會啟動多個長時間的運行的任務.這些任務不斷地從源中讀取數據、處理數據並不斷地向接收器寫入數據。查詢所需的任務數量取決於查詢可以並行地從源中讀取多少個分區
因此,在開始一個連續的處理查詢之前,必須確保集群中有足夠多的內核並行執行所有任務.(10分區的Kafka,必須至少有10個內核才能進行查詢)
現階段連續處理只能接收Map系操作,還不支持聚合系操作.
支持的數據源只Kafka,Rate兩種
不支持失敗任務的重試.任何失敗都會導致查詢終止,必須手動的從檢查點恢復.