什么是Apache Flink實時流計算框架?


一.概述

  Apache Flink 是一個框架和分布式處理引擎,用於對無限制和有限制的數據流進行有狀態的計算。Flink被設計為可以在所有常見的集群環境中運行,以內存速度和任何規模的計算。

  

  首先,需要對什么是無限制什么是有限制做一下說明,首先看官方的解釋:

  1、無限制數據流

    無限制數據流指數據是沒有界限的,其有一個起點,但是定義終點。它們不會終止並在生成數據時提供數據。無限制的流必須被連續處理,即事件被攝取后必須立即處理。無法等待所有輸入數據的到達【因為可能永遠也沒有所有這一說】。處理無限制的數據通常要求以特定順序【例如事件發生的順序】提取事件,以便能夠推斷出結果的完整性。

  2、有限制數據流

    有限制數據流指具有定義的開始和結束。可以通過在執行任何計算之前提取所有數據來處理有限制數據流。由於有限制數據流始終可以排序,因此不需要有序攝取即可處理有限制數據流,此時,也稱為批處理。

  說完了官方的解釋,來說說我自己的看法,有不對的地方大家多多指正,所謂有限制數據流和無限制數據流,就拿時間為條件進行分析,區別說白了就是數據生成的持續時間,在一定時間內能得到全部數據的就是有限制數據流也稱一批數據,而數據隨着某些條件一直延續生成,而這些條件又是無止境的【例如:時間,空間等】,那就是無限制數據流。當然數據是否有限制還是認為定義的,就拿時間來說,指定時間段的就是有限制數據流【比如定時統計每天網站的訪問量】,而累計從網站上線到現在的實時訪問量就是無限制數據流,因為未來就是現在!

  其實某些數據是不是有限制的對Flink來說其實區別不大,Flink擅長處理無限制和有限制的數據集【主要還是無限制的,就是常說的流計算,其在批處理上比其它大數據計算框架例如Spark來說,並沒有多少優勢,反而在於機器學習、圖計算等方面存在劣勢】,對時間和狀態的准確控制使Flink的運行能夠在無限制的流上運行任何類型的應用程序。有限制數據流由專門為固定大小的數據集設計的算法和數據結構在內部進行處理,從而產生出色的性能。

  部署的方便性

    Flink是一個分布式系統,需要計算資源才能執行應用程序,Flink可以與所有常見的集群資源管理器【如YARN、Mesos、Kubernate等】進行集成,這樣允許Flink以其常用的方式與每個資源管理器進行交互。也可以設置為獨立集群運行。

    部署Flink應用程序時,Flink會根據應用程序配置的並行性自動識別所需要的資源,並向資源管理器請求它們。如果發生故障,Flink會通過請求新資源來替換發生故障的容器。提交或控制應用程序的所有通信均通過REST調用進行,這簡化了Flink在許多環境中的集成。

  運行任意規模應用程序

    Flink旨在運行任何規模的有狀態應用程序。將應用程序並行化為可能在集群上分布式並行執行的數千個任務。因此,應用程序幾乎可以利用無限數量的CPU、內存、磁盤和網絡IO。而且,Flink易於維護非常大的應用程序狀態。它的異步和增量檢查點算法可確保對處理延遲的影響達到最小,同時可保證一次狀態一致性。

  內存優先

    有狀態Flink應用程序針對本地狀態進行優化。任務始終保持在內存中,或者如果狀態大小超過可用內存,則始終保持在訪問有效的磁盤數據結構中。因此,任務通過訪問通常處於內存中的狀態來執行所有計算,從而產生非常低的處理延遲。Flink通過定期將本地狀態異步同步到指定持久存儲來確保出現故障時一次准確的狀態一致性。

    

二.應用場景

  Flink是用於無限制和有限制數據流上的有狀態計算框架。其在不同的抽象級別上提供了多個API,並為常見用例提供了專門的庫。

  一.流應用程序的構建塊

    流處理框架可以構建和執行的應用程序的類型由框架控制流、狀態和時間的能力定義。

  二.流的特點和分類

    顯然,流是流處理的基本內容。但是,流可能具有不同的特性,這些特性會影響流的處理方式。Flink是一個通用的處理框架,可以處理任何類型的流。

    >有限制和無限制數據流【上面已經詳細描述,此處不再贅述】

    >實時流和記錄流【區別為是否在數據產生時就開始處理,立即或經過類似Kafka暫存處理的為實時流,先存儲到持久存儲器以待以后處理的為記錄流】

  三.狀態

    每個存在多個事件的流應用程序都是有狀態的,即,僅對單個事件應用轉換的應用程序不需要狀態。任何運行基本業務邏輯的應用程序都需要記住事件或中間結果,以便在以后的某個時間點訪問它們,例如在下一個事件時或在特定的持續時間后。

    

    應用程序狀態是Flink中的一等公民。通過查看Flink在狀態處理上下文中提供的所有功能,可以看到這一點。

    1.多個狀態基元

      Flink為不同的數據結構【例如:原子值、列表或映射】提供狀態基元。開發人員可以根據功能的訪問模式選擇最有效的狀態原語。

    2.可插拔狀態后端

      在可插拔狀態后端中管理應用程序狀態並對其進行檢查。Flink具有不同的狀態后端,這些后端將狀態存儲在內存或RocksDB【高效的嵌入式磁盤數據存儲】中。自定義狀態后端也可以插入。

    3.一次精確的狀態一致性

      Flink的檢查點和恢復算法可確保發生故障時應用程序狀態的一致性。因此,可以透明地處理故障,並且不會影響應用程序的正確性。

    4.支持大狀態

      Flink由於具有異步和增量檢查點算法,因此能夠保持大小為TB的應用程序狀態。

    5.可擴展的應用程序

      Flink通過將狀態重新分配給更多或更少的工作程序來支持有狀態應用程序的擴展。

  四.時間

    時間是流應用程序的另一個重要組成部分。大多數時間流具有固定的時間語義,因為每個事件都是在特定的時間點產生的。此外,許多常見的流計算都是基於時間的,例如窗口聚合、會話、模式檢測和基於時間的連接。流處理的一個重要方面是應用程序如何測量時間,即事件時間與處理時間之差。

    1.事件時間模式

      使用事件時間語義處理流的應用程序根據事件的時間戳計算結果。因此,無論是處理記錄的事件還是實時事件,事件時間處理都可以提供准確一致的結果。

    2.水印支持

      Flink在事件時間應用程序中使用水印推理時間。水印還是權衡結果的延遲和完整性的靈活機制。

    3.后期數據處理

      在帶有水印的事件時間模式下處理流時,可能會發生所有相關事件到達之前已經完成計算的情況。這種事件稱為遲發事件。Flink具有多個選項來處理較晚的事件,例如通過側面輸出重新路由它們並更新先前完成的結果。

    4.處理時間模式

      除事件時間模式外,Flink還支持處理時間語義,該語義執行由處理機的掛鍾時間觸發計算。處理時間模式可能適合具有嚴格的低延遲要求的某些應用程序,這些應用程序可以忍受近似結果。

  五.分層API

    Flink提供了三層API。每個API針對不同的用例在簡潔性和表達性之間提供了不同的權衡。

    

    1.ProcessFunctions

      ProcessFunctions是Flink提供的最具表現力的功能接口。Flink提供了ProcessFunctions來處理來自一個或兩個輸入流或分組在一個窗口中的單個事件。ProcessFunctions提供對時間和狀態的細粒度控制。其可以任意修改其狀態並注冊計時器,這些計時器會觸發回調函數。因此,ProcessFunctions可以根據許多有狀態事件驅動的應用程序的需要實現復雜的業務邏輯。

      以下示例顯示KeyedProcessFunction操作KeyedStream並匹配START和END事件。當一個事件被START接收,則該函數將記住其狀態時間戳和寄存器在有效期為四個小時的計時器中。如果END在計時器觸發之前收到事件,則該函數將計算END和START事件之間的持續時間,清除狀態並返回值。否則,計時器將觸發並清除狀態。

 1 /**
 2  * Matches keyed START and END events and computes the difference between 
 3  * both elements' timestamps. The first String field is the key attribute, 
 4  * the second String attribute marks START and END events.
 5  */
 6 public static class StartEndDuration
 7     extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
 8 
 9   private ValueState<Long> startTime;
