「Flink」理解流式處理重要概念


什么是流式處理呢?

這個問題其實我們大部分時候是沒有考慮過的,大多數,我們是把流式處理和實時計算放在一起來說的。我們先來了解下,什么是數據流。

數據流(事件流)

  • 數據流是無邊界數據集的抽象
    • 我們之前接觸的數據處理,大多都都是有界的。例如:處理某天的數據、某個季度的數據等
    • 無界意味着數據是無限地、持續增長的
    • 數據流會隨着時間的推移,源源不斷地加入進來
  • 數據流無處不再
    • 信息卡交易
    • 電商購物
    • 快遞
    • 網絡交換機的流向數據
    • 設備傳感器發出的數據
    • 這些數據都是無窮無盡的
    • 每一件事情,都可以看成事件序列
  • 數據流是有序的
    • 數據的到來總是有個先后順序
  • 數據流是不可變的
    • 事件一旦發生,就不能被改變
    • 它陳述了某一個時刻的事實
  • 數據流是可以重播的
    • 為了處理的一些問題、糾正過去的錯誤,可以重跑數據流
    • 借助於Kafka,我們可以重新消費幾個月之前的原始數據流

流式處理

流式處理就是指實時地處理一個或多個事件流。它是一種編程范式。其他編程領域,主要有3種編程范式:

  1. 請求與響應
    • 延遲最小的一種方式,響應時間要求亞毫秒級到毫秒之間
    • 響應時間一般分穩定
    • 發出請求,等待響應(大部分的JavaEE同學,都是開發這一類編程范式的應用),其實就是OLTP
  2. 批處理
    • 特點:高延遲、高吞吐
    • 一般是固定某個時刻開始啟動執行,讀取所有的數據,然后輸出接口
    • 每次讀取到的都是舊數據
    • 主要應用在DWH或BI中
  3. 流式處理
    • 特點:介於上述兩者之間
    • 流式處理可以讓業務報告保持更新,持續響應

流的定義不依賴某個框架,只要儲蓄從一個無邊界數據集中讀取數據,並對它們進行處理生成結果,就是進行流式處理。重點是:整個過程必須是持續的。

流式處理中的時間

上述我們已經說過了,數據流都是有序的。某一時刻的數據是確定的。時間是流式處理中非常重要的概念。大部分流式應用的操作都是基於時間窗口的。

流式系統一般包含以下幾個時間概念(熟悉Flink的同學應該會很熟悉):

  • 事件時間(Eventtime)
    • 事件實際發生的時間
    • 用戶一般只對事件發生時間感興趣
  • 日志追加時間
    • 日志追加時間是指事件保存到事件存儲源的時間
    • 例如:數據是什么到達Kafka的(Kafka是可以啟用自動添加時間戳功能的)
  • 處理時間
    • 流式處理應用接收到事件后,要對齊進行處理的時間
    • 處理時間取決於流式處理應用何時讀取到這個時間
    • 如果應用程序使用了兩個線程來讀取同一個事件,這個時間戳可能會不一樣
    • 這個時間戳非常不可靠,應該避免使用它

狀態

如果流式處理是來一個事件就處理一個事件,那么流式處理就很簡單。但如果操作中包含了多個事件,流式處理就有意思了。例如:我們想在流式處理中統計北京用戶的訂單數量、消費金額等等。此時,就不能光處理單個事件了,我們需要獲取更多的事件。事件與事件之間的信息就稱之為狀態。例如簡單的,求某個類型的訂單數等。


這些狀態一般就保存在流式處理程序本地變量(本地內存)中,例如:使用HashMap來保存計數。但這種做法是很不可靠的,流式處理處理的是無界數據集,一旦應用程序出現異常,就會出現狀態丟失,這是我們說不能接受的。所以,每一種流式計算框架都會很小心地持久化狀態。如果應用程序重啟,需要將這些數據恢復。


流式處理一般包含兩種狀態:

  • 本地狀態
    • 這種狀態只能被應用程序實例訪問(不過Flink 1.9版本是可以外部來訪問本地狀態的)
    • 內嵌到應用程序的數據庫中進行維護和管理
    • 特點:速度快,但受內存大小的限制,所以,很多流式處理系統都將數據拆分到多個子流中處理
  • 外部狀態
    • 用外部存儲來處理,一般使用NoSQL系統,例如:Cassadra
    • 特點:沒有大小限制,可以被應用程序多個實例訪問、甚至外部應用訪問,但引入額外的系統會造成延遲、復雜性(例如:要維護內部和外部狀態一致性問題)

時間窗口

大部分針對流的操作都是基於時間窗口的。例如:計算一周內銷量最好的產品。兩個流的合並也是基於時間窗口的。流式系統會合並發生在相同時間段上的事件。窗口是有類型的。以下幾點是我們設計窗口需要考慮的:

  • 窗口的大小
    • 是基於5分鍾計算還是基於15分鍾、甚至是一天
    • 窗口越小,就能越快地發現變更,不過噪聲也就越多
    • 窗口越大,變更就跟平滑,不過延遲也越嚴重
  • 窗口的移動頻率(移動間隔)
    • 5分鍾的窗口,可以1分鍾計算一次,或者每秒鍾計算一次,或者每當有新事件到達時計算一次
    • 如果“移動頻率”與窗口大小相等,這種稱為滾動窗口(tumbling window)
    • 如果窗口隨着每一條記錄移動,這種情況稱為滑動窗口(sliding window)
  • 窗口的可更新時長
    • 假設:計算了 00:00 – 00:05 之間的訂單總數,一個小時后,又得到了一些“事件時間”是 00:02的事件(例如:因為網絡通信故障,這個消息晚到了一段時間),這種情況,是否需要更新 00:00 – 00:05 這個窗口的結果呢?或者就不處理了?
    • 理想情況下,可以定義一個時間段,只要在這個時間段內,事件可以被添加到對應的時間片段里。例如:如果事件處於4個小時以內,就更新,否則,就忽略掉。
  • 窗口時間對齊
    • 窗口可以與時間對齊,例如:5分鍾的窗口如果每分鍾移動一次,那么第一個分片可以是:00:00 – 00:05,第二個就是 00:01 – 00:06
    • 窗口也可以不與時間對齊,例如:應用可以在任何時間啟動,那么第一個分片有可能是03:17 – 03:22
    • 滑動窗口永遠不會與時間對齊,只要有新的記錄到達,就會發生移動


下面這張圖,說明了滾動窗口與滑動窗口的區別。

滾動窗口:假設窗口的大小為5分鍾,這里確定的3個時間窗口

滑動窗口:假設每分鍾滑動一次,那么這個時候會有5個時間窗口,計算結果會發生重疊

image

流式處理的設計模式

單個事件處理

這是流式處理最基本的模式。這種模式也叫:map或filter模式。經常被用來過濾無用的事件或者用於轉換事件。


這種模式,應用程序讀取流中的數據,修改數據,然后把事件生成到另一個流上。這一類應用程序無需在程序內部維護狀態,每一個事件都是獨立處理的。這種錯誤恢復和進行負載均衡都很容易。因為無需進行狀態恢復操作。


使用本地狀態

大部分流式處理應用關系如何聚合數據。特別是:基於時間窗口進行聚合。例如:找到每天最低、最高的交易價格。要實現這種操作,就需要維護流的狀態。例如:我們需要將最小值、最大值保存下來,用它們與每一個新值對比。這類操作,可以通過本地狀態來實現。例如:每一個分組都維護自己分組的狀態。


一旦流式處理中包含了本地狀態,就需要解決以下問題。

  • 內存使用
    • 必須要有足夠的內存來保存本地狀態
  • 持久化
    • 確保應用程序關閉時,不會丟失狀態
    • 例如:我們可以使用RocksDB將本地狀態保存到內存里、同時持久化到磁盤上,以便重啟后恢復。而且需要將本地狀態的變更發送到Kafka的主題上
  • 重新負載均衡
    • 有時候,分區被重新分配給不同的消費者。這種情況,失去分區的實例必須把最后的狀態保存下來,或得分區的實例必須要知道如何恢復到正確的狀態


