簡介: 本文由知乎技術平台負責人孫曉光分享,主要介紹知乎 Flink 數據集成平台建設實踐。內容如下: 1. 業務場景 ; 2. 歷史設計 ; 3. 全面轉向 Flink 后的設計 ; 4. 未來 Flink 應用場景的規划。
本文由知乎技術平台負責人孫曉光分享,主要介紹知乎 Flink 數據集成平台建設實踐。內容如下:
- 業務場景
- 歷史設計
- 全面轉向 Flink 后的設計
- 未來 Flink 應用場景的規划
一、業務場景
很高興和大家分享近期知乎以 Flink 為基礎,重構上一代數據集成平台過程中的一些收獲。數據集成平台作為連接各種異構數據的紐帶,需要連接多種多樣的存儲系統。而不同的技術棧和不同的業務場景會對數據集成系統提出不同的設計要求。
我們首先來看一下在知乎內部數據集成的業務場景。同許多互聯網公司相似,過去知乎的在線存儲系統主要以 MySQL 和 Redis 為主,同時對於部分數據量級較大的業務也使用了 HBase。近年來隨着技術的演進,我們開始了從 MySQL 向 TiDB 的遷移。與此類似,我們也開始將 HBase 向基於 TiKV 技術棧研發的 Zetta 演進。在離線存儲方面絕大多數的場景則是以 Hive 表來支撐的。
從在線存儲到離線存儲,期間有着非常強的數據同步需求。除此以外也存在着大量的流式數據,比如消息系統中的數據,我們也希望它能夠同各種在線或離線存儲系統打通。過去知乎主要使用 Kafka 支撐流式數據,近期也開始引入 Pulsar。這兩套消息系統同存儲系統之間的數據交換存在着較強的需求。
在知乎的業務場景和當前發展狀態下,數據集成工作在技術和流程管理上都存在着一些挑戰。
- 首先從技術角度看,數據源多樣化會對數據集成系統的連接擴展能力提出較高的要求。而且下一代的存儲系統在給業務帶來更強能力的同時也釋放了業務的壓力,進而促使了數據量的加速膨脹。數據量級上的快速增長對數據集成平台的吞吐和實時性都提出了更高的要求。當然作為數據相關的基礎系統,數據准確性則是最基礎的要求,這塊我們也必須把它做好。
- 另外從流程管理角度看,我們需要理解並整合散落在不同業務團隊的數據,做好管理並確保數據訪問的安全,所以整個數據整合的流程是相對復雜的。雖然平台化能夠將復雜的流程自動化起來,但數據集成工作所固有的高成本並不能完全以平台化的方式消除。因此盡最大的可能提升流程的可復用性和可管理性也是數據集成系統需要持續應對的挑戰。
基於這兩個方向上的挑戰,我們對數據集成平台的設計目標進行了規划。
- 從技術方向看,我們需要支持知乎已經投入使用和將來要推廣使用的多種存儲系統,具備將這些系統中多樣化的數據進行集成的能力。此外我們還需要在滿足高吞吐,低調度時延的前提下保障數據集成的可靠性和准確性。
- 從流程方面看,可以通過整合各種內部存儲系統的元數據以及調度系統,復用現有系統基礎設施的能力,達到簡化數據接入流程,降低用戶接入成本的目的。我們還希望能夠以平台化的方式為用戶提供自助滿足數據需求的手段,從而提升數據集成工作的整體效率。
- 從提升任務可管理性的角度看,我們還要維護好數據的血緣關系。讓業務更好的去度量數據產出之間的關系,更有效的評估數據產出的業務價值,避免低質量和重復性的數據集成工作。最后我們需要對所有任務提供系統化的監控和報警能力來保障數據產出的穩定性。
二、歷史設計
在知乎的第一代數據集成平台成型前,大量的任務散落在各個業務方自己維護的 crontab 或者自行搭建的各種調度系統中。在這樣的無管理狀態下,各項集成任務的可靠性和數據質量都很難得到有效的保障。因此在這個階段我們要最迫切解決的是管理上的問題,讓數據集成的流程可管理可監控。
因此,我們整合了各種存儲系統的元數據系統,讓大家可以在統一的地方看到公司所有的數據資產。然后在調度中心統一管理這些數據的同步任務,由調度中心負責任務的依賴管理。同時調度中心對任務的關鍵指標進行監控並提供異常告警能力。在這個階段我們沿用了從前大家廣泛使用的 Sqoop 來實現 MySQL 和 Hive 之間數據的同步。且在平台建設后期,隨着流數據同步需求的出現,我們又引入了 Flink 來同步 Kafka 數據到 HDFS。
在建設初代集成平台時我們做過一次技術選型的選擇,是繼續使用已經得到廣泛驗證的 Sqoop 還是遷移到其它可選的技術方案。同 Sqoop 相比,阿里開源的 DataX 是這個領域一個非常有競爭力的對手。如果把這兩個產品進行橫向對比,可以發現他們在不同的方面互相有對方所不具備的優勢。
- 比如 Sqoop 在系統規模上具備 MapReduce 級別的擴展性和原生的 Hive 支持。但 Sqoop 又有數據源支持不豐富,缺乏一些重要功能特性的缺點。
- 而 DataX 提供了非常豐富的數據源支持,內置了數據集成系統非常重要的限速能力,還有它的良好設計所帶來的易於定制和擴展的能力。但它也存在無集群資源管理支持和欠缺 Hive Catalog 原生支持的缺陷。
在當時的狀態下這兩個產品相互比較起來沒有一款產品具有絕對的優勢。所以我們選擇了繼續使用 Sqoop,而維持使用 Sqoop 在驗證環節上也為我們節約了許多投入,所以第一代的數據集成平台在非常短的時間內就完成了開發和驗證並完成上線。
隨着初代數據集成平台的上線和成熟,它很好的支撐了公司的數據集成業務需求並獲得了顯著的收益。到目前為止平台上一共有大約 4000 個任務,每天運行超過 6000 個任務實例,同步大約 82 億條共計 124TB 的數據。
在平台的幫助下,數據接入流程得到了極大的簡化,為用戶提供了自助解決數據集成需求的能力。並且,平台在關鍵的流程節點上能夠輔以必要的規范約束和安全審查,在提升了管理水平的同時,整體的安全性和數據質量也得到了顯著的提升。
借助於 Yarn 和 K8s 的彈性能力,集成任務的規模擴展能力也有了很大的提升。當然,作為解決從 0 到 1 問題的第一代系統,也必然會伴隨着一系列問題。比如:
- Sqoop 的 MapReduce 模式所固有的高調度時延問題
- 業務數據分布不均所導致的數據傾斜問題
- 社區不活躍導致部分 Issue 長期無法得到解決的問題
- Sqoop 代碼設計不理想導致的可擴展性和可管理性弱的問題。
三、轉向 Flink
與 Sqoop 相對的,是用於支持 Kafka 消息到 HDFS 數據集成任務的 Flink,它以優秀的可靠性和靈活的可定制性獲得了大家更多的信任。基於流式數據集成任務為 Flink 建立的信心,我們開始嘗試全面轉向 Flink 來建設下一代的數據集成平台。
雖然 Flink 是本次平台演進中的最佳候選,我們還是基於當時的情況對市面上可選的技術方案再次進行了調研。這次我們將 Apache NIFI 項目和 Flink 進行了多方面的比較,從功能角度看:
- Apache NIFI 非常強大且完全覆蓋了我們當前的數據集成需求。但是恰恰因為它功能過於強大並且自成體系,所以也帶來了較高的整合門檻。而且,無法利用現有 Yarn 和 K8s 資源池也會帶來額外的資源池建設和維護的成本。
- 相比之下, Flink 具有一個非常活躍和開放的社區,在立項時刻就已經具備了非常豐富的數據源支持,可以預期在未來它的數據源覆蓋一定會更加全面。而且 Flink 作為一個通用計算引擎有着強大易用的 API 設計,在這個基礎上進行二次開發非常容易,所以它在可擴展性方面的優勢也非常突出。
最后基於我們對批流一體目標的認同,未來在知乎完成大數據計算引擎技術棧的統一也是一個極具吸引力的目標。
基於這些考量,在本輪迭代中我們選擇了全面使用 Flink 替代 Sqoop,基於 Flink 完整實現了之前 Sqoop 的功能並重新建設了全新的集成平台。
如下圖所示,橙色部分是本輪迭代中發生了變化的部分。除了作為主角出現的 Flink 之外,在本輪迭代的過程中我們還開發了 TiDB、Redis 和 Zetta 三種存儲系統的數據集成功能。在消息系統這邊則直接從社區獲得了 Pulsar 的支持。在我們開始開發工作的時候,Flink 已經演進到了比較成熟的階段,對 Hive 內建了原生的支持,整個遷移過程沒有遇到過多的技術困難,非常順暢。
Flink 的遷移為我們帶來了許多收益。
1. 首先從可維護性上看,相比 Sqoop 有了非常顯著的改善。如下圖所示,左邊是過去使用 Sqoop 時的任務定義,這里是一大堆非結構化的容易出錯的原始命令。而 Flink 則只需使用 SQL 定義一個源表和一個目標表再配合寫入命令來定義任務。任務的可理解性、可調試性遠好於從前,變成最終用戶也能夠理解的模式。很多問題不再需要平台開發者配合排查,用戶就能夠自助的解決許多常見的任務異常。
2. 在性能角度方面,我們也有針對性的做了許多優化。
2.1 調度策略
首先是調度策略上的優化,在第一代集成平台中我們只使用 Flink 同步流式數據,所以任務調度完全使用 Per Job。現在平台同時支持了 Session 和 Per Job 的混合調度模式,於是,對於從消息系統接入數據的流式任務會繼續使用 Per-Job 模式運行,而批同步的任務則采用 Session 模式復用集群從而避免集群啟動的耗時提升同步效率。
當然,在這樣的場景中使用 Session 集群也存在着一系列的挑戰,比如工作負載隨着任務提交不停變化而帶來的資源需求變化問題。所以我們建設了自動的擴縮容機制來幫助 Session 集群應對變化的負載。除此以外,為了簡化計費機制和隔離風險,我們還為不同的業務線創建了私有 Session 集群用於服務對應業務線的數據集成任務。
2.2 數據庫
在關系數據庫方面,我們采用了常見的 JDBC 方式對 MySQL 進行數據同步,但這種方式也會存在一些固有難以解決的問題。
- 比如因業務數據在主鍵維度上空間分布不均導致的數據傾斜問題。
- 再比如為了隔離在線離線工作負載所建設的專用同步從庫,所產生的資源浪費和管理成本。
- 並且由於 MySQL 實例眾多規格不一,合理協調多個並發任務的實例和實例所在的主機,進行合理的速度控制也非常困難。
相比之下,考慮到正在全面將數據從 MySQL 遷移到 TiDB 這一趨勢。我們開發了原生 TiDB 的 Flink connector 來充分利用 TiDB 架構上的優勢。
- 首先 region 級別的負載均衡策略能夠確保對於任何表結構和任何的數據分布,同步任務都能夠以 region 為顆粒度進行拆分避免數據傾斜問題。
- 其次通過設定副本放置策略,可以在離線數據中心對數據統一放置一個 Follower 副本。進而在保持原有目標副本數量不變,無需額外資源成本的情況下,利用 Follower read 的能力隔離在線交易和數據抽取的負載。
- 最后我們還引入了分布式的數據提交方式提升了數據寫入的吞吐能力。
3. 最后是為知乎內部廣泛使用的 Redis 提供數據集成的能力。Flink 社區已經有一個 Redis connector,但它目前只具備寫入能力並且難以靈活定制寫入時所使用的 key。所以我們基於自身需求重新開發了一個 Redis connector,同時支持以 Redis 作為 Source 和 Sink。
同樣為了避免數據抽取過程影響在線交易,在數據讀取路徑上我們采用了 Redis 原生的 master/slave 機制獲取並解析 RDB 文件抽取數據,獲得了單實例約 150MB 每秒的數據抽取吞吐。而且得益於打通內部存儲系統的元數據,我們不但能夠支持分片模式 Redis 集群的數據抽取,還可以只選擇每個分片的 slave 節點作為數據抽取源頭,避免抽取對 master 節點產生壓力。
這次全面轉向 Flink 的演進,解決了很多上一代數據集成平台的問題,獲得了非常顯著的收益。
- 從吞吐角度看,以 Flink 替代 MR 模式將整個調度的時延從分鍾級降低到了 10 秒左右。並且在同樣的數據量和同樣的 Flink 資源量情況下,TiDB 原生 connector 能夠比 JDBC 提升 4 倍的吞吐。
- 從功能角度看,新平台不但能夠原生支持分庫分表的數據集成任務,還能夠以業務無關的方式避免數據傾斜的問題。
- 在數據源支持能力上,我們以非常低的成本獲得了 TiDB、Zetta、Redis 和 Pulsar 的支持。而且,隨着 Flink 的生態越來越完善,未來一定會有更多的開箱即用的 connector 供我們使用。
- 從成本上看,最后下線 MySQL 離線節點和統一使用 K8s 資源池所帶來的資源效率提升,在成本和管理角度看都使我們獲得了顯著的收益。
四、Flink 即未來
回過頭看,本次全面 Flink 化的演進投入產出比非常高,這也進一步增強了我們對 “Flink 即未來” 的的信心。目前在知乎內部除了數據集成場景,Flink 在搜索 Query 的時效性分析、商業廣告點擊數據處理和關鍵業務指標的實時數倉上也都有所應用。
在未來我們希望能夠進一步擴展 Flink 在知乎的使用場景,建設更加全面的實時數倉、系統化的在線機器學習平台。我們更希望批流一體的落地,讓報表類和 ETL 類的大型批任務也能夠在 Flink 平台上落地。
基於知乎大數據系統建設的模式和總體資源投入的情況,未來將技術棧向 Flink 收攏是一個非常適合知乎的選擇。作為用戶我們非常期待能夠一同見證未來 Flink 批流一體目標的達成。同時作為社區的成員,我們也希望能夠用自己的方式為這一目標的達成貢獻一份力量。
本文為阿里雲原創內容,未經允許不得轉載。