10 
11   @Override
12   public void open(Configuration conf) {
13     // obtain state handle
14     startTime = getRuntimeContext()
15       .getState(new ValueStateDescriptor<Long>("startTime", Long.class));
16   }
17 
18   /** Called for each processed event. */
19   @Override
20   public void processElement(
21       Tuple2<String, String> in,
22       Context ctx,
23       Collector<Tuple2<String, Long>> out) throws Exception {
24 
25     switch (in.f1) {
26       case "START":
27         // set the start time if we receive a start event.
28         startTime.update(ctx.timestamp());
29         // register a timer in four hours from the start event.
30         ctx.timerService()
31           .registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
32         break;
33       case "END":
34         // emit the duration between start and end event
35         Long sTime = startTime.value();
36         if (sTime != null) {
37           out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
38           // clear the state
39           startTime.clear();
40         }
41       default:
42         // do nothing
43     }
44   }
45 
46   /** Called when a timer fires. */
47   @Override
48   public void onTimer(
49       long timestamp,
50       OnTimerContext ctx,
51       Collector<Tuple2<String, Long>> out) {
52 
53     // Timeout interval exceeded. Cleaning up the state.
54     startTime.clear();
55   }
56 }

    2.DataStream API

      數據流API提供了許多常見的流處理操作的原語,比如窗口,記錄在A-時間變換,並通過查詢外部數據存儲獲取事件。數據流API可用於Java和Scala編程語言調用,如map()、reduce()和aggregate()。函數可以通過擴展接口定義,也可以定義為Java或Scala lambda函數。

      以下示例顯示了如何對點擊流進行會話並計算每個會話的點擊次數。

 1 // a stream of website clicks
 2 DataStream<Click> clicks = ...
 3 
 4 DataStream<Tuple2<String, Long>> result = clicks
 5   // project clicks to userId and add a 1 for counting
 6   .map(
 7     // define function by implementing the MapFunction interface.
 8     new MapFunction<Click, Tuple2<String, Long>>() {
 9       @Override
10       public Tuple2<String, Long> map(Click click) {
11         return Tuple2.of(click.userId, 1L);
12       }
13     })
14   // key by userId (field 0)
15   .keyBy(0)
16   // define session window with 30 minute gap
17   .window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
18   // count clicks per session. Define function as lambda function.
19   .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

    3.SQL和表API

      Flink具有兩個關系API,即Table API和SQL。這兩個API都是用於批處理和流處理的統一API,即,對無界的實時流或有界的記錄流都是以相同的語義執行的查詢,並產生相同的結果。Table API和SQL利用Apache Calcite進行剖析,驗證和查詢優化。它們可以與DataStream、DataSet API無縫集成,並支持用戶定義的標量,聚合和表值函數。Flink的關系API旨在簡化數據分析,數據管道和ETL應用程序的定義。以下示例顯示了SQL查詢,以會話化點擊流並計算每個會話的點擊次數。

 1 SELECT userId, COUNT(*) 2 FROM clicks 3 GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId 

    4.庫

      Flink具有幾個常見數據處理用例的庫。這些庫通常嵌入在API中,並且不完全獨立。因此,它們可以從API的所有功能中受益,並可以與其他庫進行集成。

      4.1.復雜事件處理【CEP】

        模式檢測是事件流處理中非常常見的用例。Flink的CEP庫提供了一個API,用於指定事件的模式【考慮正則表達式或狀態機】。CEP庫與Flink的DataStream API集成在一起,因此可以在DataStreams上評估模型。CEP庫的應用程序包括網絡入侵檢測,業務流程監視和欺詐檢測。

      4.2.DataSet API

        DataSet API是Flink的批處理應用程序核心API。DataSet API的原語包括map,reduce,join【外連接】,cogroup和iterate。所有操作都由算法和數據結構支撐,這些算法和數據結構對內存中的序列化數據進行操作,如果數據大小超出內存大小,則溢出到磁盤。Flink的DataSet API的數據處理算法受傳統數據庫運算符的啟發,例如混合哈希連接或外部合並排序。

      4.3.Gelly

        Gelly是用可伸縮圖形處理和分析的庫。Gelly在DataSet API之上實現並和它進行集成。因此,它得益於其可擴展且強大的庫。Gelly具有內置算法,例如標簽傳播,三角形枚舉和頁面等級,但還提供了Graph API,可簡化自定義圖形算法的實現。

三.運行方式

  Apache Flink是用於無限制和有限制的數據流上的有狀態計算的框架。由於許多流應用程序的設計目的是在最少的停機時間內連續運行,因此流處理框架必須提供出色的故障恢復能力,以及在運行時監視和維護應用程序的工具。Flink將重點放在流處理的操作上。下面將介紹Flink的故障恢復機制,以及其用來管理和監控的應用程序。

  一.不間斷運行流計算應用程序7*24

    機器出現故障在分布式系統中是非常常見的。像Flink這樣的分布式流處理框架必須從故障中恢復,才能保證7*24小時不間斷運行。顯然,這不僅意味着失敗后重新啟動應用程序,而且還確保其內部狀態保持一致,從而使應用程序可以像沒有發生過故障那樣繼續執行。Flink提供一些功能來確保應用程序繼續運行並保持一致:

    1.一致的檢查點

      Flink的恢復機制基於應用程序狀態一致性的檢查點。如果發生故障,將重新啟動應用程序,並從最新的檢查點加載其狀態。與可重置的流數據源結合使用時,此功能可以保證一次狀態一致性。

    2.高效的檢查點

      如果應用程序的狀態保持TB級,則對應用程序的狀態進行檢查會非常昂貴Flink可以執行異步和增量檢查點,以使檢查點對應用程序延遲SLA的影響很小。

    3.端到端精確一次

      Flink具有特定存儲系統的事務接收器,即使在發生故障的情況下,也可以保證數據僅被精確 地寫入一次。

    4.與集群管理器集成

      Flink與集群管理器緊密集成,例如Hadoop YARN,Mesos或Kubernates。當流程失敗時,新流程將自動啟動以接管其工作。

    5.高可用性設置

      Flink具有高可用性模式,可消除所有單點故障。HA模式基於Apache Zookeeper,這是一項經過實踐檢驗的服務,可實現可靠的分布式協調服務。

  二.更新,遷移,暫停和恢復應用程序

    需要維護支持關鍵業務服務的流應用程序。需要修復錯誤,並需要改進或實現新功能。但是,更新有狀態流應用程序並非易事。通常,一個人無法簡單地停止應用程序並重新啟動一個固定或改進版本,因為一個人無法承受失去應用程序狀態的負擔。

    Flink的保存點是一項獨特而強大的功能,可以解決更新有狀態應用程序的問題以及許多其他相關挑戰。保存點是應用程序狀態的一致快照,因此與檢查點非常相似。但是,與檢查點相比,保存點需要手動觸發,並且在停止應用程序時不會自動將其刪除。保存點可用於啟動狀態兼容的應用程序並初始化其狀態。保存點啟用一下功能:

    1.應用程序演化

      保存點可用於演化應用程序。可用從先前版本的應用程序中獲取的保存點重新啟動應用程序的固定版本或改進版本。也可以從較早的時間點啟動應用程序【如果存在這樣的保存點】,以修復有缺陷的版本產生的錯誤結果。

    2.集群遷移

      使用保存點,可以將應用程序遷移【或克隆】到不同的集群。

    3.Flink版本更新

      可以使用保存點遷移應用程序以在新的Flink版本上運行。

    4.應用程序縮放

      保存點可用於增加或減少應用程序的並行性。

    5.A/B測試和假設方案

      可以通過從同一保存點啟動所有版本來比較應用程序的兩個【或多個】不同版本的性能或質量。

    6.暫停或恢復

      可以通過保存一點並停止它來暫停應用程序。在以后的任何時間點,都可以從保存點恢復應用程序。

    7.歸檔

      可以將保存點歸檔,以便將應用程序的狀態重置為較早的時間點。

  三.監視和控制應用程序

    就像任何其他服務一樣,需要監視連續運行的流應用程序並將其集成到組織的操作基礎架構【即監視和日志記錄服務】中。監視有助於預測問題並提前做出反應。通過日志記錄,可以進行根本原因分析以調查故障。最后,易於訪問的界面是控制運行中的應用程序的重要功能。Flink與許多常用的日志記錄和監視服務很好地集成在一起,並提供REST API來控制應用程序和查詢信息。

    1.Web UI

     2.日志記錄:整合slf4j,並與日志記錄框架log4j或logback集成。

     3.指標:Flink具有完善的指標系統,可收集和報告系統和用戶定義的指標。指標可以導出到多個報告器,包括JMX,Ganglia,Graphite,Prometheus,StatsD,Datadog和Slf4j。

     4.REST API:Flink公開REST API來提交新應用程序,獲取正在運行的應用程序的保存點或取消應用程序。REST API還公布了正在運行或已完成的應用程序的元數據和收集的指標。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM