花了四小時,看完Flink的內容,基本了解了原理。 挖個坑,待總結后填一下。
2019-06-02 01:22:57等歐冠決賽中,填坑。
一、概述
storm最大的特點是快,它的實時性非常好(毫秒級延遲)。為了低延遲它犧牲了高吞吐,並且不能保證exactly once語義。
在低延遲和高吞吐的流處理中,維持良好的容錯是非常困難的,但為了得到有保障的准確狀態,人們想到一種替代方法:將連續時間中的流數據分割成一系列微小的批量作業(微批次處理)。如果分割得足夠小,計算幾乎可以實現真正的流處理。因為存在延遲,所以不可能做到完全實時,但是每個簡單的應用程序都可以實現僅有幾秒甚至幾亞秒的延遲。這就是Spark Streaming所使用的方法。
為了實現高吞吐和exactly once語義,storm推出了storm trident,也是使用了微批次處理的方法。
微批次處理缺點:
-
數據只能按固定時間分割,沒有辦法根據實際數據情況,進行不同批次或每個批次不同大小的分割
-
滿足不了對數據實時性要求非常高的數據
初識flink
flink與storm,spark streaming的比較
Apache flink主頁在其頂部展示了該項目的理念:“Apache Flink是為分布式,高性能,隨時可用以及准確的流處理應用程序打造的開源流處理框架”
流處理與批處理:
批處理的特點是有界、持久(數據已經落地)、大量,批處理非常適合需要訪問全套記錄才能完成的計算工作,一般用於離線統計。典型的是Hadoop,它只能用於批處理。
流處理的特點是無界、實時,流處理方式無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作,一般用於實時統計。典型的是storm,它只能進行流處理。
有沒有即能實現批處理,也能實現流處理?
spark即能進行流處理,也能實現批處理,但它並不是在同一架構體系下,spark的批處理是通過spark core和spark sql實現,流處理是通過spark streaming實現。
flink什么特點呢?不管是批處理還是流處理,它能夠同時進行處理,因為它底層是不區分流批的。flink將批處理(即處理有限的靜態數據)視作一種特殊的流處理。
二、flink基本架構
JobManager與TaskManager
如果粗化一點看的話,flink就是由2部分組成,即JobManager和TaskManager,是兩個JVM進程。
JobManager:也稱為master(對應spark里的driver),用於協調分布式執行,它們用來調度task,協調檢查點,協調失敗回復等。flink運行時至少存在一個master處理器,如果配置高可用模式則會存在多個master,它們其中一個是leader,而其它都是standby.
TaskManager:也稱為worker(對應spark里面的executor),用於執行一個dataflow的task,數據緩沖和datastream的交換,flink運行時至少會存在一個worker處理器。
flink的編程模型
Stateful Stream Processing,是數據接入,計算,輸出都是自己來實現,是最靈活也是最麻煩的編程接口。
DataStream/DataSet API是針對流和批處理的封裝API,絕大多數的編程是在這一層。
Table API,是將數據抽像成一張表,提供select, group_by等API接口供調用。
SQL是最高級的接口,支持直接寫SQL查詢數據。
三、flink運行架構
任務提交流程
當啟動新的Flink YARN會話時,客戶端首先檢查請求的資源(容器和內存)是否可用。之后,它將包含flink的jar和配置上傳到HDFS(步驟1)。
客戶端的下一步是請求(步驟2)YARN容器以啟動ApplicationMaster(步驟3)。由於客戶端將配置和jar文件注冊為容器的資源,因此在該特定機器上運行的YARN的NodeManager將負責准備容器(例如,下載文件)。一旦完成,ApplicationMaster(AM)就會啟動。
該JobManager和AM在同一容器中運行。成功啟動后,AM就很容易知道JobManager的地址(它自己的主機)。它為TaskManagers生成一個新的Flink配置文件(以便它們可以連接到JobManager)。該文件也被上傳到HDFS。此外,AM容器還提供Flink的Web界面。YARN代碼分配的所有端口都是臨時端口。這允許用戶並行執行多個Flink YARN會話。
之后,AM開始為Flink的TaskManagers分配容器,它將從HDFS下載jar文件和修改后的配置。完成這些步驟后,Flink即會設置並准備接受作業。
這個提交Yarn Session的整個過程,Yarn Session提交完成后,JobManager和TaskManager就啟動完畢,等待用戶任務的提交(jar包)
任務調度原理
Task Slot只平均分配內存,不分CPU!Slot指的是TaskManager能夠並行執行的task最大數。
客戶端將程序抽像成Datafow Graph,並能過Actor System通信,將程序提交到JobManager,JobManager根據Dataflow Graph(類似Spark中的DAG),兩個相臨任務間的並行度變化,來划分任務,並將任務提交到TaskManager的Task Slot去執行。
TaskManager是一個獨立的JVM進程,TaskManager和Slot可以看作worker pool模型,Slot是一個Worker,如果TaskManager里有Slot,才能被分配任務。
一個Slot里的Task,可能包含多個算子。Task按distributed進行划分,也就算子是否產生shuffle(spark里shuffle==flink的distributed).
如果有3個TaskManager,第個TaskManager中有3個Slot,那么最高支持的並行度是9,parallelism.default=9.
程序與數據流
flink程序的基礎構架模塊是流(streams)和轉換(transformation)
fink通過source將流接進來,通過transformation算子對流進行轉換,再通過sink將數據輸出,這是一個flink程序的完整過程。
並行數據流
flink程序的執行具有並行、分布式的特性。在執行過程中,一個stream包含一個或多個stream partition,而每個operator包含一個或多個operator subtask,這些operator subtask在不同的線程、不同的物理機或不同的容器中彼此互不依賴的執行。
一個特定的operator的subtask的個數被稱之為其parallelism(並行度)。一個程序中,不同的operator可能具有不同的並行度。
stream在operator之間的傳輸數據的形式可以是one-to-one(forwarding)的模式,也可以是redistributing的模式,具體哪種形式取決於operator的種類。
如上圖map是one-to-one模式,而keyBy,window,apply是redistributing模式
one-to-one的算子,會被組合在一起成為operator-chain,一個operator-chain被分成一個task去執行。
四、DataStream API
flink程序結構
每個flink程序都包含以下的若干個流程:
-
獲取一個執行環境:execution enviroment
-
加載/創建初始數據:source
-
指定轉換這些數據:transformation
-
指定放置計算結果的位置:sink
-
觸發程序執行
Transformation
-
Map操作
遍歷一個集合的所有元素,並對每個元素做轉換
輸入一個參數,產生一個輸出。
steam.map(item => item * 2)
-
FlatMap操作
輸入一個參數,產生0個、1個或多個輸出。
stream.flatMap(item => item.split(“ ”))
-
Filter操作
結算每個元素的布爾值,並返回布爾值為true的元素。
stream.filter(item => item == 1)
-
Connect操作
DataStream1, DataStream2 -> ConnectedStreams
在ConnectedStream的內部,stream還是分開的,也就是說,想對ConnectedStream執行一個Map/Filter等操作,要傳入2個函數。第1個對DataStream1操作,第2個對DataStream2操作。
streamConnect = stream1.connect(stream2)
streamConnect.map(item => item * 2, item => (item, 1L))
-
coMap, coFlatMap操作
ConnectedStreams -> DataStream
stream = streamConnect.map(item => item * 2, item => (item, 1L))
輸入是一個ConnectedStream,輸出是一個普通的DataStream
-
Split + Select操作
DataStream -> SplitStream
val streamSplit = stream.split(word => (“haddoop”.equals(word)) match {
case true => List(“hadoop”)
case false => List(“other”)
}
)
上面split將流划分成2個流
val streamSelect001 = streamSplit.select(“hadoop”)
select 將指定的一個流取出來。
-
union操作
對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新的DataStream。
-
KeyBy
DataStream -> KeyedStream,輸入必須是Tuple類型,邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同Key的元素,在內部以Hash的形式實現。
val env = StreamExecutionEnvironment.getExecutionEnvironment
var stream = env.readTextFile(“test.txt”)
val streamMap = stream.flaMap(item => item.split(“ “)).map(item => (item, 1L))
val streamKeyBy = streamMap.keyBy(0) //keyBy可以根據Tuple中的第一個元素,也可以根據第二個元素,進行partition。0代表第一個元素。
-
Reduce操作
KeyedStream -> DataStream:一個分組數據流的聚合操作,合並當前元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。
val streamReduce = streamKeyBy.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
-
Fold操作
KeyedStream -> DataStream:一個有初始值的分組數據流的滾動折疊操作。給窗口賦一個fold功能的函數,並返回一個fold后結果
-
Aggregation操作
五、Time與Window
時間
flink中有3個時間,
EventTime:事件生成的時間
IngestionTime:事件進入flink的時間
WindowProcessingTime:事件被處理的系統時間(默認使用)
窗口
不能直接對無界的流進行聚合,要先將流划分為window,再對window進行聚合。
window分為兩類:
CountWindow:按照指定的數據條數,生成一個window,與時間無關
TimeWindow:按照時間生成window
對於TimeWindow,可以根據窗口實現原理的不同分成三類:
滾動窗口(Tumbing Window)—— 沒有重疊
滑動窗口(Sliding Window)—— 重疊,有窗口長度和滑動步長兩個屬性
會話窗口(Sessionn Window)—— 如果相臨的兩條數據,間隔時間超過會話窗口時間大小,則前面的數據生成一個窗口。
每滿足滑動步長,會針對window執行一次計算
val streanWindow = streamKeyBy.timeWindow(Time.Seconds(10), Time.Seconds(2)).reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
六、EventTime和Window
引入EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
WaterMark
如果用EventTime來決定窗口的運行,一旦出現亂序,我們不能明確數據是否已經全部到位,但又不能無限期的等下去,此時必須有個機制來保證一個特定的時間后,必須觸發window的計了,這個特別的機制就是watermark.
WaterMark是一種衡量EventTime進展的機制,它是數據本身的一個隱藏屬性,數據本身攜帶着對應的WaterMark。
WaterMark是用於處理亂序事件的,而正確的處理亂序事件,通常用WaterMark機制結合window來實現。
數據流中的WaterMark用於表示eventTime小於watermark的數據,都已經到達了,因此,window的執行也是由WaterMark觸發的。
WaterMark可以理解成一個延遲觸發機制,我們可以設置WaterMark的延時時長為t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime小於maxEventTime-t的所有數據都已經到達,如果有窗口的停止時間等於maxEventTime-t,那么這個窗口被觸發。
當Flink接收到每一條數據時,都會計算產生一條watermark,watermark = 當前所有到達數據中的maxEventTime - 延遲時長,也就是說,watermark是由數據攜帶的,一量數據攜帶的watermark比當前未觸發的窗口的停止時間要晚,那么就會觸發相應的窗口的執行。由於watermark是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠不被觸發。
EventTime的窗口與Time里的窗口區別:
窗口大小設置為5s,Time窗口每5秒執行一次,不管有沒有數據。
EventTime每5s生成一個窗口,但不執行。當觸發條件滿足后,才會執行窗口。
附錄
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html