1.概述
在如今數據爆炸的時代,企業的數據量與日俱增,大數據產品層出不窮。今天給大家分享一款產品—— Apache Flink,目前,已是 Apache 頂級項目之一。那么,接下來,筆者為大家介紹Flink 的相關內容。
2.內容
2.1 What's Flink
Apache Flink 是一個面向分布式數據流處理和批量數據處理的開源計算平台,它能夠基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為他們它們所提供的SLA是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapReduce、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。 Flink在實現流處理和批處理時,與傳統的一些方案完全不同,它從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。
Flink 是一款新的大數據處理引擎,目標是統一不同來源的數據處理。這個目標看起來和 Spark 和類似。這兩套系統都在嘗試建立一個統一的平台可以運行批量,流式,交互式,圖處理,機器學習等應用。所以,Flink 和 Spark 的目標差異並不大,他們最主要的區別在於實現的細節。
下面附上 Flink 技術棧的一個總覽,如下圖所示:
2.2 Compare
了解 Flink 的作用和優缺點,需要有一個參照物,這里,筆者以它與 Spark 來對比闡述。從抽象層,內存管理,語言實現,以及 API 和 SQL 等方面來贅述。
2.2.1 Abstraction
接觸過 Spark 的同學,應該比較熟悉,在處理批處理任務,可以使用 RDD,而對於流處理,可以使用 Streaming,然其世紀還是 RDD,所以本質上還是 RDD 抽象而來。但是,在 Flink 中,批處理用 DataSet,對於流處理,有 DataStreams。思想類似,但卻有所不同:其一,DataSet 在運行時表現為 Runtime Plans,而在 Spark 中,RDD 在運行時表現為 Java Objects。在 Flink 中有 Logical Plan ,這和 Spark 中的 DataFrames 類似。因而,在 Flink 中,若是使用這類 API ,會被優先來優化(即:自動優化迭代)。如下圖所示:
然而,在 Spark 中,RDD 就沒有這塊的相關優化,如下圖所示::
另外,DataSet 和 DataStream 是相對獨立的 API,在 Spark 中,所有不同的 API,比如 Streaming,DataFrame 都是基於 RDD 抽象的。然而在 Flink 中,DataSet 和 DataStream 是同一個公用引擎之上的兩個獨立的抽象。所以,不能把這兩者的行為合並在一起操作,目前官方正在處理這種問題,詳見[FLINK-2320]
2.2.2 Memory
在之前的版本(1.5以前),Spark 延用 Java 的內存管理來做數據緩存,這樣很容易導致 OOM 或者 GC。之后,Spark 開始轉向另外更加友好和精准的控制內存,即:Tungsten 項目。然而,對於 Flink 來說,從一開始就堅持使用自己控制內存。Flink 除把數據存在自己管理的內存之外,還直接操作二進制數據。在 Spark 1.5之后的版本開始,所有的 DataFrame 操作都是直接作用於 Tungsten 的二進制數據上。
PS:Tungsten 項目將是 Spark 自誕生以來內核級別的最大改動,以大幅度提升 Spark 應用程序的內存和 CPU 利用率為目標,旨在最大程度上利用硬件性能。該項目包括了三個方面的改進:
- 內存管理和二進制處理:更加明確的管理內存,消除 JVM 對象模型和垃圾回收開銷。
- 緩存友好計算:使用算法和數據結構來實現內存分級結構。
- 代碼生成:使用代碼生成來利用新型編譯器和 CPU。
2.2.3 Program
Spark 使用 Scala 來實現的,它提供了 Java,Python 以及 R 語言的編程接口。而對於 Flink 來說,它是使用 Java 實現的,提供 Scala 編程 API。從編程語言的角度來看,Spark 略顯豐富一些。
2.2.4 API
Spark 和 Flink 兩者都傾向於使用 Scala 來實現對應的業務。對比兩者的 WordCount 示例,很類似。如下所示,分別為 RDD 和 DataSet API 的示例代碼:
- RDD
// Spark WordCount object WordCount { def main(args: Array[String]) { val env = new SparkContext("local","WordCount") val data = List("hi","spark cluster","hi","spark") val dataSet = env.parallelize(data) val words = dataSet.flatMap(value => value.split("\\s+")) val mappedWords = words.map(value => (value,1)) val sum = mappedWords.reduceByKey(_+_) println(sum.collect()) } }
- DataSet
// Flink WordCount object WordCount { def main(args: Array[String]) { val env = ExecutionEnvironment.getExecutionEnvironment val data = List("hello","flink cluster","hello") val dataSet = env.fromCollection(data) val words = dataSet.flatMap(value => value.split("\\s+")) val mappedWords = words.map(value => (value,1)) val grouped = mappedWords.groupBy(0) val sum = grouped.sum(1) println(sum.collect()) } }
對於 Streaming,Spark 把它看成更快的批處理,而 Flink 把批處理看成 Streaming 的特殊例子,差異如下:其一,在實時計算問題上,Flink 提供了基於每個事件的流式處理機制,所以它可以被認為是一個真正意義上的流式計算,類似於 Storm 的計算模型。而對於 Spark 來說,不是基於事件粒度的,而是用小批量來模擬流式,也就是多個事件的集合。所以,Spark 被認為是一個接近實時的處理系統。雖然,大部分應用實時是可以接受的,但對於很多應用需要基於事件級別的流式計算。因而,會選擇 Storm 而不是 Spark Streaming,現在,Flink 也許是一個不錯的選擇。
2.2.5 SQL
目前,Spark SQL 是其組件中較為活躍的一部分,它提供了類似於 Hive SQL 來查詢結構化數據,API 依然很成熟。對於 Flink 來說,截至到目前 1.0 版本,只支持 Flink Table API,官方在 Flink 1.1 版本中會添加 SQL 的接口支持。[Flink 1.1 SQL 詳情計划]
3.Features
Flink 包含一下特性:
- 高吞吐 & 低延時
- 支持 Event Time & 亂序事件
- 狀態計算的 Exactly-Once 語義
- 高度靈活的流式窗口
- 帶反壓的連續流模型
- 容錯性
- 流處理和批處理共用一個引擎
- 內存管理
- 迭代 & 增量迭代
- 程序調優
- 流處理應用
- 批處理應用
- 類庫生態
- 廣泛集成
3.1 高吞吐 & 低延時
Flink 的流處理引擎只需要很少配置就能實現高吞吐率和低延遲。下圖展示了一個分布式計數的任務的性能,包括了流數據 shuffle 過程。
3.2 支持 Event Time & 亂序事件
Flink 支持了流處理和 Event Time 語義的窗口機制。Event time 使得計算亂序到達的事件或可能延遲到達的事件更加簡單。如下圖所示:
3.3 狀態計算的 exactly-once 語義
流程序可以在計算過程中維護自定義狀態。Flink 的 checkpointing 機制保證了即時在故障發生下也能保障狀態的 exactly once 語義。
3.4 高度靈活的流式窗口
Flink 支持在時間窗口,統計窗口,session 窗口,以及數據驅動的窗口,窗口可以通過靈活的觸發條件來定制,以支持復雜的流計算模式。
3.5 帶反壓的連續流模型
數據流應用執行的是不間斷的(常駐)operators。Flink streaming 在運行時有着天然的流控:慢的數據 sink 節點會反壓(backpressure)快的數據源(sources)。
3.6 容錯性
Flink 的容錯機制是基於 Chandy-Lamport distributed snapshots 來實現的。這種機制是非常輕量級的,允許系統擁有高吞吐率的同時還能提供強一致性的保障。
3.7 流處理和批處理共用一個引擎
Flink 為流處理和批處理應用公用一個通用的引擎。批處理應用可以以一種特殊的流處理應用高效地運行。如下圖所示:
3.8 內存管理
Flink 在 JVM 中實現了自己的內存管理。應用可以超出主內存的大小限制,並且承受更少的垃圾收集的開銷。
3.9 迭代和增量迭代
Flink 具有迭代計算的專門支持(比如在機器學習和圖計算中)。增量迭代可以利用依賴計算來更快地收斂。如下圖所示:
3.10 程序調優
批處理程序會自動地優化一些場景,比如避免一些昂貴的操作(如 shuffles 和 sorts),還有緩存一些中間數據。
3.11 流處理應用
DataStream API 支持了數據流上的函數式轉換,可以使用自定義的狀態和靈活的窗口。下面示例展示了如何以滑動窗口的方式統計文本數據流中單詞出現的次數。
case class Word(word: String, freq: Long) val texts: DataStream[String] = ... val counts = text .flatMap { line => line.split("\\W+") } .map { token => Word(token, 1) } .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .sum("freq")
3.12 批處理應用
Flink 的 DataSet API 可以使你用 Java 或 Scala 寫出漂亮的、類型安全的、可維護的代碼。它支持廣泛的數據類型,不僅僅是 key/value 對,以及豐富的 operators。下面示例展示了圖計算中 PageRank 算法的一個核心循環。
case class Page(pageId: Long, rank: Double) case class Adjacency(id: Long, neighbors: Array[Long]) val result = initialRanks.iterate(30) { pages => pages.join(adjacency).where("pageId").equalTo("pageId") { (page, adj, out : Collector[Page]) => { out.collect(Page(page.id, 0.15 / numPages)) for (n <- adj.neighbors) { out.collect(Page(n, 0.85*page.rank/adj.neighbors.length)) } } } .groupBy("pageId").sum("rank") }
3.13 類庫生態
Flink 棧中提供了很多高級 API 和滿足不同場景的類庫:機器學習、圖分析、關系式數據處理。當前類庫還在 beta 狀態,並且在大力發展。
3.14 廣泛集成
Flink 與開源大數據處理生態系統中的許多項目都有集成。Flink 可以運行在 YARN 上,與 HDFS 協同工作,從 Kafka 中讀取流數據,可以執行 Hadoop 程序代碼,可以連接多種數據存儲系統。如下圖所示:
4.總結
以上,便是對 Flink 做一個簡要的剖析認識,至於如何使用 Flink,以及其編譯,安裝,部署,運行等流程,較為簡單,這里就不多做贅述了,大家可以在 Flink 的官網,閱讀其 QuickStart 即可,[訪問地址]。
5.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!