多階段處理和重分區

有些時候,我們要通過所有可用的數據來獲得結果。例如:要發布每天的“前10支”股票,這10支股票需要從每天的交易股票中挑選出來。如果僅僅在單個實例上處理是不夠的,因為10支股票分布在多個實例上。


此種,我們分為多個階段來處理。

1、計算每支股票當天的漲跌。這個計算可以在每個實例上執行

2、將結果寫入到單個分區

3、再用一個實例找出當天的前10支股票


這一類操作就與MapReduce很像了。


使用外部查找——流和表的連接

有時候,流式處理需要將外部數據和流集成在一日。例如:外部數據中保存了一些規則、或者將完整完整地用戶信息拉取到流中。

這種case最大的問題,外部查找會帶來嚴重的延遲,一般在 5-15 ms之間,這在很多情況下是不可行的。而且,外部系統也無法承受這種額外的負載——流式處理系統每秒可以處理10-50W個事件,而數據庫正常情況下每秒只能處理1W個事件,所以需要伸縮性更強的解決方案。


為了獲取更好的性能和更強的伸縮性,需要將外部數據庫的信息緩存到流式處理應用中。但考慮以下問題:

如何保證緩存里的數據是最新的?

如果刷新太頻繁,仍然會對數據庫造成很大壓力,緩存也就無用了。

如果刷新不及時,那么流式處理中所用的數據就會過時。

如果能夠捕捉數據庫的變更事件,並形成事件流,流式處理作業就可以監聽事件流,並及時更新緩存。捕捉數據庫的變更事件並形成數據流,這個過程稱為CDC(Change Data Capture)。例如:我們可以通過Canal來捕獲MySQL數據庫的變化、可以通過ogg來捕獲Oracle數據庫的變化


流與流的連接

有時候需要連接兩個真實的事件流。要連接兩個流,就是連接所有的歷史事件(將兩個妞中具有相同鍵、發生在相同時間窗口內的事件匹配起來),這種流和流的連接稱為:基於時間窗口的連接(windowed-join)。連接兩個流,通常包含一個滑動時間窗口

image


亂序事件

不管對於流式處理、還是傳統的ETL系統,處理亂序事件都是一個挑戰。物聯網領域經常發生亂序事件:一個移動設備斷開Wifi連接幾個小時,在重新連上WiFi后,將幾個小時堆積的事件一並發出去。要讓流式處理應用處理好這些場景,需要做到幾下:

  • 識別亂序事件
    • 應用程序需要檢查事件的時間,並將其與當前時間進行比較
  • 規定一個時間段用於重排亂序事件
    • 例如:3個小時以內的事件可以重排,但3個小時以外的事件就可以直接扔掉
  • 具有一定時間段內重排事件的能力
    • 這是流式處理應用和批處理的重要不同點
    • 假設有一個每天運行的作業,一些事件在作業結束之后才到達,那么可以重新運行昨天的作業來更新
    • 而在流式處理中,重新運行昨天的作業是不存在的,亂序事件和新到達的事件必須一起處理
  • 具備更新結果的能力
    • 如果處理的結果保存在數據庫你,那么可以通過put或update對結果進行更新


重新處理

該重要模式是重新處理事件:

  • 流式處理應用更新了,要使用新版本應用處理同一個事件流,生成新的結果,並比較兩種版本的結果,然后某個時間點將客戶端切換到新的結果流
  • 現有的流式處理出現了缺陷,修復后,需要重新處理並重新計算結果

第一種情況,需要Kafka將事件流長時間地保存在可伸縮的數據存儲中

  • 將新版本的應用作為一個新的消費者組
  • 新的版本從輸入主題的第一個偏移量開始讀取數據
  • 檢查結果流,在新版本的處理作業趕上進度時,將客戶端應用程序切換到新的結果流上

第二種情況,需要應用程序回到輸入流的起始位置開始處理,同時重置本地狀態,還要清理之前的輸出流。這種方式處理起來比較困難。建議還是使用第一種方案。


參考文獻:

《Kafka全文指南》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM