Flink核心是一個流式的數據流執行引擎,其針對數據流的分布式計算提供了數據分布、數據通信以及容錯機制等功能
Flink提供了諸多更高抽象層的API以便用戶編寫分布式任務:
DataSet API, 對靜態數據進行批處理操作,將靜態數據抽象成分布式的數據集,用戶可以方便地使用Flink提供的各種操作符對分布式數據集進行處理,支持Java、Scala和Python。 DataStream API,對數據流進行流處理操作,將流式的數據抽象成分布式的數據流,用戶可以方便地對分布式數據流進行各種操作,支持Java和Scala。
Table API,對結構化數據進行查詢操作,將結構化數據抽象成關系表,並通過類SQL的DSL對關系表進行各種查詢操作,支持Java和Scala。
對於一個流處理系統,其節點間數據傳輸的標准模型是:當一條數據被處理完成后,序列化到緩存中,然后立刻通過網絡傳輸到下一個節點,由下一個節點繼續處理
而對於一個批處理系統,其節點間數據傳輸的標准模型是:當一條數據被處理完成后,序列化到緩存中,並不會立刻通過網絡傳輸到下一個節點,當緩存寫滿,就持久化到本地硬盤上,當所有數據都被處理完成后,才開始將處理后的數據通過網絡傳輸到下一個節點。
這兩種數據傳輸模式是兩個極端,對應的是流處理系統對低延遲的要求和批處理系統對高吞吐量的要求
Flink的執行引擎同時支持了這兩種數據傳輸模型,Flink以固定的緩存塊為單位進行網絡數據傳輸,用戶可以通過緩存塊超時值指定緩存塊的傳輸時機,超時值為0,則是流處理系統的標准模型,此時可以獲得最低的處理延遲,緩存塊的超時值為無限大,則Flink的數據傳輸方式類似上文所提到批處理系統的標准模型。
緩存塊的超時閾值越小,則流處理數據的延遲越低,但吞吐量也會變低。根據超時閾值來靈活權衡系統延遲和吞吐量。Flink基於分布式快照與可部分重發的數據源實現了容錯。
用戶可自定義對整個Job進行快照的時間間隔,當任務失敗時,Flink會將整個Job恢復到最近一次快照,並從數據源重發快照之后的數據。
按照用戶自定義的快照間隔時間,flink會定時在數據源中插入快照標記的消息,快照消息和普通消息都在DAG中流動,但不會被用戶定義的邏輯所處理,每一個快照消息都將其所在的數據流分成2部分:本次快照數據和下次快照數據。當操作符處理到快照標記消息,對自己的狀態進行快照標記並緩存。操作符對自己的快照和狀態可以是異步,增量操作,並不阻塞消息處理。當所有的終點操作符都收到快照標記信息並對自己的狀態快照和存儲后,整個分布式快照就完成了。同時通知數據源釋放該快照標記消息之前的所有消息。若之后的節點崩潰等異常,就可以恢復分布式快照狀態。並從數據源重發該快照以后的消息。
flink基於分布式快照實現了一次性。
目前大部分流處理系統來說,時間窗口一般是根據Task所在節點的本地時鍾進行切分,
是可能無法滿足某些應用需求,比如:
消息本身帶有時間戳,用戶希望按照消息本身的時間特性進行分段處理。
由於不同節點的時鍾可能不同,以及消息在流經各個節點的延遲不同,在某個節點屬於同一個時間窗口處理的消息,流到下一個節點時可能被切分到不同的時間窗口中,從而產生不符合預期的結果
Flink支持3種類型的時間窗口:
1.Operator Time。根據Task所在節點的本地時鍾來切分的時間窗口
2.Event Time。消息自帶時間戳,根據消息的時間戳進行處理,確保時間戳在同一個時間窗口的所有消息一定會被正確處理。由於消息可能亂序流入Task,所以Task需要緩存當前時間窗口消息處理的狀態,直到確認屬於該時間窗口的所有消息都被處理,才可以釋放,如果亂序的消息延遲很高會影響分布式系統的吞吐量和延遲。
3.ingress Time。有時消息本身並不帶有時間戳信息,但用戶依然希望按照消息而不是節點時鍾划分時間窗口,例如避免上面提到的第二個問題,此時可以在消息源流入Flink流處理系統時自動生成增量的時間戳賦予消息,之后處理的流程與Event Time相同。Ingress Time可以看成是Event Time的一個特例,由於其在消息源處時間戳一定是有序的,所以在流處理系統中,相對於Event Time,其亂序的消息延遲不會很高,因此對Flink分布式系統的吞吐量和延遲的影響也會更小。
當操作符通過基於Event Time的時間窗口來處理數據時,它必須在確定所有屬於該時間窗口的消息全部流入此操作符后才能開始數據處理。但是由於消息可能是亂序的,所以操作符無法直接確認何時所有屬於該時間窗口的消息全部流入此操作符。WaterMark包含一個時間戳,Flink使用WaterMark標記所有小於該時間戳的消息都已流入
一個可能的優化措施是,對於聚合類的操作符,可以提前對部分消息進行聚合操作,當有屬於該時間窗口的新消息流入時,基於之前的部分聚合結果繼續計算,這樣的話,只需緩存中間計算結果即可,無需緩存該時間窗口的所有消息
flink基於watermark實現了基於時間戳的全局排序:
排序操作:排序操作符緩存所有流入的消息,當接收到watermark時,對時間戳小於該watermark的消息進行排序,並發送到下一個節點。在此排序操作符中釋放所有時間戳小於該watermark的消息,繼續緩存流入的消息。等待下一次watermark觸發下一次排序。
watermark保證了其之后不會出現時間戳比它小的消息,因此可以保證排序的正確性。請注意:排序操作符有多個節點,只能保證每個節點流出的消息有序,節點之間的消息不能有序,要實現全局有序,則只能有一個排序操作符節點。
Java對象的存儲密度相對偏低,例如[1],“abcd”這樣簡單的字符串在UTF-8編碼中需要4個字節存儲
采用了UTF-16編碼存儲字符串的Java則需要8個字節,同時Java對象還有header等其他額外信息,一個4字節字符串對象在Java中需要48字節的空間來存儲。對於大部分的大數據應用,內存都是稀缺資源,更有效率地內存存儲,意味着CPU數據訪問吞吐量更高,以及更少磁盤落地的存在。
垃圾回收也是Java應用的不定時炸彈,有時秒級甚至是分鍾級的垃圾回收極大影響了Java應用的性能和可用性。
通過JVM參數調優提高垃圾回收效率需要用戶對應用和分布式計算框架以及JVM的各參數有深入了解,而且有時候這也遠遠不夠:
為了解決以上提到的問題,高性能分布式計算框架通常需要以下技術:
Flink的處理策略:
定制的序列化工具,顯式內存管理的前提步驟就是序列化,用的序列化框架,如Java默認使用java.io.Serializable
制的序列化框架,如Hadoop的org.apache.hadoop.io.Writable需要用戶實現該接口並自定義類的序列化和反序列化方法。這種方式效率最高。
對於計算密集的數據結構和算法,直接操作序列化后的二進制數據,而不是將對象反序列化后再進行操作。
緩存友好的數據結構和算法。對於計算密集的數據結構和算法,直接操作序列化后的二進制數據,而不是將對象反序列化后再進行操作。同時,只將操作相關的數據連續存儲,可以最大化的利用L1/L2/L3緩存,減少Cache miss的概率,提升CPU計算的吞吐量。以排序為例,由於排序的主要操作是對Key進行對比,如果將所有排序數據的Key與Value分開並對Key連續存儲,那么訪問Key時的Cache命中率會大大提高。
分布式計算框架可以使用定制序列化工具的前提是要待處理數據流通常是同一類型,由於數據集對象的類型固定,從而可以只保存一份對象Schema信息,節省大量的存儲空間
對於固定大小的類型,也可通過固定的偏移位置存取。在需要訪問某個對象成員變量時,通過定制的序列化工具,並不需要反序列化整個Java對象,而是直接通過偏移量,從而只需要反序列化特定的對象成員變量。如果對象的成員變量較多時,能夠大大減少Java對象的創建開銷,以及內存數據的拷貝大小。Flink數據集都支持任意Java或是Scala類型,通過自動生成定制序列化工具,既保證了API接口對用戶友好(不用像Hadoop那樣數據類型需要繼承實現org.apache.hadoop.io.Writable接口),也達到了和Hadoop類似的序列化效率。
Flink對數據集的類型信息進行分析,然后自動生成定制的序列化工具類。Flink支持任意的Java或是Scala類型,通過Java Reflection框架分析基於Java的Flink程序UDF(User Define Function)的返回類型的類型信息,通過Scala Compiler分析基於Scala的Flink程序UDF的返回類型的類型信息。類型信息由TypeInformation類表示,這個類有諸多具體實現類
例如
1.BasicTypeInfo任意Java基本類型(裝包或未裝包)和String類型
2.BasicArrayTypeInfo任意Java基本類型數組(裝包或未裝包)和String數組
3.WritableTypeInfo任意Hadoop的Writable接口的實現類
4.TupleTypeInfo任意的Flink tuple類型(支持Tuple1 to Tuple25)Flink tuples是固定長度固定類型的Java Tuple實現
5.CaseClassTypeInfo任意的 Scala CaseClass(包括 Scala tuples)
6.PojoTypeInfo任意的POJO (Java or Scala),Java對象的所有成員變量,要么是public修飾符定義,要么有getter/setter方法
7.GenericTypeInfo任意無法匹配之前幾種類型的類。
前6種類型數據集幾乎覆蓋了絕大部分的Flink程序,針對前6種類型數據集,Flink皆可以自動生成對應的TypeSerializer定制序列化工具,非常有效率地對數據集進行序列化和反序列化
對於第7種類型,Flink使用Kryo進行序列化和反序列化
對於可被用作Key的類型,Flink還同時自動生成TypeComparator,用來輔助直接對序列化后的二進制數據直接進行compare、hash等操作
對於Tuple、CaseClass、Pojo等組合類型,Flink自動生成的TypeSerializer、TypeComparator同樣是組合的,並把其成員的序列化/反序列化代理給其成員對應的TypeSerializer、TypeComparator,如圖6所示:
此外如有需要,用戶可通過集成TypeInformation接口定制實現自己的序列化工具。
JDK8的G1算法改善了JVM垃圾回收的效率和可用范圍
通過JVM進行內存管理的話,OutOfMemoryError也是一個很難解決的問題
在JVM內存管理中,Java對象有潛在的碎片化存儲問題
Flink將內存分為3個部分,每個部分都有不同用途:
1.Network buffers: 一些以32KB Byte數組為單位的buffer,主要被網絡模塊用於數據的網絡傳輸,基於Netty的網絡傳輸
2.Memory Manager pool大量以32KB Byte數組為單位的內存池,所有的運行時算法(例如Sort/Shuffle/Join)都從這個內存池申請內存並將序列化后的數據存儲其中,結束后釋放回內存池。通常會配置為最大的一塊內存,
3. Remaining (Free) Heap主要留給UDF中用戶自己創建的Java對象,由JVM管理。時Flink也不鼓勵用戶在UDF中緩存很多數據。。 Remaining Heap的內存雖然由JVM管理,但是由於其主要用來存儲用戶處理的流式數據,生命周期非常短,速度很快的Minor GC就會全部回收掉,一般不會觸發Full GC
在Flink中,內存池由多個MemorySegment組成,每個MemorySegment代表一塊連續的內存,底層存儲是byte[],默認32KB大小。
MemorySegment提供了根據偏移量訪問數據的各種方法,如get/put int、long、float、double等,MemorySegment之間數據拷貝等方法和java.nio.ByteBuffer類似。
對於Flink的數據結構,通常包括多個向內存池申請的MemeorySegment,所有要存入的對象通過TypeSerializer序列化之后,將二進制數據存儲在MemorySegment中,在取出時通過TypeSerializer反序列化
數據結構通過MemorySegment提供的set/get方法訪問具體的二進制數據
Flink這種看起來比較復雜的內存管理方式帶來的好處主要有:
1.二進制的數據存儲大大提高了數據存儲密度,節省了存儲空間。所有的運行時數據結構和算法只能通過內存池申請內存,保證了其使用的內存大小是固定的,不會因為運行時數據結構和算法而發生OOM
Flink當前的內存管理在最底層是基於byte[],
flink排序算法的實現:
1.將待排序的數據經過序列化后存儲在兩個不同的MemorySegment集中,數據全部的序列化值存放於其中一個MemorySegment集中。數據序列化后的Key和指向第一個MemorySegment集中值的指針存放於第二個MemorySegment集中。對第二個MemorySegment集中的Key進行排序,如需交換Key位置,只需交換對應的Key+Pointer的位置,第一個MemorySegment集中的數據無需改變。 當比較兩個Key大小時,TypeComparator提供了直接基於二進制數據的對比方法,無需反序列化任何數據。排序完成后,訪問數據時,按照第二個MemorySegment集中Key的順序訪問,並通過Pointer值找到數據在第一個MemorySegment集中的位置,通過TypeSerializer反序列化成Java對象返回。
通過Key和Full data分離存儲的方式盡量將被操作的數據最小化,提高Cache命中的概率,從而提高CPU的吞吐量。 移動數據時,只需移動Key+Pointer,而無須移動數據本身,大大減少了內存拷貝的數據量。 TypeComparator直接基於二進制數據進行操作,節省了反序列化的時間。
DataSet API級別的執行計划優化器,原生的迭代操作符等,