1.時間語義
Flink是一個實時計算引擎,談到實時概念,就必然會設計到時間概念。Flink的時間語義是保證實時及實時數據處理的一致性,及時性。Flink時間語義分為下面三種
Event Time:事件創建時間
Ingestion Time:事件攝入時間(數據進入Flink的時間)
Processing Time:時間創建時間(執行操作算子的本地系統時間)
通過一個實際的場景就很好理解
在實際業務場景中,為保證實時性和數據的正確性通常對Event Time<事件創建時間>處理比較常見。
2.窗口(Window)
Flink的核心在處理流式數據及無限流(個人覺得在有限流及批數據處理上Spark在應用和處理上是優於Flink),無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。這里讀者就會有疑問了,這不就是微批嗎?為什么不用sparkstreaming,sparkstreaming的核心就是微批處理。這里個人認為在底層邏輯上是有很大差別的。flink-window本身對攝取數據的方式不做改變,只是在算子計算中根據時間控制截取有限數據塊,而且這個時間控制和有限數據塊不宜過大,過大就失去核心意義了。而sparkstreaming是在攝入的時候就是一批一批的攝入,而每批的攝入不宜國小,如果過小會有急劇的性能壓力,會使數據計算阻塞。
Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的buckets桶,我們可以在這些桶上做計算操作。
Window類型
時間窗口(Time Window),按照時間生成Window
滾動時間窗口
滑動時間窗口
會話窗口
計數窗口(Count Window),按照指定的數據條數生成一個Window,與時間無關
滾動計數窗口
滑動計數窗口
滾動窗口
依據固定的窗口長度對數據進行切分,時間對齊,窗口長度固定,沒有重疊
滑動窗口
可以按照固定的長度向后滑動固定的距離,滑動窗口由固定的窗口長度和滑動間隔組成,可以有重疊(是否重疊和滑動距離有關系)
滑動窗口是固定窗口的更廣義的一種形式,滾動窗口可以看做是滑動窗口的一種特殊情況(即窗口大小和滑動間隔相等)
會話窗口(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成,也就是一段時間沒有接收到新數據就會生成新的窗口
創建不同類型的窗口
滾動時間窗口(tumbling time window)
.timeWindow(Time.seconds(15))
滑動時間窗口(sliding time window)
.timeWindow(Time.seconds(15),Time.seconds(5))
會話窗口(session window)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
滾動計數窗口(tumbling count window)
.countWindow(5)
滑動計數窗口(sliding count window)
.countWindow(10,2)
3.水位線(watermark)
watermark概念
流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、分布式等原因,導致亂序的產生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event Time順序排列的。
Flink對於遲到數據有三層保障,先來后到的保障順序是:
WaterMark => 約等於放寬窗口標准
allowedLateness => 允許遲到(ProcessingTime超時,但是EventTime沒超時)
sideOutputLateData => 超過遲到時間,另外捕獲,之后可以自己批處理合並先前的數據
那么此時出現一個問題,一旦出現亂序,如果只根據eventTime決定window的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了,這個特別的機制,就是Watermark。
Watermark在flink中本質是解決數據的一致性(順序性),那么如何避免亂序數據帶來的計算不正確?
a.遇到一個時間戳達到了窗口關閉時間,不應該立即觸發窗口計算,而是等待一段時間,等遲到的數據來了再關閉窗口
b.Watermark是一種衡量Event Time進展的機制,可以設定延遲觸發
c.Watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現
d.數據流中的Watermark用於表示”timestamp小於Watermark的數據,都已經到達了“,因此,window的執行也是由Watermark觸發的。
e.Watermark可以理解成一個延遲觸發機制,我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime小於maxEventTime - t的所有數據都已經到達,如果有窗口的停止時間等於maxEventTime – t,那么這個窗口被觸發執行。
Watermark = maxEventTime-延遲時間t
watermark特點
a.watermark是一條特殊的數據記錄
b.watermark必須單調遞增,以確保任務的事件時間時鍾在向前推進,而不是在后退
c.watermark與數據的時間戳相關
案例
測試代碼
package com.meijs;
import java.io.Serializable;
public class Temperature implements Serializable {
private String id;
private double temperature;
private long eventTime;
//這里必須有空構造方法,不然flink會報錯
public Temperature() {
}
public Temperature(String id, double temperature, long eventTime) {
this.id = id;
this.temperature = temperature;
this.eventTime = eventTime;
}
@Override
public String toString() {
return "Temperature{" +
"id='" + id + '\'' +
", temperature=" + temperature +
", eventTime=" + eventTime +
'}';
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public long getEventTime() {
return eventTime;
}
public void setId(String id) {
this.id = id;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
public void setEventTime(long eventTime) {
this.eventTime = eventTime;
}
}
package com.meijs;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WatermarkTest {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStream<String> dataStream = env.socketTextStream("192.168.154.130", 7777);
SingleOutputStreamOperator<Temperature> minTemp = dataStream.map(line -> {
String[] lines = line.split(",");
return new Temperature(lines[0], Double.parseDouble(lines[1]), Long.parseLong(lines[2]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Temperature>(Time.seconds(2)) {
@Override
public long extractTimestamp(Temperature element) {
return element.getEventTime()*1000L;
}
}).keyBy("id")
.timeWindow(Time.seconds(5))
.minBy("temperature");
minTemp.print();
env.execute("WatermarkTest");
}
}
運行測試
a.啟動一個sock服務
nc -lk 7777
b.啟動Java測試類,啟動后往sock中一行行輸入數據,觀察現象如下
可以看到第一次sock輸入的數據如下
1,37.2,1642599451
1,37.3,1642599452
1,37.1,1642599453
1,38.0,1642599455
1,38.2,1642599456
2,37.3,1642599454
2,38.2,1642599457
輸出的結果如下
Temperature{id='1', temperature=37.1, eventTime=1642599453}
Temperature{id='2', temperature=37.3, eventTime=1642599454}
可以看到第二次sock輸入的數據如下
1,37.2,1642599451
1,37.3,1642599452
1,37.1,1642599453
1,38.0,1642599455
1,38.2,1642599456
2,37.3,1642599454
2,38.2,1642599457
2,39.3,1642599458
2,39.5,1642599459
1,39.7,1642599461
2,39.7,1642599460
2,39.5,1642599462
輸出的結果如下
Temperature{id='2', temperature=38.2, eventTime=1642599457}
Temperature{id='1', temperature=38.0, eventTime=1642599455}
通過對watermark,window,時間語義的綜合分析,我們知道正常情況下
第一個事件時間窗口應該是如下:
[1642599450,1642599451,1642599452,1642599453,1642599454)
第二個事件時間窗口應該是如下:
[1642599455,1642599456,1642599457,1642599458,1642599459)
在第一個時間窗口中
原本應該按時到的1642599454的數據延遲了兩秒才到,1642599455,1642599456早到了一秒,而我們設置的watermark為2,剛好晚到兩秒的數據可以在第一個時間窗口內。
同理第二個也是如此
思考:
a.為什么時間事件語義是從1642599450開始的,而不是從我們輸入的第一個數據1642599451開始的,這里我們跟一下源碼
從timeWindow->KeyedStream(從timeWindow)->TumblingProcessingTimeWindows(assignWindows)->TimeWindow(getWindowStartWithOffset)
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
if (staggerOffset == null) {
staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
這里我們的timestamp=1642599451,offset=0,windowsize=5,最后計算得出的結果即起始位置為1642599450,offset為偏移量,即對當前數據是否需要往前或往后偏移。
b.我們這里設置可以修正晚2s到來導致的數據的順序性,如果大於2s如何處理?
在flink中對於該類數據可以設置側輸出流,如下代碼
package com.meijs;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
public class WatermarkTest {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
DataStream<String> dataStream = env.socketTextStream("192.168.154.130", 7777);
OutputTag<Temperature> outputTag = new OutputTag<Temperature>("late") {
};
SingleOutputStreamOperator<Temperature> minTemp = dataStream.map(line -> {
String[] lines = line.split(",");
return new Temperature(lines[0], Double.parseDouble(lines[1]), Long.parseLong(lines[2]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Temperature>(Time.seconds(2)) {
@Override
public long extractTimestamp(Temperature element) {
return element.getEventTime()*1000L;
}
}).keyBy("id")
.timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(4))
.sideOutputLateData(outputTag)
.minBy("temperature");
minTemp.print("watermark");
minTemp.getSideOutput(outputTag).print("late");
env.execute("WatermarkTest");
}
}
輸入數據和輸出結果如下:
注意:
a.watermark事件時間不建議設置過大,也就是說不建議處理過大的延遲,最好小於分鍾級別。也不能過小。對於過大的延遲可以采取側輸出流合並。
b.不建議時間窗口設置過大,過大會影響實時流的時效。
關於flink watermark推薦一篇文章(https://blog.csdn.net/lmalds/article/details/52704170),