通過Flink實現個推海量消息數據的實時統計


背景

消息報表主要用於統計消息任務的下發情況。比如,單條推送消息下發APP用戶總量有多少,成功推送到手機的數量有多少,又有多少APP用戶點擊了彈窗通知並打開APP等。通過消息報表,我們可以很直觀地看到消息推送的流轉情況、消息下發到達成功率、用戶對消息的點擊情況等。

個推在提供消息推送服務時,為了更好地了解每天的推送情況,會從不同的維度進行數據統計,生成消息報表。個推每天下發的消息推送數巨大,可以達到數百億級別,原本我們采用的離線統計系統已不能滿足業務需求。隨着業務能力的不斷提升,我們選擇了Flink作為數據處理引擎,以滿足對海量消息推送數據的實時統計。

本文將主要闡述選擇Flink的原因、Flink的重要特性以及優化后的實時計算方法。

離線計算平台架構

在消息報表系統的初期,我們采用的是離線計算的方式,主要采用spark作為計算引擎,原始數據存放在HDFS中,聚合數據存放在Solr、Hbase和Mysql中:

查詢的時候,先根據篩選條件,查詢的維度主要有三個:

  1. appId
  2. 下發時間
  3. taskGroupName

根據不同維度可以查詢到taskId的列表,然后根據task查詢hbase獲取相應的結果,獲取下發、展示和點擊相應的指標數據。在我們考慮將其改造為實時統計時,會存在着一系列的難點:

  1. 原始數據體量巨大,每天數據量達到幾百億規模,需要支持高吞吐量;
  2. 需要支持實時的查詢;
  3. 需要對多份數據進行關聯;
  4. 需要保證數據的完整性和數據的准確性。

Why Flink

Flink是什么

Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。

對 Flink 而言,其所要處理的主要場景就是流數據。Flink 的前身是柏林理工大學一個研究性項目, 在 2014 被 Apache 孵化器所接受,然后迅速地成為了 ASF(Apache Software Foundation)的頂級項目之一。

方案對比

為了實現個推消息報表的實時統計,我們之前考慮使用spark streaming作為我們的實時計算引擎,但是我們在考慮了spark streaming、storm和flink的一些差異點后,還是決定使用Flink作為計算引擎:

針對上面的業務痛點,Flink能夠滿足以下需要:

  1. Flink以管道推送數據的方式,可以讓Flink實現高吞吐量。

  2. Flink是真正意義上的流式處理,延時更低,能夠滿足我們消息報表統計的實時性要求。

  3. Flink可以依靠強大的窗口功能,實現數據的增量聚合;同時,可以在窗口內進行數據的join操作。

  4. 我們的消息報表涉及到金額結算,因此對於不允許存在誤差,Flink依賴自身的exact once機制,保證了我們數據不會重復消費和漏消費。

Flink的重要特性

下面我們來具體說說Flink中一些重要的特性,以及實現它的原理:

1)低延時、高吞吐

Flink速度之所以這么快,主要是在於它的流處理模型。

Flink 采用 Dataflow 模型,和 Lambda 模式不同。Dataflow 是純粹的節點組成的一個圖,圖中的節點可以執行批計算,也可以是流計算,也可以是機器學習算法。流數據在節點之間流動,被節點上的處理函數實時 apply 處理,節點之間是用 netty 連接起來,兩個 netty 之間 keepalive,網絡 buffer 是自然反壓的關鍵。

經過邏輯優化和物理優化,Dataflow 的邏輯關系和運行時的物理拓撲相差不大。這是純粹的流式設計,時延和吞吐理論上是最優的。

簡單來說,當一條數據被處理完成后,序列化到緩存中,然后立刻通過網絡傳輸到下一個節點,由下一個節點繼續處理。

2)Checkpoint

Flink是通過分布式快照來實現checkpoint,能夠支持Exactly-Once語義。

分布式快照是基於Chandy和Lamport在1985年設計的一種算法,用於生成分布式系統當前狀態的一致性快照,不會丟失信息且不會記錄重復項。

Flink使用的是Chandy Lamport算法的一個變種,定期生成正在運行的流拓撲的狀態快照,並將這些快照存儲到持久存儲中(例如:存儲到HDFS或內存中文件系統)。檢查點的存儲頻率是可配置的。

3)backpressure

back pressure出現的原因是為了應對短期數據尖峰。

舊版本Spark Streaming的back pressure通過限制最大消費速度實現,對於基於Receiver 形式,我們可以通過配置spark.streaming. receiver.maxRate參數來限制每個 receiver 每秒最大可以接收的記錄的數據。

對於 Direct Approach 的數據接收,我們可以通過配置spark.streaming. kafka.maxRatePerPartition 參數來限制每次作業中每個 Kafka 分區最多讀取的記錄條數。

但這樣是非常不方便的,在實際上線前,還需要對集群進行壓測,來決定參數的大小。

Flink運行時的構造部件是operators以及streams。每一個operator消費一個中間/過渡狀態的流,對它們進行轉換,然后生產一個新的流。

描述這種機制最好的類比是:Flink使用有效的分布式阻塞隊列來作為有界的緩沖區。如同Java里通用的阻塞隊列跟處理線程進行連接一樣,一旦隊列達到容量上限,一個相對較慢的接受者將拖慢發送者。

消息報表的實時計算

優化之后,架構升級成如下:

可以看出,我們做了以下幾點優化:

  1. Flink替換了之前的spark,進行消息報表的實時計算;
  2. ES替換了之前的Solr。

對於Flink進行實時計算,我們的關注點主要有以下4個方面:

  1. ExactlyOnce保證了數據只會被消費一次
  2. 狀態管理的能力
  3. 強大的時間窗口
  4. 流批一體

為了實現我們實時統計報表的需求,主要依靠Flink的增量聚合功能。

首先,我們設置了Event Time作為時間窗口的類型,保證了只會計算當天的數據;同時,我們每隔一分鍾增量統計當日的消息報表,因此分配1分鍾的時間窗口。

然后我們使用.aggregate (AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數據,減少 state 的存儲壓力。之后,我們將增量聚合后的數據寫入到ES和Hbase中。

流程如下所示:

同時,在查詢的時候,我們通過taskID、日期等維度進行查詢,先從ES中獲取taskID的集合,之后通過taskID查詢hbase,得出統計結果。

總結

通過使用Flink,我們實現了對消息推送數據的實時統計,能夠實時查看消息下發、展示、點擊等數據指標,同時,借助FLink強大的狀態管理功能,服務的穩定性也得到了一定的保障。未來,個推也將持續優化消息推送服務,並將Flink引入到其他的業務線中,以滿足一些實時性要求高的業務場景需求。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM