structed streaming的執行批次,較spark streaming有所改變。更加靈活。總結下來,可大白話地分為三類:
1盡可能快的執行,不定時間
2按固定間隔時間執行
3僅執行一次
詳情如下:
| Trigger類型 | 使用 | 注意 |
|---|---|---|
| unspecified (default) | as soon as micro-batch | If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.如果不設置,默認使用微批,但沒有時間間隔,盡可能快的處理 |
| Interval micro-batch(固定間隔的微批) | Trigger.ProcessingTime(long interval, TimeUnit timeUnit) | 根據數據實際情況,不定時批次1. 沒有明確指明觸發器時,默認使用該觸發器,即Trigger.ProcessingTime(0L), 表示將盡可能快地執行查詢。2. 該模式下,將按用戶指定的時間間隔啟動微批處理。3. 如果前一個微批在該間隔內完成,則引擎將等待該間隔結束,然后再開始下一個微批處理。4. 如果前一個微批花費的時間比間隔要長,下一個微批將在前一個微批處理完成后立即開始。5. 如果沒有新數據可用,則不會啟動微批處理。 |
| One-time micro-batch (一次性微批) | Trigger.Once() | 僅執行一次 |
| Continuous with fixed checkpoint interval(連續處理) | Trigger.Continuous(long interval, TimeUnit timeUnit) | 以固定的Checkpoint間隔(interval)連續處理。在這種模式下,連續處理引擎將每隔一定的間隔(interval)做一次checkpoint,可獲得低至1ms的延遲。但只保證 at-least-once |
為什么continuous只支持at-least-once
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()
注意這里的 1 second 指的是每隔 1 秒記錄保存一次狀態,而不是說每隔 1 秒才處理數據
continuous 不再是周期性啟動 task,而是啟動長期運行的 task,也不再是處理一批數據,而是不斷地一個數據一個數據地處理,並且也不用每次都記錄偏移,而是異步地,周期性的記錄狀態,這樣就能實現低延遲.
綜上,continuous模式下長期運行一個task,而不會實時去記錄offset,所以不能保證eactly-once.
三種批次方式的驗證
1.Interval micro-batch(固定間隔的微批)
`{
Logger.getRootLogger().setLevel(Level.ERROR);
Logger.getLogger(StructuredSparing.class).setLevel(Level.ERROR);
SparkSession session = SparkSession
.builder()
.master("local")
.config("spark.sql.streaming.checkpointLocation", "D://checkpoint")
.getOrCreate();
Dataset<Row> stream = session.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load();
StreamingQuery query = stream.writeStream()
.queryName("StructuredSparingTest")
.format("console")
.trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}
}`
設置為5秒一個批次。
通過UI界面可以很直觀地看出,在有數據的時候5秒一個批次,在沒有數據的時候,10秒甚至3分鍾才執行一個批次。

通過與spark streaming進行比較可以更加直觀.在spark streaming里設置8秒一個批次,在UI界面可以看到,不管有無數據,spark streaming嚴格按照8秒的批次執行。

2.One-time micro-batch (一次性微批)
.trigger(Trigger.Once())
執行結果,略。
3.Continuous方式
.trigger(Trigger.Continuous(100,TimeUnit.MILLISECONDS))
設置100毫秒一個執行批次,通過UI界面可以看出,時間已經1.2分鍾,但是active job一直只有一個,一直在running,證明啟動了一個長期運行的task,不斷地一批數據一批數據連續處理。

