一、簡介
開源流式處理系統在不斷地發展,從一開始只關注低延遲指標到現在兼顧延遲、吞吐與結果准確性,在發展過程中解決了很多問題,編程API的易用性也在不斷地提高。本文介紹一下 Flink 中的核心概念,這些概念是學習與使用 Flink 十分重要的基礎知識,在后續開發 Flink 程序過程中將會幫助開發人員更好地理解 Flink 內部的行為和機制。
這里引用一張圖來對常用的實時計算框架做個對比:
Flink 是有狀態的和容錯的,可以在維護一次應用程序狀態的同時無縫地從故障中恢復。它支持大規模計算能力,能夠在數千個節點上並發運行。它具有很好的吞吐量和延遲特性。同時,Flink 提供了多種靈活的窗口函數。Flink 在流式計算里屬於真正意義上的單條處理,每一條數據都觸發計算,而不是像 Spark 一樣的 Mini Batch 作為流式處理的妥協。Flink的容錯機制較為輕量,對吞吐量影響較小,而且擁有圖和調度上的一些優化,使得 Flink 可以達到很高的吞吐量。而 Strom 的容錯機制需要對每條數據進行ack,因此其吞吐量瓶頸也是備受詬病。
二、工作原理
Flink基本工作原理如下圖:
JobClient:負責接收程序,解析和優化程序的執行計划,然后提交執行計划到JobManager。這里執行的程序優化是將相鄰的Operator融合,形成Operator Chain,Operator的融合可以減少task的數量,提高TaskManager的資源利用率。
JobManagers:負責申請資源,協調以及控制整個job的執行過程,具體包括,調度任務、處理checkpoint、容錯等等。
TaskManager:TaskManager運行在不同節點上的JVM進程,負責接收並執行JobManager發送的task,並且與JobManager通信,反饋任務狀態信息,如果說JobManager是master的話,那么TaskManager就是worker用於執行任務。每個TaskManager像是一個容器,包含一個或者多個Slot。
Slot:Slot是TaskManager資源粒度的划分,每個Slot都有自己獨立的內存。所有Slot平均分配TaskManager的內存,值得注意的是,Slot僅划分內存,不涉及CPU的划分,即CPU是共享使用。每個Slot可以運行多個task。Slot的個數就代表了一個程序的最高並行度。
Task:Task是在operators的subtask進行鏈化之后形成的,具體Flink job中有多少task和operator的並行度和鏈化的策略有關。
SubTask:因為Flink是分布式部署的,程序中的每個算子,在實際執行中被分隔為一個或者多個subtask,運算符子任務(subtask)的數量是該特定運算符的並行度。數據流在算子之間流動,就對應到SubTask之間的數據傳輸。Flink允許同一個job中來自不同task的subtask可以共享同一個slot。每個slot可以執行一個並行的pipeline。可以將pipeline看作是多個subtask的組成的。
三、核心概念
1、Time(時間語義)
Flink 中的 Time 分為三種:事件時間、達到時間與處理時間。
1)事件時間:是事件真實發生的時間。
2)達到時間:是系統接收到事件的時間,即服務端接收到事件的時間。
3)處理時間:是系統開始處理到達事件的時間。
在某些場景下,處理時間等於達到時間。因為處理時間沒有亂序的問題,所以服務端做基於處理時間的計算是比較簡單的,無遲到與亂序數據。
Flink 中只需要通過 env 環境變量即可設置Time:
//創建環境上下文 val env = StreamExecutionEnvironment.getExecutionEnvironment // 設置在當前程序中使用 ProcessingTime env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
2、Window(窗口)
窗口本質就是將無限數據集沿着時間(或者數量)的邊界切分成有限數據集。
1)Time Window:基於時間的,分為Tumbling Window(無數據重疊)和Sliding Window(有數據重疊) 。
2)Count Window:基於數量的,分為Tumbling Window(無數據重疊)和Sliding Window(有數據重疊)。
3)Session Window:基於會話的,一個session window關閉通常是由於一段時間沒有收到元素。
4)Global Window:全局窗口。
在實際操作中,window又分為兩大類型的窗口:Keyed Window 和 Non-keyed Window,兩種類型的窗口操作API有細微的差別。
3、Trigger
1)自定義觸發器
觸發器決定了窗口何時會被觸發計算,Flink 中開發人員需要在 window 類型的操作之后才能調用 trigger 方法傳入觸發器定義。Flink 中的觸發器定義需要繼承並實現 Trigger 接口,該接口有以下方法:
- onElement(): 每個被添加到窗口中的元素都會被調用
- onEventTime(): 當事件時間定時器觸發時會被調用,比如watermark到達
- onProcessingTime(): 當處理時間定時器觸發時會被調用,比如時間周期觸發
- onMerge(): 當兩個窗口合並時兩個窗口的觸發器狀態將會被調動並合並
- clear(): 執行需要清除相關窗口的事件
以上方法會返回決定如何觸發執行的 TriggerResult:
- CONTINUE: 什么都不做
- FIRE: 觸發計算
- PURGE: 清除窗口中的數據
- FIRE_AND_PURGE: 觸發計算后清除窗口中的數據
2)預定義觸發器
如果開發人員未指定觸發器,則 Flink 會自動根據場景使用默認的預定義好的觸發器。在基於事件時間的窗口中使用 EventTimeTrigger,該觸發器會在watermark通過窗口邊界后立即觸發(即watermark出現關閉改窗口時)。在全局窗口(GlobalWindow)中使用 NeverTrigger,該觸發器永遠不會觸發,所以在使用全局窗口時用戶需要自定義觸發器。
4、State
Managed State 是由flink runtime管理來管理的,自動存儲、自動恢復,在內存管理上有優化機制。且Managed State 支持常見的多種數據結構,如value、list、map等,在大多數業務場景中都有適用之處。總體來說是對開發人員來說是比較友好的,因此 Managed State 是 Flink 中最常用的狀態。Managed State 又分為 Keyed State 和 Operator State 兩種。
Raw State 由用戶自己管理,需要序列化,只能使用字節數組的數據結構。Raw State 的使用和維度都比 Managed State 要復雜,建議在自定義的Operator場景中酌情使用。
5、狀態存儲
Flink中狀態的實現有三種:MemoryState、FsState、RocksDBState。三種狀態存儲方式與使用場景各不相同,詳細介紹如下:
1)MemoryStateBackend
構造函數:MemoryStateBackend(int maxStateSize, boolean asyncSnapshot)
存儲方式:State存儲於各個 TaskManager內存中,Checkpoint存儲於 JobManager內存
容量限制:單個State最大5M、maxStateSize<=akka.framesize(10M)、總大小不超過JobManager內存
使用場景:無狀態或者JobManager掛掉不影響的測試環境等,不建議在生產環境使用
2)FsStateBackend
構造函數:FsStateBackend(URI checkpointUri, boolean asyncSnapshot)
存儲方式:State存儲於 TaskManager內存,Checkpoint存儲於 外部文件系統(本次磁盤 or HDFS)
容量限制:State總量不超過TaskManager內存、Checkpoint總大小不超過外部存儲空間
使用場景:常規使用狀態的作業,分鍾級的窗口聚合等,可在生產環境使用
3)RocksDBStateBackend
構造函數:RocksDBStateBackend(URI checkpointUri, boolean enableincrementCheckpoint)
存儲方式:State存儲於 TaskManager上的kv數據庫(內存+磁盤),Checkpoint存儲於 外部文件系統(本次磁盤 or HDFS)
容量限制:State總量不超過TaskManager內存+磁盤、單key最大2g、Checkpoint總大小不超過外部存儲空間
使用場景:超大狀態的作業,天級的窗口聚合等,對讀寫性能要求不高的場景,可在生產環境使用
根據業務場景需要用戶選擇最合適的 StateBackend ,代碼中只需在相應的 env 環境中設置即可:
// flink 上下文環境變量 val env = StreamExecutionEnvironment.getExecutionEnvironment // 設置狀態后端為 FsStateBackend,數據存儲到 hdfs /tmp/flink/checkpoint/test 中 env.setStateBackend(new FsStateBackend("hdfs://ns1/tmp/flink/checkpoint/test", false))
6、Checkpoint
Checkpoint 是分布式全域一致的,數據會被寫入hdfs等共享存儲中。且其產生是異步的,在不中斷、不影響運算的前提下產生。
用戶只需在相應的 env 環境中設置即可:
// 1000毫秒進行一次 Checkpoint 操作 env.enableCheckpointing(1000) // 模式為准確一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 兩次 Checkpoint 之間最少間隔 500毫秒 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // Checkpoint 過程超時時間為 60000毫秒,即1分鍾視為超時失敗 env.getCheckpointConfig.setCheckpointTimeout(60000) // 同一時間只允許1個Checkpoint的操作在執行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
Asynchronous Barrier Snapshots(ABS)
異步屏障快照算法,這個算法基本上是Chandy-Lamport算法的變體,針對DAG(有向無環圖)的ABS算法執行流程如下所示:
- Barrier周期性的被注入到所有的Source中,Source節點看到Barrier后,會立即記錄自己的狀態,然后將Barrier發送到Transformation Operator。
- 當Transformation Operator從某個input channel收到Barrier后,它會立刻Block住這條通道,直到所有的input channel都收到Barrier,這個等待的過程就叫做屏障對齊(barrier alignment),此時該Operator就會記錄自身狀態,並向自己的所有output channel廣播Barrier。
- Sink接受Barrier的操作流程與Transformation Oper一樣。當所有的Barrier都到達Sink之后,並且所有的Sink也完成了Checkpoint,這一輪Snapshot就完成了。
下面這個圖展示了一個ABS算法的執行過程:
Exactly-Once vs At-Least-Once
上面講到的屏障對齊過程是Flink exactly-once語義的基礎,因為屏障對齊能夠保證多輸入流的算子正常處理不同checkpoint區間的數據,避免它們發生交叉,即不會有數據被處理兩次。
但是對齊過程需要時間,有一些對延遲特別敏感的應用可能對准確性的要求沒有那么高。所以Flink也允許在StreamExecutionEnvironment.enableCheckpointing()方法里指定At-Least-Once語義,會取消屏障對齊,即算子收到第一個輸入的屏障之后不會阻塞,而是觸發快照。這樣一來,部分屬於檢查點n + 1的數據也會包括進檢查點n的數據里, 當恢復時,這部分交叉的數據就會被重復處理。
7、Watermark
Flink 程序並 不能自動提取數據源中哪個字段/標識為數據的事件時間,從而也就無法自己定義 Watermark 。
開發人員需要通過 Flink 提供的 API 來 提取和定義 Timestamp/Watermark,可以在 數據源或者數據流中 定義。
1)自定義數據源設置 Timestamp/Watermark
自定義的數據源類需要繼承並實現 SourceFunction[T] 接口,其中 run 方法是定義數據生產的地方:
//自定義的數據源為自定義類型MyType class MySource extends SourceFunction[MyType]{ //重寫run方法,定義數據生產的邏輯 override def run(ctx: SourceContext[MyType]): Unit = { while (/* condition */) { val next: MyType = getNext() //設置timestamp從MyType的哪個字段獲取(eventTimestamp) ctx.collectWithTimestamp(next, next.eventTimestamp) if (next.hasWatermarkTime) { //設置watermark從MyType的那個方法獲取(getWatermarkTime) ctx.emitWatermark(new Watermark(next.getWatermarkTime)) } } } }
2)在數據流中設置 Timestamp/Watermark
在數據流中,可以設置 stream 的 Timestamp Assigner ,該 Assigner 將會接收一個 stream,並生產一個帶 Timestamp和Watermark 的新 stream。
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
8、廣播狀態(Broadcast State)
和 Spark 中的廣播變量一樣,Flink 也支持在各個節點中各存一份小數據集,所在的計算節點實例可在本地內存中直接讀取被廣播的數據,可以避免Shuffle提高並行效率。
廣播狀態(Broadcast State)的引入是為了支持一些來自一個流的數據需要廣播到所有下游任務的情況,它存儲在本地,用於處理其他流上的所有傳入元素。
// key the shapes by color KeyedStream<Item, Color> colorPartitionedStream = shapeStream.keyBy(new KeySelector<Shape, Color>(){...}); // a map descriptor to store the name of the rule (string) and the rule itself. MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Rule>() {})); // broadcast the rules and create the broadcast state BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor); DataStream<Match> output = colorPartitionedStream.connect(ruleBroadcastStream).process(new KeyedBroadcastProcessFunction<Color, Item, Rule, String>(){...});
9、Operator Chain
Flink作業中,可以指定相關的chain將相關性非常強的轉換操作(operator)綁定在一起,使得上下游的Task在同一個Pipeline中執行,避免因為數據在網絡或者線程之間傳輸導致的開銷。
一般情況下Flink在Map類型的操作中默認開啟 Operator Chain 以提高整體性能,開發人員也可以根據需要創建或者禁止 Operator Chain 對任務進行細粒度的鏈條控制。
//創建 chain dataStream.filter(...).map(...).startNewChain().map(...) //禁止 chain dataStream.map(...).disableChaining()
創建的鏈條只對當前的操作符和之后的操作符有效,不不影響其他操作,如上代碼只針對兩個map操作進行鏈條綁定,對前面的filter操作無效,如果需要可以在filter和map之間使用 startNewChain方法即可。
10、Side Output
除了從DataStream操作的結果中獲取主數據流之外,Flink還可以產生任意數量額外的側輸出(Side Output)結果流。側輸出結果流的數據類型不需要與主數據流的類型一致,不同側輸出流的類型也可以不同。當要拆分數據流時(通常必須復制流),從每個流過濾出不想擁有的數據時Side Output將非常有用。
DataStream<Integer> input = ...; final OutputTag<String> outputTag = new OutputTag<String>("side-output"){}; SingleOutputStreamOperator<Integer> mainDataStream = input .process(new ProcessFunction<Integer, Integer>() { @Override public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception { // 將數據發送到常規輸出中 out.collect(value); // 將數據發送到側輸出中 ctx.output(outputTag, "sideout-" + String.valueOf(value)); } });
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
參考:
https://zhuanlan.zhihu.com/p/93507000
https://www.jianshu.com/p/3093f6d92750
https://www.jianshu.com/p/8d6569361999
https://ci.apache.org/projects/flink/flink-docs-release-1.9/