簡介: 唯品會 Flink 的容器化實踐應用,Flink SQL 平台化建設,以及在實時數倉和實驗平台上的應用案例。
轉自dbaplus社群公眾號
作者:王康,唯品會數據平台高級開發工程師
自 2017 年起,為保障內部業務在平時和大促期間的平穩運行,唯品會就開始基於 Kubernetes 深入打造高性能、穩定、可靠、易用的實時計算平台,現在的平台支持 Flink、Spark、Storm 等主流框架。
本文將分為五個方面,分享唯品會 Flink 的容器化實踐應用以及產品化經驗:
- 發展概覽
- Flink 容器化實踐
- Flink SQL 平台化建設
- 應用案例
- 未來規划
一、發展概覽
1、集群規模
在集群規模方面,我們有 2000+ 的物理機,主要部署 Kubernetes 異地雙活的集群,利用 Kubernetes 的 namespaces,labels 和 taints 等實現業務隔離以及初步的計算負載隔離。
Flink 任務數、Flink SQL 任務數、Storm 任務數、Spark 任務數,這些線上實時應用加起來有 1000 多個。目前我們主要支持 Flink SQL 這一塊,因為 SQL 化是一個趨勢,所以我們要支持 SQL 任務的上線平台。
2、平台架構
我們從下往上進行解析實時計算平台的整體架構:
- 資源調度層(最底層)
實際上是用 deployment 的模式運行 Kubernetes 上,平台雖然支持 yarn 調度,但是 yarn 調度與批任務共享資源,所以主流任務還是運行在 Kubernetes 上的。並且,yarn 調度這一層主要是離線部署的一套 yarn 集群。在 2017 年的時候,我們自研了 Flink on Kubernetes 的一套方案,因為底層調度分了兩層,所以在大促資源緊張的時候,實時跟離線就可以做一個資源的借調。
- 存儲層
主要用來支持公司內部基於 Kafka 的實時數據 vms,基於 binlog 的 vdp 數據和原生 Kafka 作為消息總線,狀態存儲在 HDFS 上,數據主要存入 Redis、MySQL、HBase、Kudu、HDFS、ClickHouse 等。
- 計算引擎層
主要是 Flink、Storm、Spark,目前主推的是 Flink,每個框架會都會支持幾個版本的鏡像以滿足不同的業務需求。
- 實時平台層
主要提供作業配置、調度、版本管理、容器監控、job 監控、告警、日志等功能,提供多租戶的資源管理(quota,label 管理)以及 Kafka 監控。資源配置也分為大促日和平常日,大促的資源和平常的資源是不一樣的,資源的權限管控也是不一樣的。在 Flink 1.11 版本之前,平台自建元數據管理系統為 Flink SQL 管理 schema;從 1.11 版本開始,則是通過 Hive metastore 與公司元數據管理系統融合。
- 應用層
主要是支持實時大屏、推薦、實驗平台、實時監控和實時數據清洗的一些場景。
二、Flink容器化實踐
1、容器化方案
上面是實時平台 Flink 容器化的架構圖。Flink 容器化其實是基於 Standalone 模式部署的。
我們的部署模式共有 Client、Job Manager、Task Manager 三個角色,每一個角色都會有一個 Deployment 來控制。
用戶通過平台上傳任務 jar 包、配置等,存儲於 HDFS 上。同時由平台維護的配置、依賴等也存儲在 HDFS 上,當 pod 啟動時,就會進行拉取等初始化操作。
Client 中主進程是一個由 go 開發的 agent,當 Client 啟動時,會首先檢查集群狀態,當集群准備好后,從 HDFS 上拉取 jar 包,再向這個集群提交任務。Client 的主要任務是做容錯,它主要功能還有監控任務狀態,做 savepoint 等操作。
通過部署在每台物理機上的 smart-agent 采集容器的指標寫入 m3,以及通過 Flink 暴漏的接口將 metrics 寫入 prometheus,結合 grafana 展示。同樣通過部署在每台物理機上的 vfilebeat 采集掛載出來的相關日志寫入 es,在 dragonfly 可以實現日志檢索。
1)Flink 平台化
在實踐過程中,一定要結合具體場景和易用性,再去考慮做平台化工作。
2)Flink 穩定性
在我們應用部署以及運行過程中,異常是不可避免的,這時候平台就需要做一些保證任務在出現異常狀況后,依舊保持穩定性的一些策略。
- pod 的健康和可用:
由 livenessProbe 和 readinessProbe 檢測,同時指定 pod 的重啟策略,Kubernetes 本身可以做一個 pod 的拉起。
-
Flink 任務產生異常時:
- Flink 有自已本身的一套 restart 策略和 failover 機制,這是它的第一層保障。
- 在 Client 中會定時監控 Flink 狀態,同時將最新的 checkpoint 地址更新到自己的緩存中,並匯報到平台,然后固化到 MySQL 中。當 Flink 無法再重啟時,由 Client 重新從最新的成功 checkpoint 提交任務。這是它的第二層保障。
這一層將 checkpoint 固化到 MySQL 中后,就不再使用 Flink HA 機制了,少了 zk 的組件依賴。
- 當前兩層無法重啟時或集群出現異常時,由平台自動從固化到 MySQL 中的最新 checkpoint 重新拉起一個集群,提交任務,這是它的第三層保障。
-
機房容災:
- 用戶的 jar 包,checkpoint 都做了異地雙 HDFS 存儲。
- 異地雙機房雙集群。
2、Kafka 監控方案
Kafka 監控是任務監控里非常重要的一個環節,整體的流程如下:
平台提供監控 Kafka 堆積,用戶在界面上,可以配置自己的 Kafka 監控,告知在怎樣的集群,以及用戶消費 message 等配置信息。可以從 MySQL 中將用戶 Kafka 監控配置提取后,再通過 jmx 監控 Kafka,這樣的信息采集之后,寫入下游 Kafka,再通過另一個 Flink 任務實時監控告警,同時將這些數據同步寫入 ck 里面,從而反饋給我們的用戶(這里也可以不用 ck,用 Prometheus 去做監控也是可以的,但 ck 會更加適合),最后再用 Grafana 組件去展示給用戶。
三、Flink SQL 平台化建設
有了前面 Flink 的容器化方案之后,就要開始 Flink SQL 平台化建設了。大家都知道,這樣流式的 api 開發起來,還是有一定的成本的。 Flink 肯定是比 Storm 快的,也相對比較穩定、容易一些,但是對於一些用戶,特別是 Java 開發的一些同學來說,做這個是有一定門檻的。
Kubernetes 的 Flink 容器化實現以后,方便了 Flink api 應用的發布,但是對於 Flink SQL 的任務仍然不夠便利。於是平台提供了更加方便的在線編輯發布、SQL 管理等一棧式開發平台。
1、 Flink SQL 方案
平台的 Flink SQL 方案如上圖所示,任務發布系統與元數據管理系統是完全解耦的。
1)Flink SQL 任務發布平台化
在實踐過程中,需要考慮易用性,做平台化工作,主操作界面如下圖所示:
- Flink SQL 的版本管理、語法校驗、拓撲圖管理等;
- UDF 通用和任務級別的管理,支持用戶自定義 udf;
- 提供參數化的配置界面,方便用戶上線任務。
下圖是一個用戶界面配置的例子:
下圖是一個集群配置的范例:
2)元數據管理
平台在 1.11 之前通過構建自己的元數據管理系統 UDM,MySQL 存儲 Kafka,Redis 等 schema,通過自定義 catalog 打通 Flink 與 UDM,從而實現元數據管理。
在 1.11 之后,Flink 集成 Hive 逐漸完善,平台重構了 Flink SQL 框架,並通過部署一個 SQL-gateway service 服務,中間調用自己維護的 SQL-Client jar 包,從而與離線元數據打通,實現了實時離線元數據的統一,為之后的流批一體打好了基礎。
在元數據管理系統創建的 Flink 表操作界面如下圖所示:創建 Flink 表的元數據,持久化到 Hive 里,Flink SQL 啟動時從 Hive 里讀取對應表的 table schema 信息。
2、Flink SQL 相關實踐
平台對於官方原生支持或者不支持的 connector 進行整合和開發,鏡像和 connector,format 等相關依賴進行解耦,可以快捷的進行更新與迭代。
1)Flink SQL 相關實踐
Flink SQL 主要分為以下三層:
-
connector 層
- 支持 VDP connector 讀取 source 數據源;
- 支持 Redis string、hash 等數據類型的 sink & 維表關聯;
- 支持 kudu connector & catalog & 維表關聯;
- 支持 protobuf format 解析實時清洗數據;
- 支持 vms connector 讀取 source 數據源;
- 支持 ClickHouse connector sink 分布式表 & 本地表高 TPS 寫入;
- Hive connector 支持數坊 Watermark Commit Policy 分區提交策略 & array、decimal 等復雜數據類型。
-
runtime 層
- 主要支持拓撲圖執行計划修改;
- 維表關聯 keyBy 優化 cache 提升查詢性能;
- 維表關聯延遲 join。
-
平台層
- Hive UDF;
- 支持 json HLL 相關處理函數;
- 支持 Flink 運行相關參數設置如 minibatch、聚合優化參數;
- Flink 升級 hadoop3。
2)拓撲圖執行計划修改
針對現階段 SQL 生成的 stream graph 並行度無法修改等問題,平台提供可修改的拓撲預覽修改相關參數。平台會將解析后的 FlinkSQL 的 excution plan json 提供給用戶,利用 uid 保證算子的唯一性,修改每個算子的並行度,chain 策略等,也為用戶解決反壓問題提供方法。例如針對 ClickHouse sink 小並發大批次的場景,我們支持修改 ClickHouse sink 並行度,source 並行度 = 72,sink 並行度 = 24,提高 ClickHouse sink tps。
3)維表關聯 keyBy 優化 cache
針對維表關聯的情況,為了降低 IO 請求次數,降低維表數據庫讀壓力,從而降低延遲,提高吞吐,有以下三種措施:
下面是維表關聯 KeyBy 優化 cache 的圖:
在優化之前的時候,維表關聯 LookupJoin 算子和正常算子 chain 在一起,優化之間維表關聯 Lookup Join 算子和正常算子不 chain 在一起,將join key 作為 hash 策略的 key。
采用這種方式優化后,例如原來的 3000W 數據量維表,10 個 TM 節點,每個節點都要緩存 3000W 的數據,總共需要緩存 3 億的量。而經過 keyBy 優化之后,每個 TM 節點只需要緩存 3000W/10 = 300W 的數據量,總共緩存的數據量只有 3000W,這非常大程度減少了緩存數據量。
4)維表關聯延遲 join
維表關聯中,有很多業務場景,在維表數據新增數據之前,主流數據已經發生 join 操作,會出現關聯不上的情況。因此,為了保證數據的正確,將關聯不上的數據進行緩存,進行延遲 join。
最簡單的做法是,在維表關聯的 function 里設置重試次數和重試間隔,這個方法會增大整個流的延遲,但主流 qps 不高的情況下,可以解決問題。
增加延遲 join 的算子,當 join 維表未關聯時,先緩存起來,根據設置重試次數和重試間隔從而進行延遲的 join。
四、應用案例
1、實時數倉
1)實時數據入倉
實時數倉主要分為三個過程:
- 流量數據一級 Kafka 進行實時數據清洗后,可以寫到二級清洗 Kafka,主要是 protobuf 格式,再通過 Flink SQL 寫入 Hive 5min 表,以便做后續的准實時 ETL,加速 ods 層數據源的准備時間。
- MySQL 業務庫的數據,通過 VDP 解析形成 binlog cdc 消息流,再通過 Flink SQL 寫入 Hive 5min 表,同時會提交到自定義分區,再把分區狀態匯報到服務接口,最后再做一個離線的調度。
- 業務系統通過 VMS API 產生業務 Kafka 消息流,通過 Flink SQL 解析之后寫入 Hive 5min 表。可以支持 string、json、csv 等消息格式。
使用 Flink SQL 做流式數據入倉是非常方便的,而且 1.12 版本已經支持了小文件的自動合並,解決了大數據層一個非常普遍的痛點。
我們自定義分區提交策略,當前分區 ready 時候會調一下實時平台的分區提交 api,在離線調度定時調度通過這個 api 檢查分區是否 ready。
采用 Flink SQL 統一入倉方案以后,我們可獲得以下成果:
- 首先我們不僅解決了以往 Flume 方案不穩定的問題,用戶也可以實現自助入倉,大大降低入倉任務的維護成本,穩定性也可以得到保障。
- 其次我們還提升了離線數倉的時效性,從小時級降低至 5min 粒度入倉,時效性可以增強。
2)實時指標計算
- 實時應用消費清洗后 Kafka,通過 Redis 維表、api 等方式關聯,再通過 Flink window 增量計算 UV,持久化寫到 HBase 里。
- 實時應用消費 VDP 消息流之后,通過 Redis 維表、api 等方式關聯,再通過 Flink SQL 計算出銷售額等相關指標,增量 upsert 到 kudu 里,方便根據 range 分區批量查詢,最終通過數據服務對實時大屏提供最終服務。
以往指標計算通常采用 Storm 方式,這個方式需要通過 api 定制化開發,采用這樣 Flink 方案以后,我們可以獲得了以下成果:
- 將計算邏輯切到 Flink SQL 上,降低計算任務口徑變化快,解決修改上線周期慢等問題;
- 切換至 Flink SQL 可以做到快速修改,並且實現快速上線,降低了維護的成本。
3)實時離線一體化ETL數據集成
具體的流程如下圖所示:
Flink SQL 在最近的版本中持續強化了維表 join 的能力,不僅可以實時關聯數據庫中的維表數據,還能關聯 Hive 和 Kafka 中的維表數據,能靈活滿足不同工作負載和時效性的需求。
基於 Flink 強大的流式 ETL 的能力,我們可以統一在實時層做數據接入和數據轉換,然后將明細層的數據回流到離線數倉中。
我們通過將 presto 內部使用的 HyperLogLog(后面簡稱 HLL)實現引入到 Spark UDAF 函數里,打通 HLL 對象在 Spark SQL 與 presto 引擎之間的互通。如 Spark SQL 通過 prepare 函數生成的 HLL 對象,不僅可以在 Spark SQL 里 merge 查詢而且可以在 presto 里進行 merge 查詢。
具體流程如下:
UV 近似計算示例:
2、實驗平台(Flink 實時數據入 OLAP)
唯品會實驗平台是通過配置多維度分析和下鑽分析,提供海量數據的 A/B-test 實驗效果分析的一體化平台。一個實驗是由一股流量(比如用戶請求)和在這股流量上進行的相對對比實驗的修改組成。實驗平台對於海量數據查詢有着低延遲、低響應、超大規模數據(百億級)的需求。
整體數據架構如下:
- 離線數據是通過 waterdrop 導入到 ClickHouse 里面去;
- 實時數據通過 Flink SQL 將 Kafka 里的數據清洗解析展開等操作之后,通過 Redis 維表關聯商品屬性,通過分布式表寫入到 ClickHouse,然后通過數據服務 adhoc 查詢,通過數據服務提供對外的接口。
業務數據流如下:
我們的實驗平台有一個很重要的 ES 場景,我們上線一個應用場景后,如果我想看效果如何,包括上線產生的曝光、點擊、加購、收藏是怎樣的。我們需要把每一個數據的明細,比如說分流的一些數據,根據場景分區,寫到 ck 里面去。
我們通過 Flink SQL Redis connector,支持 Redis 的 sink 、source 維表關聯等操作,可以很方便地讀寫 Redis,實現維表關聯,維表關聯內可配置 cache ,極大提高應用的 TPS。通過 Flink SQL 實現實時數據流的 pipeline,最終將大寬表 sink 到 CK 里,並按照某個字段粒度做 murmurHash3_64 存儲,保證相同用戶的數據都存在同一 shard 節點組內,從而使得 ck 大表之間的 join 變成 local 本地表之間的 join,減少數據 shuffle 操作,提升 join 查詢效率。
五、未來規划
1、提高Flink SQL易用性
Flink SQL 對於 Hive 用戶來說,使用起來還是有一點不一樣的地方。不管是 Hive,還是 Spark SQL,都是批量處理的一個場景。
所以當前我們的 Flink SQL 調試起來仍有很多不方便的地方,對於做離線 Hive 的用戶來說還有一定的使用門檻,例如手動配置 Kafka 監控、任務的壓測調優。所以如何能讓用戶的使用門檻降至最低,讓用戶只需要懂 SQL 或者懂業務,把 Flink SQL 里面的概念對用戶屏蔽掉,簡化用戶的使用流程,是一個比較大的挑戰。
將來我們考慮做一些智能監控,告訴用戶當前任務存在的問題,不需要用戶去學習太多的東西,盡可能自動化並給用戶一些優化建議。
2、數據湖CDC分析方案落地
一方面,我們做數據湖主要是為了解決我們 binlog 實時更新的場景,目前我們的 VDP binlog 消息流,通過 Flink SQL 寫入到 Hive ods 層,以加速 ods 層數據源的准備時間,但是會產生大量重復消息去重合並。我們會考慮 Flink + 數據湖的 cdc 入倉方案來做增量入倉。
另一方面我們希望通過數據湖,來替代我們 Kudu,我們這邊一部分重要的業務在用 Kudu。雖然 Kudu 沒有大量的使用,但鑒於 Kudu 的運維比一般的數據庫運維復雜得多、比較小眾,並且像訂單打寬之后的 Kafka 消息流、以及聚合結果都需要非常強的實時 upsert 能力,所以我們就開始調研 CDC+數據湖這種解決方案,用這種方案的增量 upsert 能力來替換 kudu 增量 upsert 場景。
Q&A
Q1:vdp connector 是 MySQL binlog 讀取嗎?和 canal是一種工具嗎?
A1 :vdp 是公司 binlog 同步的一個組件,將 binlog 解析之后發送到 Kafka。是基於 canal 二次開發的。我們定義了一個 cdc format 可以對接公司的 vdp Kafka 數據源,與 Canal CDC format 有點類似。目前沒有開源,使我們公司用的 binlog 的一個同步方案。
Q2 : uv 數據輸出到 HBase,銷售數據輸出到 kudu,輸出到了不同的數據源,主要是因為什么采取的這種策略?
A2 :kudu 的應用場景沒有 HBase 這么廣泛。uv 實時寫入的 TPS 比較高,HBase 比較適合單條查詢的場景,寫入 HBase 高吞吐 + 低延遲,小范圍查詢延遲低;kudu 的話具備一些 OLAP 的特性,可以存訂單類明細,列存加速,結合 Spark、presto 等做 OLAP 分析。
Q3 : 請問一下,你們怎么解決的 ClickHouse 的數據更新問題?比如數據指標更新。
A3 : ck 的更新是異步 merge,只能在同一 shard 同一節點同一分區內異步 merge,是弱一致性。對於指標更新場景不太建議使用 ck。如果在 ck 里有更新強需求的場景,可以嘗試 AggregatingMergeTree 解決方案,用 insert 替換 update,做字段級的 merge。
Q4:binlog 寫入怎么保證數據的去重和一致性?
A4 : binlog 目前還沒有寫入 ck 的場景,這個方案看起來不太成熟。不建議這么做,可以用采用 CDC + 數據湖的解決方案。
Q5 : 如果 ck 各個節點寫入不均衡,怎么去監控,怎么解決?怎么樣看數據傾斜呢?
A5 :可以通過 ck 的 system.parts 本地表監控每台機器每個表每個分區的寫入數據量以及 size,來查看數據分區,從而定位到某個表某台機器某個分區。
Q6 : 你們在實時平台是如何做任務監控或者健康檢查的?又是如何在出錯后自動恢復的?現在用的是 yarn-application 模式嗎?存在一個 yarn application 對應多個 Flink job 的情況嗎?
A6 : 對於 Flink 1.12+ 版本,支持了 PrometheusReporter 方式暴露一些 Flink metrics 指標,比如算子的 watermark、checkpoint 相關的指標如 size、耗時、失敗次數等關鍵指標,然后采集、存儲起來做任務監控告警。
Flink 原生的 restart 策略和 failover 機制,作為第一層的保證。
在 Client 中會定時監控 Flink 狀態,同時將最新的 checkpoint 地址更新到自己的緩存中,並匯報到平台,固化到 MySQL 中。當 Flink 無法再重啟時,由 Client 重新從最新的成功 checkpoint 提交任務。作為第二層保證。這一層將 checkpoint 固化到 MySQL 中后,就不再使用 Flink HA 機制了,少了 zk 的組件依賴。
當前兩層無法重啟時或集群出現異常時,由平台自動從固化到 MySQL 中的最新 chekcpoint 重新拉起一個集群,提交任務,作為第三層保證。
我們支持 yarn-per-job 模式,主要基於 Flink on Kubernetes 模式部署 standalone 集群。
Q7 : 目前你們大數據平台上所有的組件都是容器化的還是混合的?
A7 :目前我們實時這一塊的組件 Flink、Spark 、Storm、Presto 等計算框架實現了容器化,詳情可看上文 1.2 平台架構。
Q8 :kudu 不是在 Kubernetes 上跑的吧?
A8 :kudu 不是在 Kubernetes 上運行,這個目前還沒有特別成熟的方案。並且 kudu 是基於 cloudera manager 運維的,沒有上 Kubernetes 的必要。
Q9 : Flink 實時數倉維度表存到 ck 中,再去查詢 ck,這樣的方案可以嗎?
A9:這是可以的,是可以值得嘗試的。事實表與維度表數據都可以存,可以按照某個字段做哈希(比如 user_id),從而實現 local join 的效果。
原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。