1 Flink介紹
Flink 是一個面向分布式數據流處理和批量數據處理的開源計算平台。和 Spark 類似,兩者都希望提供一個統一功能的計算平台給用戶,都在嘗試建立一個統一的平台以運行批量,流式,交互式,圖處理,機器學習等應用。

1.1部署模式
Flink 集群的部署,本身不依賴 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存儲數據,就需要選擇對應的 Hadoop 版本。

-
Standalone
-
YARN
-
Mesos
-
Cloud
1.2整合支持
-
Flink支持消費kafka的數據;
-
支持HBase,Cassandra, ElasticSearch
-
支持與Alluxio的整合
-
支持RabbitMQ
1.3 API支持
-
對Streaming數據類應用,提供DataStream API
-
對批處理類應用,提供DataSet API(支持Java/Scala)
-
對流處理和批處理,都支持Table API
-
支持雙流join
1.4 Libraries支持
-
支持機器學習(FlinkML)
-
支持圖分析(Gelly)
-
支持關系數據處理(Table)
-
支持復雜事件處理(CEP)
1.5 Flink on YARN

Flink提供兩種Yarn的部署方式Yarn Setup:
Start a long-running Flink cluster on YARN
-
通過命令yarn-session.sh來實現,本質上是在yarn集群上啟動一個flink集群。
-
由yarn預先給flink集群分配若干個container給flink使用,在yarn的界面上只能看到一個Flink session with X TaskManagers的任務。
-
只有一個Flink界面,可以從Yarn的ApplicationMaster鏈接進入。
-
使用bin/flink run命令發布任務時,本質上是使用Flink自帶的調度,與普通的在Flink集群上發布任務並沒有不同。不同的任務可能在一個TaskManager中,也即是在一個JVM進程中,無法實現資源隔離。
Run a Flink job on YARN
-
通過命令bin/flink run -m yarn-cluster實現,一次只發布一個任務,本質上給每個flink任務啟動了一個集群。
-
yarn不事先給flink分配container,而是在任務發布時,啟動JobManager(對應Yarn的AM)和TaskManager,如果一個任務指定了n個TaksManager(-yn n),則會啟動n+1個Container,其中一個是JobManager。
-
發布m個應用,則有m個Flink界面,對比方式一,同樣發布m個應用,會多出m-1個JobManager的。
-
發布任務時,實際上是使用了Yarn的調用。不同的任務不可能在一個Container(JVM)中,也即是實現了資源隔離。
以第一種啟動方式為例,其主要啟動流程如下:
首先我們通過下面的命令行啟動flink on yarn的集群
這里將產生總共五個進程:
-
1個FlinkYarnSessionCli ---> Yarn Client
-
1個YarnApplicationMasterRunner ---> AM + JobManager
-
3個YarnTaskManager --> TaskManager
即一個客戶端+4個container,1個container啟動AM,3個container啟動TaskManager。
yarn-session.sh支持的參數:

一個Flink環境在YARN上的啟動流程:
-
FlinkYarnSessionCli 啟動的過程中首先會檢查Yarn上有沒有足夠的資源去啟動所需要的container,如果有,則上傳一些flink的jar和配置文件到HDFS,這里主要是啟動AM進程和TaskManager進程的相關依賴jar包和配置文件。
-
接着yarn client會首先向RM申請一個container來啟動 ApplicationMaster(YarnApplicationMasterRunner進程),然后RM會通知其中一個NM啟動這個container,被分配到啟動AM的NM會首先去HDFS上下載第一步上傳的jar包和配置文件到本地,接着啟動AM;在這個過程中會啟動JobManager,因為JobManager和AM在同一進程里面,它會把JobManager的地址重新作為一個文件上傳到HDFS上去,TaskManager在啟動的過程中也會去下載這個文件獲取JobManager的地址,然后與其進行通信;AM還負責Flink的web 服務,Flink里面用到的都是隨機端口,這樣就允許了用戶能夠啟動多個yarn session。
從這個啟動過程中可以看出,在每次啟動Flink on YARN之前,需要指定啟動多少個TaskManager,每個taskManager分配的資源是固定的,也就是說這個資源量從taskManager出生到死亡,資源情況一直是這么多,不管它所承載的作業需求資源情況,這樣在作業需要更多資源的時候,沒有更多的資源分配給對應的作業,相反,當一個作業僅需要很少的資源就能夠運行的時候,仍然分配的是那些固定的資源,造成資源的浪費。
用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。當一個Flink程序被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。
下面是一個由Flink程序映射為Streaming Dataflow的示意圖,如下所示:

FlinkKafkaConsumer是一個Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink是一個Sink Operator。
1.6 CEP(Complex event processing)
Flink CEP 是一套極具通用性、易於使用的實時流式事件處理方案。作為 Flink 的原生組件,省去了第三方庫與 Flink 配合使用時可能會導致的各種問題。但其功能現階段看來還比較基礎,不能表達復雜的業務場景,同時它不能夠做到動態更新。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// (Event, timestamp)DataStream<Event> input = env.fromElements(Tuple2.of(new Event(1, "start", 1.0), 5L),Tuple2.of(new Event(2, "middle", 2.0), 1L),Tuple2.of(new Event(3, "end", 3.0), 3L),Tuple2.of(new Event(4, "end", 4.0), 10L),Tuple2.of(new Event(5, "middle", 6.0), 7L),Tuple2.of(new Event(6, "middle", 5.0), 7L),// last element for high final watermarkTuple2.of(new Event(7, "middle", 5.0), 100L)).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
具體的業務邏輯
Pattern<Event, ? extends Event> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.getName().equals("start");}}).followedByAny("middle").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.getName().equals("middle");}}).followedByAny("end").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.getName().equals("end");}});DataStream<String> result = CEP.pattern(input, pattern, comparator).select(new PatternSelectFunction<Event, String>() {@Overridepublic String select(Map<String, List<Event>> pattern) {StringBuilder builder = new StringBuilder();builder.append(pattern.get("start").get(0).getId()).append(",").append(pattern.get("middle").get(0).getId()).append(",").append(pattern.get("end").get(0).getId());return builder.toString();}});
從例子代碼中可以看到,patterns需要用java代碼寫,需要編譯,很冗長很麻煩,沒法動態配置;需要可配置,或提供一種DSL;再者,對於一個流同時只能設置一個pattern,比如對於不同的用戶實例想配置不同的pattern,就沒法支持;需要支持按key設置pattern。
1.7 Flink目前存在的一些問題
在實時計算中有這么一個普遍的邏輯:業務邏輯中以一個流式數據源與幾個相關的配置表進行join操作,而配置表並不是一成不變的,會定期的進行數據更新,可以看成一個緩慢變化的流。這種join環境存在以下幾個尚未解決的問題:
1.對元數據庫的讀壓力;如果分析程序有1000並發,是否需要讀1000次;
2.讀維表數據不能拖慢主數據流的throughput,每秒千萬條數據量;
3.動態維表更新問題和一致性問題;元數據是不斷變化的,如何把更新同步到各個並發上;
4.冷啟動問題,如何保證主數據流流過的時候,維表數據已經ready,否則會出現數據無法處理;
5.超大維表數據會導致流量抖動和頻繁gc,比如幾十萬條的實例數據,可能上百兆。
在Flink社區,對該問題也進行了關注
https://issues.apache.org/jira/browse/FLINK-6131

https://issues.apache.org/jira/browse/FLINK-2320

https://issues.apache.org/jira/browse/FLINK-3514

當然在生產環境上也有相應的解決方案:
使用redis來做cache,只用一個job,負責從元數據庫同步數據到redis,這樣就解決1,3
然后所有的並發都從redis直接查詢需要的元數據,這樣就解決4;對於2,在並發上做local cache,只有第一次需要真正查詢redis,后續定期異步更新就好,不會影響到主數據流;對於5,因為現在不需要一下全量的讀取維表數據到內存,用到的時候才去讀,分攤了負載,也可以得到緩解。
這個方案也有一定的弊端,增加了架構的外部依賴,要額外保障外部redis和同步job的穩定性。
2 Flink vs Spark
2.1 框架
Spark把streaming看成是更快的批處理,而Flink把批處理看成streaming的special case。這里面的思路決定了各自的方向,其中兩者的差異點有如下這些:
實時 vs 近實時的角度:Flink提供了基於每個事件的流式處理機制,所以可以被認為是一個真正的流式計;而Spark,不是基於事件的粒度,而是用小批量來模擬流式,也就是多個事件的集合。所以Spark被認為是近實時的處理系統。
Spark streaming 是更快的批處理,而Flink Batch是有限數據的流式計算。
2.1.1 流式計算和批處理API
Spark對於流式計算和批處理,都是基於RDD的抽象。這樣很方便將兩種計算方式合並表示。而Flink將流式計算和批處理分別抽象出來DataStream和DataSet兩種API,這一點上Flink相對於spark來說是一個糟糕的設計。
2.2 社區活躍度對比


Spark 2.3 繼續向更快、更易用、更智能的目標邁進,引入了低延遲的持續處理能力和流到流的連接,讓 Structured Streaming 達到了一個里程碑式的高度。
3 提交一個Flink作業
啟動flink服務
./bin/yarn-session.sh -n 4 -jm 2048 -tm 2048

在yarn監控界面上可以看到該作業的執行狀態

並驗證Wordcount例子
./bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048./examples/batch/WordCount.jar
在client端可以看到log:


--end--
