Flink比Spark香在哪?


1 Flink介紹

Flink 是一個面向分布式數據流處理和批量數據處理的開源計算平台。和 Spark 類似,兩者都希望提供一個統一功能的計算平台給用戶,都在嘗試建立一個統一的平台以運行批量,流式,交互式,圖處理,機器學習等應用。

 

 

1.1部署模式

Flink 集群的部署,本身不依賴 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存儲數據,就需要選擇對應的 Hadoop 版本。

 

 

  • Standalone

  • YARN

  • Mesos

  • Cloud

1.2整合支持

  1. Flink支持消費kafka的數據;

  2. 支持HBase,Cassandra, ElasticSearch

  3. 支持與Alluxio的整合

  4. 支持RabbitMQ

1.3 API支持

  • 對Streaming數據類應用,提供DataStream API

  • 對批處理類應用,提供DataSet API(支持Java/Scala)

  • 對流處理和批處理,都支持Table API

  • 支持雙流join

1.4 Libraries支持

  • 支持機器學習(FlinkML)

  • 支持圖分析(Gelly)

  • 支持關系數據處理(Table)

  • 支持復雜事件處理(CEP)

1.5 Flink on YARN

 

 

Flink提供兩種Yarn的部署方式Yarn Setup:

Start a long-running Flink cluster on YARN

  • 通過命令yarn-session.sh來實現,本質上是在yarn集群上啟動一個flink集群。

  • 由yarn預先給flink集群分配若干個container給flink使用,在yarn的界面上只能看到一個Flink session with X TaskManagers的任務。

  • 只有一個Flink界面,可以從Yarn的ApplicationMaster鏈接進入。

  • 使用bin/flink run命令發布任務時,本質上是使用Flink自帶的調度,與普通的在Flink集群上發布任務並沒有不同。不同的任務可能在一個TaskManager中,也即是在一個JVM進程中,無法實現資源隔離。

 

Run a Flink job on YARN

  • 通過命令bin/flink run -m yarn-cluster實現,一次只發布一個任務,本質上給每個flink任務啟動了一個集群。

  • yarn不事先給flink分配container,而是在任務發布時,啟動JobManager(對應Yarn的AM)和TaskManager,如果一個任務指定了n個TaksManager(-yn n),則會啟動n+1個Container,其中一個是JobManager。

  • 發布m個應用,則有m個Flink界面,對比方式一,同樣發布m個應用,會多出m-1個JobManager的。

  • 發布任務時,實際上是使用了Yarn的調用。不同的任務不可能在一個Container(JVM)中,也即是實現了資源隔離。

 

以第一種啟動方式為例,其主要啟動流程如下:

首先我們通過下面的命令行啟動flink on yarn的集群
這里將產生總共五個進程:

 

  • 1個FlinkYarnSessionCli ---> Yarn Client

  • 1個YarnApplicationMasterRunner ---> AM + JobManager

  • 3個YarnTaskManager --> TaskManager

即一個客戶端+4個container,1個container啟動AM,3個container啟動TaskManager。

yarn-session.sh支持的參數:

 

 

一個Flink環境在YARN上的啟動流程:

  1. FlinkYarnSessionCli 啟動的過程中首先會檢查Yarn上有沒有足夠的資源去啟動所需要的container,如果有,則上傳一些flink的jar和配置文件到HDFS,這里主要是啟動AM進程和TaskManager進程的相關依賴jar包和配置文件。

  2. 接着yarn client會首先向RM申請一個container來啟動 ApplicationMaster(YarnApplicationMasterRunner進程),然后RM會通知其中一個NM啟動這個container,被分配到啟動AM的NM會首先去HDFS上下載第一步上傳的jar包和配置文件到本地,接着啟動AM;在這個過程中會啟動JobManager,因為JobManager和AM在同一進程里面,它會把JobManager的地址重新作為一個文件上傳到HDFS上去,TaskManager在啟動的過程中也會去下載這個文件獲取JobManager的地址,然后與其進行通信;AM還負責Flink的web 服務,Flink里面用到的都是隨機端口,這樣就允許了用戶能夠啟動多個yarn session。

 

      從這個啟動過程中可以看出,在每次啟動Flink on YARN之前,需要指定啟動多少個TaskManager,每個taskManager分配的資源是固定的,也就是說這個資源量從taskManager出生到死亡,資源情況一直是這么多,不管它所承載的作業需求資源情況,這樣在作業需要更多資源的時候,沒有更多的資源分配給對應的作業,相反,當一個作業僅需要很少的資源就能夠運行的時候,仍然分配的是那些固定的資源,造成資源的浪費。

用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。當一個Flink程序被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。

下面是一個由Flink程序映射為Streaming Dataflow的示意圖,如下所示:

 

 

FlinkKafkaConsumer是一個Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink是一個Sink Operator。

1.6 CEP(Complex event processing)

Flink CEP 是一套極具通用性、易於使用的實時流式事件處理方案。作為 Flink 的原生組件,省去了第三方庫與 Flink 配合使用時可能會導致的各種問題。但其功能現階段看來還比較基礎,不能表達復雜的業務場景,同時它不能夠做到動態更新。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  // (Event, timestamp)  DataStream<Event> input = env.fromElements(  Tuple2.of(new Event(1, "start", 1.0), 5L),  Tuple2.of(new Event(2, "middle", 2.0), 1L),  Tuple2.of(new Event(3, "end", 3.0), 3L),  Tuple2.of(new Event(4, "end", 4.0), 10L),  Tuple2.of(new Event(5, "middle", 6.0), 7L),  Tuple2.of(new Event(6, "middle", 5.0), 7L),  // last element for high final watermark  Tuple2.of(new Event(7, "middle", 5.0), 100L)         ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { 

具體的業務邏輯

Pattern<Event, ? extends Event> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  return value.getName().equals("start");  }  }).followedByAny("middle").where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  return value.getName().equals("middle");  }  }).followedByAny("end").where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  return value.getName().equals("end");  }  });  DataStream<String> result = CEP.pattern(input, pattern, comparator).select(  new PatternSelectFunction<Event, String>() {  @Override  public String select(Map<String, List<Event>> pattern) {  StringBuilder builder = new StringBuilder();  builder.append(pattern.get("start").get(0).getId()).append(",")  .append(pattern.get("middle").get(0).getId()).append(",")  .append(pattern.get("end").get(0).getId());  return builder.toString();  }  }  ); 

 

 

從例子代碼中可以看到,patterns需要用java代碼寫,需要編譯,很冗長很麻煩,沒法動態配置;需要可配置,或提供一種DSL;再者,對於一個流同時只能設置一個pattern,比如對於不同的用戶實例想配置不同的pattern,就沒法支持;需要支持按key設置pattern。

 

1.7 Flink目前存在的一些問題

在實時計算中有這么一個普遍的邏輯:業務邏輯中以一個流式數據源與幾個相關的配置表進行join操作,而配置表並不是一成不變的,會定期的進行數據更新,可以看成一個緩慢變化的流。這種join環境存在以下幾個尚未解決的問題:

1.對元數據庫的讀壓力;如果分析程序有1000並發,是否需要讀1000次;

2.讀維表數據不能拖慢主數據流的throughput,每秒千萬條數據量;

3.動態維表更新問題和一致性問題;元數據是不斷變化的,如何把更新同步到各個並發上;

4.冷啟動問題,如何保證主數據流流過的時候,維表數據已經ready,否則會出現數據無法處理;

5.超大維表數據會導致流量抖動和頻繁gc,比如幾十萬條的實例數據,可能上百兆。

在Flink社區,對該問題也進行了關注

https://issues.apache.org/jira/browse/FLINK-6131

 

 

https://issues.apache.org/jira/browse/FLINK-2320

 

 

https://issues.apache.org/jira/browse/FLINK-3514

 

 

當然在生產環境上也有相應的解決方案:

使用redis來做cache,只用一個job,負責從元數據庫同步數據到redis,這樣就解決1,3

然后所有的並發都從redis直接查詢需要的元數據,這樣就解決4;對於2,在並發上做local cache,只有第一次需要真正查詢redis,后續定期異步更新就好,不會影響到主數據流;對於5,因為現在不需要一下全量的讀取維表數據到內存,用到的時候才去讀,分攤了負載,也可以得到緩解。

這個方案也有一定的弊端,增加了架構的外部依賴,要額外保障外部redis和同步job的穩定性。

2 Flink vs Spark

2.1 框架

Spark把streaming看成是更快的批處理,而Flink把批處理看成streaming的special case。這里面的思路決定了各自的方向,其中兩者的差異點有如下這些:
實時 vs 近實時的角度:Flink提供了基於每個事件的流式處理機制,所以可以被認為是一個真正的流式計;而Spark,不是基於事件的粒度,而是用小批量來模擬流式,也就是多個事件的集合。所以Spark被認為是近實時的處理系統。
  Spark streaming 是更快的批處理,而Flink Batch是有限數據的流式計算。

2.1.1 流式計算和批處理API

Spark對於流式計算和批處理,都是基於RDD的抽象。這樣很方便將兩種計算方式合並表示。而Flink將流式計算和批處理分別抽象出來DataStream和DataSet兩種API,這一點上Flink相對於spark來說是一個糟糕的設計。

2.2 社區活躍度對比

 

 

 

 

Spark 2.3 繼續向更快、更易用、更智能的目標邁進,引入了低延遲的持續處理能力和流到流的連接,讓 Structured Streaming 達到了一個里程碑式的高度。

3 提交一個Flink作業

啟動flink服務

./bin/yarn-session.sh -n 4 -jm 2048 -tm 2048

 

 

在yarn監控界面上可以看到該作業的執行狀態

 

 

並驗證Wordcount例子

./bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048 ./examples/batch/WordCount.jar

 

在client端可以看到log:

 

 

 

 

--end--


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM