flink系列-1、flink介紹,反壓原理


一、flink介紹

 Apache Flink是一個分布式大數據處理引擎,可對 有界數據流無界數據流進行 有狀態計算。 可部署在各種集群環境,對各種大小的數據規模進行快速計算。

 

1.1、有界數據流和無界數據流 

1、 無界流有一個開始但沒有定義的結束。它們不會在生成時終止並提供數據。必須持續處理無界流,即必須在攝 取事件后立即處理事件。無法等待所有輸入數據到達,因為輸入是無界的,並且在任何時間點都不會完成。處理無界數據通常要求以特定順序(例如事件發生的順序)攝取事件。 
2、 有界流具有定義的開始和結束。可以在執行任何計算之前通過攝取所有數據來處理有界流。處理有界流不需要 有序攝取,因為可以始終對有界數據集進行排序。有界流的處理也稱為批處理。
Apache Flink擅長處理無界和有界數據集。精確控制時間和狀態使Flink的運行時能夠在無界流上運行任何類型的應 用程序。有界流由算法和數據結構內部處理,這些算法和數據結構專門針對固定大小的數據集而設計,從而產生出 色的性能。

1.2、flink的特點

  • 支持 java、python、 scala api 
  • 流(dataStream)批(dataSet)一體化 
  • 支持事件處理和無序處理通過DataStream API,基於DataFlow數據流模型 
  • 在不同的時間語義(事件時間-數據產生的時間,攝取時間:集群獲取到數據、處理時間)下支持靈活的窗口(時間,滑動、翻滾,會話,自定義觸發器) 
  • 支持有狀態計算的Exactly-once(僅處理一次)容錯保證 
  • 支持基於輕量級分布式快照checkpoint機制實現的容錯 
  • 支持savepoints 機制,一般手動觸發,在升級應用或者處理歷史數據是能夠做到無狀態丟失和最小停機時間 
  • 兼容hadoop的mapreduce,集成YARN,HDFS,Hbase 和其它hadoop生態系統的組件 
  • 支持大規模的集群模式,支持yarn、Mesos。可運行在成千上萬的節點上 
  • 在dataSet(批處理)API中內置支持迭代程序 
  • 圖處理(批) 機器學習(批) 復雜事件處理(流) 
  • 自動反壓機制 
  • 高效的自定義內存管理 

1.3、flink的分層模型

 flink自身提供了不同級別的抽象來支持我們開發流式或者批處理程序,上圖描述了Flink 支持的四種不同級別的抽象。

Stateful Stream Processing 
  • 位於最底層, 是core API 的底層實現 
  • processFunction (處理函數)
  • 利用低階,構建一些新的組件或者算子 
  • 靈活性高,但開發比較復雜 
  • 表達性最強,可以操作狀態,time等
Core API 
  • DataSet - 批處理 API 
  • DataStream –流處理 API 
  • 封裝了一些算子
Table 
API
 &
 SQL
  • 
構建在Table
之上,都需要構建Table
環境 
  • 不同的類型的Table
構建不同的Table
環境 
  • Table
可以與DataStream或者DataSet進行相互轉換 
  • Streaming
SQL不同於存儲的SQL,最終會轉化為流式執行計划 

二、finlk的反壓原理 

  • 反壓是分布式處理系統中經常遇到的問題,當消費者速度低於生產者的速度時,則需要消費者將信息反饋給生產者使得生產者的速度能和消費者的速度進行匹配。
  • Stom 在處理背壓問題上簡單粗暴,當下游消費者速度跟不上生產者的速度時會直接通知生產者,生產者停止生產數據,這種方式的缺點是不能實現逐級反壓,且調優困難。設置的消費速率過小會導致集群吞吐量低下,速率過大會導致消費者 OOM。
  • Spark Streaming 為了實現反壓這個功能,在原來的架構基礎上構造了一個“速率控制器”,這個“速率控制器”會根據幾個屬性,如任務的結束時間、處理時長、處理消息的條數等計算一個速率。在實現控制數據的接收速率中用到了一個經典的算法,即“PID 算法”。
  • Flink 沒有使用任何復雜的機制來解決反壓問題,Flink 在數據傳輸過程中使用了分布式阻塞隊列。我們知道在一個阻塞隊列中,當隊列滿了以后發送者會被天然阻塞住,這種阻塞功能相當於給這個阻塞隊列提供了反壓的能力。

具體過程如下:

如下圖所示展示了 Flink 在網絡傳輸場景下的內存管理。網絡上傳輸的數據會寫到 Task 的 InputGate(IG)中, 經過Task的處理后,再由Task寫到ResultPartition(RS)中。每個 Task 都包括了輸入和輸入,輸入和輸出的 數據存在 Buffer 中(都是字節數據)。Buffer 是 MemorySegment 的包裝類。 
  1. 根據配置,Flink 會在 NetworkBufferPool 中生成一定數量(默認2048,一個32K)的內存塊 MemorySegment,內存塊 的總數量就代表了網絡傳輸中所有可用的內存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共 享的,每個節點(TaskManager - 跑任務的進程,類似於spark的extour)只會實例化一個。
  2. Task 線程啟動時,會向 NetworkEnvironment 注冊,NetworkEnvironment 會為 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創建一個 LocalBufferPool(緩沖池)並設置可申請的 MemorySegment(內存塊)數量。IG 對應的緩沖池初始的內存塊數量與 IG 中 InputChannel 數量一致,RP 對應的緩沖池初始的內存 塊數量與 RP 中的 ResultSubpartition 數量一致。不過,每當創建或銷毀緩沖池時,NetworkBufferPool 會計 算剩余空閑的內存塊數量,並平均分配給已創建的緩沖池。注意,這個過程只是指定了緩沖池所能使用的內存 塊數量,並沒有真正分配內存塊,只有當需要時才分配。為什么要動態地為緩沖池擴容呢?因為內存越多,意 味着系統可以更輕松地應對瞬時壓力(如GC),不會頻繁地進入反壓狀態,所以我們要利用起那部分閑置的 內存塊。 
  3. 在 Task 線程執行過程中,當 Netty 接收端收到數據時,為了將 Netty 中的數據拷貝到 Task 中, InputChannel(實際是 RemoteInputChannel)會向其對應的緩沖池申請內存塊(上圖中的①)。如果緩沖池 中也沒有可用的內存塊且已申請的數量還沒到池子上限,則會向 NetworkBufferPool 申請內存塊(上圖中的 ②)並交給 InputChannel 填上數據(上圖中的③和④)。如果緩沖池已申請的數量達到上限了呢?或者 NetworkBufferPool 也沒有可用內存塊了呢?這時候,Task 的 Netty Channel 會暫停讀取,上游的發送端會立 即響應停止發送,拓撲會進入反壓狀態。當 Task 線程寫數據到 ResultPartition 時,也會向緩沖池請求內存 塊,如果沒有可用內存塊時,會阻塞在請求內存塊的地方,達到暫停寫入的目的。
  4. 當一個內存塊被消費完成之后(在輸入端是指內存塊中的字節被反序列化成對象了,在輸出端是指內存塊中的 字節寫入到 Netty Channel 了),會調用 Buffer.recycle() 方法,會將內存塊還給 LocalBufferPool (上 圖中的⑤)。如果LocalBufferPool中當前申請的數量超過了池子容量(由於上文提到的動態容量,由於新注 冊的 Task 導致該池子容量變小),則LocalBufferPool會將該內存塊回收給 NetworkBufferPool(上圖中的 ⑥)。如果沒超過池子容量,則會繼續留在池子中,減少反復申請的開銷。 

2.1、反壓的過程 

下面這張圖簡單展示了兩個 Task 之間的數據傳輸以及 Flink 如何感知到反壓的:
 
  1.  記錄“A”進入了 Flink 並且被 Task 1 處理。(這里省略了 Netty 接收、反序列化等過程)
  2. 記錄被序列化到 buffer 中。 
  3. 該 buffer 被發送到 Task 2,然后 Task 2 從這個 buffer 中讀出記錄。 
不要忘了:記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer 
結合上面兩張圖看:Task 1 在輸出端有一個相關聯的 LocalBufferPool(稱緩沖池1),Task 2 在輸入端也有一個 相關聯的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來序列化記錄 “A”,我們就序列化 並發送該 buffer。 
這里我們需要注意兩個場景: 
  • 本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節點(TaskManager),該 buffer 可以直接交給下一 個 Task。一旦 Task 2 消費了該 buffer,則該 buffer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度,導致緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。 
  • 遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節點上,那么 buffer 會在發送到網絡(TCP Channel)后被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然后拷貝網絡中的數據到 buffer 中。 如果沒有可用的 buffer,會停止從 TCP 連接中讀取數據。在輸出端,通過 Netty 的水位值機制(可配置)來 保證不往網絡中寫入太多數據。如果網絡中的數據(Netty輸出緩沖中的字節數)超過了高水位值,我們會等 到其降到低水位值以下才繼續寫入數據。這保證了網絡中不會有太多的數據。如果接收端停止消費網絡中的數 據(由於接收端緩沖池沒有可用 buffer),網絡中的緩沖數據就會堆積,那么發送端也會暫停發送。另外,這 會使得發送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請求 buffer,阻塞了 writer 往 ResultSubPartition 寫數據。 
這種固定大小緩沖池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生產數據的速度不會快 於消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數據傳輸自然地擴展到更復雜的 pipeline 中,保 證反壓機制可以擴散到整個 pipeline。 

2.2、反壓監控

 Flink 的實現中,只有當 Web 頁面切換到某個 Job 的 Backpressure 頁面,才會對這個 Job 觸發反壓檢測,因為 反壓檢測還是挺昂貴的。JobManager 會通過 Akka 給每個 TaskManager 發送TriggerStackTraceSample消息。默 認情況下,TaskManager 會觸發100次 stack trace 采樣,每次間隔 50ms(也就是說一次反壓檢測至少要等待5秒 鍾)。並將這 100 次采樣的結果返回給 JobManager,由 JobManager 來計算反壓比率(反壓出現的次數/采樣的 次數),最終展現在 UI 上。UI 刷新的默認周期是一分鍾,目的是不對 TaskManager 造成太大的負擔。 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM