四、Flink數據傾斜問題


一、數據傾斜

1、什么是數據傾斜?

由於數據分布不均勻,造成數據大量的集中到一點,造成數據熱點。

數據傾斜原理

目前我們所知道的大數據處理框架,比如 Flink、Spark、Hadoop 等之所以能處理高達千億的數據,是因為這些框架都利用了分布式計算的思想,集群中多個計算節點並行,使得數據處理能力能得到線性擴展。

在實際生產中 Flink 都是以集群的形式在運行,在運行的過程中包含了兩類進程。其中 TaskManager 實際負責執行計算的 Worker,在其上執行 Flink Job 的一組 Task,Task 則是我們執行具體代碼邏輯的容器。理論上只要我們的任務 Task 足夠多就可以對足夠大的數據量進行處理。

但是實際上大數據量經常出現,一個 Flink 作業包含 200 個 Task 節點,其中有 199 個節點可以在很短的時間內完成計算。但是有一個節點執行時間遠超其他結果,並且隨着數據量的持續增加,導致該計算節點掛掉,從而整個任務失敗重啟。我們可以在 Flink 的管理界面中看到任務的某一個 Task 數據量遠超其他節點。

2、Hadoop框架的特性

  • 不怕數據大,怕數據傾斜
  • jobs數比較多的作業運行效率相對比較低,如子查詢比較多。
  • sum,count,max,min等聚集函數,不會有數據傾斜問題

3、容易數據傾斜情況

  • group by
  • count(distinct ),在數據量大的情況下,容易數據傾斜,因為count(distinct)是按group by 字段分組,按distinct字段排序。
  • 小表關聯超大表

數據傾斜的時候進行負載均衡

Hive.groupby.skewindata=true

當選項設定為 true,生成的查詢計划會有兩個MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。

二、優化常用的手段

Flink 任務出現數據傾斜的直觀表現是任務節點頻繁出現反壓,但是增加並行度后並不能解決問題;部分節點出現 OOM 異常,是因為大量的數據集中在某個節點上,導致該節點內存被爆,任務失敗重啟。

產生數據傾斜的原因主要有 2 個方面:

  • 業務上有嚴重的數據熱點,比如滴滴打車的訂單數據中北京、上海等幾個城市的訂單量遠遠超過其他地區;
  • 技術上大量使用了 KeyBy、GroupBy 等操作,錯誤的使用了分組 Key,人為產生數據熱點。

因此解決問題的思路也很清晰:

  1. 業務上要盡量避免熱點 key 的設計,例如我們可以把北京、上海等熱點城市分成不同的區域,並進行單獨處理;
  2. 技術上出現熱點時,要調整方案打散原來的 key,避免直接聚合;此外 Flink 還提供了大量的功能可以避免數據傾斜。

解決數據傾斜問題

  • 減少job數(合並MapReduce,用Multi-group by)
  • 設置合理的mapreduce的task數,能有效提升性能。
  • 數據量較大的情況下,慎用count(distinct)。
  • 對小文件進行合並,針對文件數據源。

三、優化案例

  • join原則

將條目少的表/子查詢放在 Join的左邊。 原因是在 Join 操作的 Reduce 階段,位於 Join左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生內存溢出的幾率。

當一個小表關聯一個超大表時,容易發生數據傾斜,可以用MapJoin把小表全部加載到內存在map端進行join,避免reducer處理。

         如:SELECT /*+ MAPJOIN(user)*/  l.session_id, u.username from user u join page_views lon (u. id=l.user_id) ;

  • 笛卡爾積

         當Hive設定為嚴格模式(hive.mapred.mode=strict)時,不允許在HQL語句中出現笛卡爾積。

         當無法躲避笛卡爾積時,采用MapJoin,會在Map端完成Join操作,將Join操作的一個或多個表完全讀入內存。

         MapJoin的用法是在查詢/子查詢的SELECT關鍵字后面添加/*+MAPJOIN(tablelist) */提示優化器轉化為MapJoin 。

        其中tablelist可以是一個表,或以逗號連接的表的列表。tablelist中的表將會讀入內存,應該將小表寫在這里

  • 控制Map數

  同時可執行的map數是有限的。

    通常情況下,作業會通過input的目錄產生一個或者多個map任務

    主要的決定因素有: input的文件總個數,input的文件大小。

舉例

   a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(block為128M,6個128m的塊和1個12m的塊),從而產生7個map數

   b) 假設input目錄下有3個文件a,b,c,大小分別為10m,20m,130m,那么hadoop會分隔成4個塊(10m,20m,128m,2m),從而產生4個map數

兩種方式控制Map數:即減少map數和增加map數

減少map數可以通過合並小文件來實現,這點是對文件數據源來講。

增加map數的可以通過控制上一個job的reduer數來控制

 

Flink 消費 Kafka 上下游並行度不一致導致的數據傾斜

通常我們在使用 Flink 處理實時業務時,上游一般都是消息系統,Kafka 是使用最廣泛的大數據消息系統。當使用 Flink 消費 Kafka 數據時,也會出現數據傾斜。

需要十分注意的是,我們 Flink 消費 Kafka 的數據時,是推薦上下游並行度保持一致,即 Kafka 的分區數等於 Flink Consumer 的並行度。

但是會有一種情況,為了加快數據的處理速度,來設置 Flink 消費者的並行度大於 Kafka 的分區數。如果你不做任何的設置則會導致部分 Flink Consumer 線程永遠消費不到數據。

這時候你需要設置 Flink 的 Redistributing,也就是數據重分配。

GroupBy + Aggregation 分組聚合熱點問題

業務上通過 GroupBy 進行分組,然后緊跟一個 SUM、COUNT 等聚合操作是非常常見的。我們都知道 GroupBy 函數會根據 Key 進行分組,完全依賴 Key 的設計,如果 Key 出現熱點,那么會導致巨大的 shuffle,相同 key 的數據會被發往同一個處理節點;如果某個 key 的數據量過大則會直接導致該節點成為計算瓶頸,引起反壓。

兩階段聚合解決 KeyBy 熱點

KeyBy 是我們經常使用的分組聚合函數之一。在實際的業務中經常會碰到這樣的場景:雙十一按照下單用戶所在的省聚合求訂單量最高的前 10 個省,或者按照用戶的手機類型聚合求訪問量最高的設備類型等。

其他

Flink 一直在不斷地迭代,不斷出現各種各樣的手段解決我們遇到的數據傾斜問題。例如,MiniBatch 微批處理手段等


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM