技術實踐 | 如何基於 Flink 實現通用的聚合指標計算框架


1 引言

網易雲信作為一個 PaaS 服務,需要對線上業務進行實時監控,實時感知服務的“心跳”、“脈搏”、“血壓”等健康狀況。通過采集服務拿到 SDK、服務器等端的心跳埋點日志,是一個非常龐大且雜亂無序的數據集,而如何才能有效利用這些數據?服務監控平台要做的事情就是對海量數據進行實時分析,聚合出表征服務的“心跳”、“脈搏”、“血壓”的核心指標,並將其直觀的展示給相關同學。這其中核心的能力便是 :實時分析和實時聚合

在之前的《網易雲信服務監控平台實踐》一文中,我們圍繞數據采集、數據處理、監控告警、數據應用 4 個環節,介紹了網易雲信服務監控平台的整體框架。本文是對網易雲信在聚合指標計算邏輯上的進一步詳述。

基於明細數據集進行實時聚合,生產一個聚合指標,業界常用的實現方式是 Spark Streaming、Flink SQL / Stream API。不論是何種方式,我們都需要通過寫代碼來指定數據來源、數據清洗邏輯、聚合維度、聚合窗口大小、聚合算子等。如此繁雜的邏輯和代碼,無論是開發、測試,還是后續任務的維護,都需要投入大量的人力/物力成本。而我們程序員要做的便是化繁為簡、實現大巧不工。

本文將闡述網易雲信是如何基於 Flink 的 Stream API,實現一套通用的聚合指標計算框架。

2 整體架構

整體架構

如上圖所示,是我們基於 Flink 自研的聚合指標完整加工鏈路,其中涉及到的模塊包括:

  • source:定期加載聚合規則,並根據聚合規則按需創建 Kafka 的 Consumer,並持續消費數據。
  • process:包括分組邏輯、窗口邏輯、聚合邏輯、環比計算邏輯等。從圖中可以看到,我們在聚合階段分成了兩個,這樣做的目的是什么?其中的好處是什么呢?做過分布式和並發計算的,都會遇到一個共同的敵人:數據傾斜。在我們 PaaS 服務中頭部客戶會更加明顯,所以傾斜非常嚴重,分成兩個階段進行聚合的奧妙下文中會詳細說明。
  • sink:是數據輸出層,目前默認輸出到 Kafka 和 InfluxDB,前者用於驅動后續計算(如告警通知等),后者用於數據展示以及查詢服務等。
  • reporter:全鏈路統計各個環節的運行狀況,如輸入/輸出 QPS、計算耗時、消費堆積、遲到數據量等。

下文將詳細介紹這幾個模塊的設計和實現思路。

3 source

規則配置

為了便於聚合指標的生產和維護,我們將指標計算過程中涉及到的關鍵參數進行了抽象提煉,提供了可視化配置頁面,如下圖所示。下文會結合具體場景介紹各個參數的用途。

可視化配置頁面

規則加載

在聚合任務運行過程中,我們會定期加載配置。如果檢測到有新增的 Topic,我們會創建 kafka-consumer 線程,接收上游實時數據流。同理,對於已經失效的配置,我們會關閉消費線程,並清理相關的 reporter。

數據消費

對於數據源相同的聚合指標,我們共用一個 kafka-consumer,拉取到記錄並解析后,對每個聚合指標分別調用 collect() 進行數據分發。如果指標的數據篩選規則(配置項)非空,在數據分發前需要進行數據過濾,不滿足條件的數據直接丟棄。

4 process

整體計算流程

基於 Flink 的 Stream API 實現聚合計算的核心代碼如下所示:

SingleOutputStreamOperator<MetricContext> aggResult = src
        .assignTimestampsAndWatermarks(new MetricWatermark())
        .keyBy(new MetricKeyBy())
        .window(new MetricTimeWindow())
        .aggregate(new MetricAggFuction());
  • MetricWatermark():根據指定的時間字段(配置項⑧)獲取輸入數據的 timestamp,並驅動計算流的 watermark 往前推進。
  • MetricKeyBy():指定聚合維度,類似於 MySQL 中 groupby,根據分組字段(配置項⑥),從數據中獲取聚合維度的取值,拼接成分組 key。
  • MetricTimeWindow():配置項⑧中指定了聚合計算的窗口大小。如果配置了定時輸出,我們就創建滑動窗口,否則就創建滾動窗口。
  • MetricAggFuction():實現配置項②指定的各種算子的計算,下文將詳細介紹各個算子的實現原理。

二次聚合

對於大數據量的聚合計算,數據傾斜是不得不考慮的問題,數據傾斜意味着規則中配置的分組字段(配置項⑥)指定的聚合 key 存在熱點。我們的計算框架在設計之初就考慮了如何解決數據傾斜問題,就是將聚合過程拆分成2階段:

  • 第1階段:將數據隨機打散,進行預聚合。
  • 第2階段:將第1階段的預聚合結果作為輸入,進行最終的聚合。

具體實現:判斷並發度參數 parallelism(配置項⑦) 是否大於1,如果 parallelism 大於1,生成一個 [0, parallelism) 之間的隨機數作為 randomKey,在第1階段聚合 keyBy() 中,將依據分組字段(配置項⑥)獲取的 key 與 randomKey 拼接,生成最終的聚合 key,從而實現了數據隨機打散。

聚合算子

作為一個平台型的產品,我們提供了如下常見的聚合算子。由於采用了二次聚合邏輯,各個算子在第1階段和第2階段采用了相應的計算策略。

算子 第1階段聚合 第2階段聚合
min/max/sum/count 直接對輸入數據進行預聚合計算,輸出預聚合結果 對第1階段預聚合結果進行二次聚合計算,輸出最終結果
first/last 對輸入數據的 timestamp 進行比較,記錄最小/最大的 timestamp 以及對應的 value 值,輸出 <timestamp,value> 數據對 對 <timestamp,value> 數據對進行二次計算,輸出最終的 first/last
avg 計算該分組的和值和記錄數,輸出 <sum,cnt> 數據對 對 <sum,cnt> 數據對分別求和,然后輸出:總 sum / 總 cntcount
median/tp90/tp95 統計輸入數據的分布,輸出 NumericHistogram 對輸入的 NumericHistogram 做 merge 操作,最終輸出中位數/tp90/tp95
count-distinct 輸出記錄桶信息和位圖的 RoaringArray 對 RoaringArray 進行 merge 操作,最終輸出精確的去重計數結果
count-distinct(近似) 輸出基數計數對象 HyperLoglog 對 HyperLoglog 進行 merge 操作,最終輸出近似的去重計數結果

對於計算結果受全部數據影響的算子,如 count-distinct(去重計數),常規思路是利用 set 的去重特性,將所有統計數據放在一個 Set 中,最終在聚合函數的 getResult 中輸出 Set 的 size。如果統計數據量非常大,這個 Set 對象就會非常大,對這個 Set 的 I/O 操作所消耗的時間將不能接受。

對於類 MapReduce 的大數據計算框架,性能的瓶頸往往出現在 shuffle 階段大對象的 I/O 上,因為數據需要序列化 / 傳輸 / 反序列化,Flink 也不例外。類似的算子還有 median 和 tp95。

為此,需要對這些算子做專門的優化,優化的思路就是盡量減少計算過程中使用的數據對象的大小,其中:

  • median/tp90/tp95:參考了 hive percentile_approx 的近似算法,該算法通過 NumericHistogram(一種非等距直方圖)記錄數據分布,然后通過插值的方式得到相應的 tp 值(median 是 tp50)。
  • count-distinct:采用 RoaringBitmap 算法,通過壓縮位圖的方式標記輸入樣本,最終得到精確的去重計數結果。
  • count-distinct(近似) :采用 HyperLoglog 算法,通過基數計數的方式,得到近似的去重計數結果。該算法適用於大數據集的去重計數。

后處理

后處理模塊,是對第2階段聚合計算輸出數據進行再加工,主要有2個功能:

  • 復合指標計算:對原始統計指標進行組合計算,得到新的組合指標。例如,要統計登錄成功率,我們可以先分別統計出分母(登錄次數)和分子(登錄成功的次數),然后將分子除以分母,從而得到一個新的組合指標。配置項③就是用來配置組合指標的計算規則。
  • 相對指標計算:告警規則中經常要判斷某個指標的相對變化情況(同比/環比)。我們利用 Flink 的state,能夠方便的計算出同比/環比指標,配置項④就是用來配置相對指標規則。

異常數據的處理

這里所說的異常數據,分為兩類:遲到的數據和提前到的數據。

  • 遲到數據
    • 對於嚴重遲到的數據(大於聚合窗口的 allowedLateness),通過 sideOutputLateData 進行收集,並通過 reporter 統計上報,從而能夠在監控頁面進行可視化監控。
    • 對於輕微遲到的數據(小於聚合窗口的 allowedLateness),會觸發窗口的重計算。如果每來一條遲到數據就觸發一次第 1 階段窗口的重計算,重計算結果傳導到第 2 階段聚合計算,就會導致部分數據的重復統計。為了解決重復統計的問題,我們在第 1 階段聚合 Trigger 中進行了特殊處理:窗口觸發采用 FIRE_AND_PURGE(計算並清理),及時清理已經參與過計算的數據。
  • 提前到的數據:這部分數據往往是數據上報端的時鍾不准導致。在計算這些數據的 timestamp 時要人為干預,避免影響整個計算流的 watermark。

5 sink

聚合計算得到的指標,默認輸出到 Kafka 和時序數據庫 InfluxDB。

  • kafka-sink:將指標標識(配置項①)作為 Kafka 的topic,將聚合結果發送出去,下游接收到該數據流后可以進一步處理加工,如告警事件的生產等。
  • InfluxDB-sink:將指標標識(配置項①)作為時序數據庫的表名,將聚合結果持久化下來,用於 API 的數據查詢、以及可視化報表展示等。

6 reporter

為了實時監控各個數據源和聚合指標的運行情況,我們通過 InfluxDB+Grafana 組合,實現了聚合計算全鏈路監控:如各環節的輸入/輸出 QPS、計算耗時、消費堆積、遲到數據量等。

reporter

7 結語

目前,通過該通用聚合框架,承載了網易雲信 100+ 個不同維度的指標計算,帶來的收益也是比較可觀的:

  • 提效:采用了頁面配置化方式實現聚合指標的生產,開發周期從天級縮短到分鍾級。沒有數據開發經驗的同學也能夠自己動手完成指標的配置。
  • 維護簡單,資源利用率高:100+ 個指標只需維護 1 個 flink-job,資源消耗也從 300+ 個 CU 減少到 40CU。
  • 運行過程透明:借助於全鏈路監控,哪個計算環節有瓶頸,哪個數據源有問題,一目了然。

作者介紹

聖少友,網易雲信數據平台資深開發工程師,從事數據平台相關工作,負責服務監控平台、數據應用平台、質量服務平台的設計開發工作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM