【翻譯】Flink Table Api & SQL — 性能調優 — 流式聚合


本文翻譯自官網:Streaming Aggregation  https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/tuning/streaming_aggregation_optimization.html

Flink Table Api & SQL 翻譯目錄

SQL是用於數據分析的最廣泛使用的語言。Flink的Table API和SQL使用戶能夠以更少的時間和精力定義高效的流分析應用程序。而且,Flink Table API和SQL得到了有效的優化,它集成了許多查詢優化和優化的運算符實現。但是並非默認情況下會啟用所有優化,因此對於某些工作負載,可以通過打開某些選項來提高性能。

在此頁面中,我們將介紹一些有用的優化選項以及流聚合的內部原理,這將在某些情況下帶來很大的改進。

注意:當前,僅Blink計划程序支持此頁面中提到的優化選項。

注意:當前,僅對無邊界聚合支持流聚合優化將來將支持窗口聚合的優化

默認情況下,無界聚合運算符一個一個地處理輸入記錄,即(1)從狀態讀取累加器,(2)將記錄累加/縮回到累加器,(3)將累加器寫回到狀態,(4)下一條記錄將從(1)重新進行處理。此處理模式可能會增加StateBackend的開銷(尤其是對於RocksDB StateBackend)。此外,生產中非常常見的數據偏斜會使問題惡化,並使工作容易承受背壓情況。

小批量聚合

小型批處理聚合的核心思想是將一組輸入緩存在聚合運算符內部的緩沖區中。當觸發輸入以進行處理時,每個鍵只需一個操作即可訪問狀態。這樣可以大大減少狀態開銷並獲得更好的吞吐量。但是,這可能會增加一些延遲,因為它會緩沖一些記錄而不是立即處理它們。這是吞吐量和延遲之間的權衡。

下圖說明了小批量聚合如何減少狀態操作。

 

 

MiniBatch 優化默認情況下處於禁用狀態。為了使這種優化,您應該設置 table.exec.mini-batch.enabledtable.exec.mini-batch.allow-latencytable.exec.mini-batch.size請參閱配置頁面以獲取更多詳細信息。

以下示例顯示如何啟用這些選項。

// instantiate table environment
val tEnv: TableEnvironment = ...

// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task

局部全局聚合

提出將局部聚合分為兩個階段來解決數據傾斜問題,即先在上游進行局部聚合,然后在下游進行全局聚合,這類似於MapReduce中的Combine + Reduce模式。例如,考慮以下SQL:

SELECT color, sum(id)
FROM T
GROUP BY color

數據流中的記錄可能會傾斜,因此聚合運算符的某些實例會比其他實例處理更多的記錄,這會導致熱點。本地聚合可以幫助將具有相同密鑰的一定數量的輸入累加到單個累加器中。全局匯總將僅接收減少的累加器,而不是大量的原始輸入。這可以大大減少網絡改組和狀態訪問的成本。每次本地聚合累積的輸入數量基於最小批處理間隔。這意味着本地-全局聚合取決於啟用了小批量優化。

下圖顯示了本地全局聚合如何提高性能。

 

 

以下示例顯示了如何啟用本地全局聚合。

// instantiate table environment
val tEnv: TableEnvironment = ...

// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation

分割不同的聚合

局部全局優化可有效消除常規聚合的數據偏斜,例如SUM,COUNT,MAX,MIN,AVG。但是,在處理不同的聚合時,其性能並不令人滿意。

例如,如果我們要分析今天有多少唯一用戶登錄。我們可能有以下查詢:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

如果distinct key(即user_id)的值稀疏,則COUNT DISTINCT不能減少記錄。即使啟用了局部全局優化,它也無濟於事。因為累加器仍包含幾乎所有原始記錄,並且全局聚合將成為瓶頸(大多數繁重的累加器由一項任務處理,即在同一天)。 

此優化的想法是將不同的聚合(例如COUNT(DISTINCT col))分為兩個級別。第一次聚合由組密鑰和其他存儲桶密鑰混洗。使用來計算存儲桶密鑰HASH_CODE(distinct_key) % BUCKET_NUMBUCKET_NUM默認為1024,可以通過table.optimizer.distinct-agg.split.bucket-num選項配置第二次聚合由原始組密鑰改組,並用於SUM聚合來自不同存儲桶的COUNT DISTINCT值。由於相同的唯一鍵將僅在同一存儲桶中計算,因此轉換是等效的。存儲桶密鑰充當附加組密鑰的角色,以分擔組密鑰中的熱點負擔。存儲桶關鍵字使工作具有可伸縮性,以解決不同聚合中的數據偏斜/熱點。

拆分非重復聚合后,上述查詢將自動重寫為以下查詢:

SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

下圖顯示了拆分的非重復聚合如何提高性能(假設顏色代表天,字母代表user_id)。

 

 

 

注意:上面是最簡單的示例,可以從此優化中受益。除此之外,Flink 支持分裂更復雜的聚集查詢,例如,一個以上的具有不同的不同密鑰(例如不同的集合COUNT(DISTINCT a), SUM(DISTINCT b)),與其他非重復的聚合工作(例如SUMMAXMINCOUNT)。

注意:但是,當前,拆分優化不支持包含用戶定義的AggregateFunction的聚合。

以下示例顯示如何啟用拆分非重復聚合優化。

// instantiate table environment
val tEnv: TableEnvironment = ...

tEnv.getConfig         // access high-level configuration
  .getConfiguration    // set low-level key-value options
  .setString("table.optimizer.distinct-agg.split.enabled", "true")  // enable distinct agg split

在不同的聚合上使用FILTER修飾符 

在某些情況下,用戶可能需要從不同維度計算UV(唯一訪客)的數量,例如Android的UV,iPhone的UV,Web的UV和總UV。許多用戶將選擇CASE WHEN支持此功能,例如: 

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

但是,在這種情況下,建議使用 FILTER 語法而不是CASE WHEN。因為FILTER它更符合SQL標准,並且將獲得更多的性能改進。 FILTER是用於聚合函數的修飾符,用於限制聚合中使用的值。將上面的示例替換為FILTER修飾符,如下所示: 

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

Flink SQL優化器可以識別同一唯一鍵上的不同過濾器參數。例如,在上面的示例中,所有三個COUNT DISTINCT都在user_id列上。然后Flink可以只使用一個共享狀態實例,而不是三個狀態實例,以減少狀態訪問和狀態大小。在某些工作負載中,這可以顯着提高性能。 

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM