概述
- 數據同步場景類型:
- 主數據庫與備份數據庫之間的數據備份;
- 主系統與子系統間的數據更新;
- 屬於同類型不同集群數據庫間的數據同步;
- 不同區域、不同數據庫類型間的數據傳輸交換;
- 大數據系統:數據從業務系統同步進入數據倉庫、數據從數據倉庫同步進入數據服務或者數據應用。(本篇主講數據從業務系統同步進入數據倉庫)
一、數據同步基礎
- 源業務系統的數據類型:
- 源於關系型數據庫的結構化數據(如MySQL、Oracle、DB2、SQL Server等);
- 源於非關系型數據庫的非結構化數據(如OceanBase、HBase、MongoDB等,數據主要存儲在數據庫表中);
- 源於文件系統的結構化或非結構化數據(如阿里雲對象存儲 OSS、文件存儲 NAS等,數據通常以文件形式進行存儲);
- 數據同步需要針對不同的數據類型及業務場景選擇不同的同步方式:直連同步、數據文件同步、數據庫日志解析同步。
1、直連同步
- 直連同步:通過定義好的規范接口 API 和基於動態鏈接庫的方式直接連接業務庫。(如 ODBC/JDBC 等規定了統一規范的標准接口,不同數據庫基於這套標准接口提供規范的驅動,致辭完全相同的函數調用和 SQL 實現)
- 優點:配置簡單,實現容易,比較適合操作型業務系統的數據同步。
- 缺點:對源系統的性能影響較大,當執行大批量數據同步是會降低甚至拖垮業務系統的性能。
- 如果業務庫采取主備策略,則可以從備庫抽取數據,避免對業務系統產生性能影響;但是當數據量較大時,采取此種抽取方式性能較差,不太適合從業務系統到數據倉庫系統的同步。
2、數據文件同步
- 通過約定好的文件編碼、大小、格式等,直接從源系統生成數據的文本文件,由專門的文件服務器,如 FTP 服務器傳輸到目標系統后,加載到目標數據庫系統中。
- 當數據源包含多個異構的數據庫系統(如 MySQL、Oracle、SQL Server、DB2等)時,用這種方式比較簡單、使用。
- 互聯網的日志類數據,通常是以文本文件形式存在的,也適合使用數據文件同步方式。
- 有通過文件服務器上傳、下載可能會造成丟包或錯誤,為了確保數據文件同步的完整性,通常除了上傳數據文件本身以外,還會上傳一個校驗文件,該校驗文件記錄了數據文件的數據量以及文件大小等校驗信息,以供下游目標系統驗證數據同步的准確性。
- 從源系統生成數據文件的過程中,可以增加壓縮和加密功能,傳輸到目標系統以后,再對數據進行解壓縮和解密,這樣可以大大提高文件的傳輸效率和安全性。
3、數據庫日志解析同步
- 數據庫日志解析同步:通過解析日志文件獲取發生變更的數據,從未滿足增量數據同步的需求。(日志文件信息足夠豐富,數據格式穩定)
- 目前廣泛應用於從業務系統到數據倉庫的增量數據同步應用中。
- 以 Oracle 為例:
- 通過源系統的進程,讀取歸檔日志文件用以收集變化的數據信息;
- 判斷日志中的變更是否屬於被收集對象,將其解析到目標文件中;(這種讀操作是在操作系統層面完成的,不需要通過數據庫,不會給源系統帶來性能影響。)
- 通過網絡協議,實現源系統和目標系統間的數據文件傳輸。(相關進程可以確保數據文件的正確接收和網絡數據包的正確順序,並提供網絡傳輸冗余,確保數據文件的完整性;)
- 數據被傳輸到目標系統后,可通過數據加載模塊完成數據的導入,從而實現數據從源系統到目標系統的同步;
- 優點:性能好,效率高,實現了實時與准實時同步的能力,延遲可以控制在毫秒級別,並且對業務系統的性能影響比較小。
- 問題/缺點:
- 數據延遲。(例:業務系統做批量補錄可能會使數據更新量超出系統處理峰值,導致數據延遲。)
- 投入較大。(需要在源數據庫與目標數據庫之間部署一個系統實施抽取數據。)
- 數據漂移和遺漏。(數據漂移,一般是對增量表而言的,通常指該表的同一個業務日期數據中包含前一天或后一天凌晨附近的數據或者丟失當天的變更數據。)
- 由於數據庫日志抽取一般是獲取所有的數據記錄的變更(增、刪、該),落地到目標表時需要根據主鍵去重按照日志事件倒排序獲取最后狀態的變化情況。(對於刪除數據這種變更,針對不同的業務場景可以采用一些不同的落地手法。)
-
實例:
-
下圖為源業務系統中某表變更日志流水表。其含義:存在5條變更日志,其中主鍵為 1 的記錄有 3 條變更日志,主鍵為 2 的記錄有 2 條變更日志。
- 針對刪除數據這種變更,主要有三種方式:假設根據主鍵去重,按照流水倒序,獲取記錄最后狀態生成的表為 delta 表;
- 第一種方式:不過濾刪除流水,生成下圖 delta 表。(不管是否是刪除操作,都獲取統一主鍵最后變更的那條流水。)
- 第二種方式:過濾最后一條刪除流水,生成下圖 delta 表。(如果統一主鍵最后變更的那條流水是刪除操作,就獲取倒數第二條流水。)
- 第三種方式:過濾刪除流水和之前的流水,生成下圖 delta 表。(如果在同一主鍵變更的過程中有刪除操作,則根據操作時間將該刪除操作做對應的流水和之前的流水都過濾掉)
- 對於采用哪種方式處理刪除數據,要看前端是如何刪除數據的。
- 前端業務系統刪除數據的方式一般有兩種:正常業務數據刪除、手工批量刪除。
- 手工批量刪除:通常針對類似的場景,業務系統只做邏輯刪除,不做物理刪除,DBA 定期將部分歷史數據直接刪除或者備份到備份庫。
- 一般情況下,可以采用不過濾的方式來處理,下游通過是否刪除巨佬的標識來判斷記錄是否有效。如果明確業務數據不存在業務上的刪除,但是存在批量刪除或備份數據刪除,則可以采用只過濾最后一條刪除流水的方式,通過狀態字段來標識刪除記錄是否有效。
二、阿里數據倉庫的同步方式
- 數據倉庫的特性之一是集成,將不同的數據來源、不同形式的數據整合在一起,所以從不同業務系統將各類數據源同步到數據倉庫是一切的開始。
- 阿里數據倉庫的數據同步的特點:
- 數據來源的多樣性。(除了結構化的數據,還有大量非結構化數據,特別是日志數據,這類數據通常直接以文本形式記錄在文件系統中,對於數據的分析、統計、挖掘等各類數據應用有極大的價值。)
- 數據量巨大。(目前大型互聯網企業的大數據系統每條同步的數據量達到 PB 級別(1 PB = 1024 TB),而阿里的大數據系統 MaxCompute 的數據存儲達到 EB 級別(1 EB = 1024 PB),每天需要同步的數據量達到 PB 級)
- 針對不同的數據源類型和數據應用的時效性要求,采用不同的同步方式。
1、批量數據同步
- 對於離線類型的數據倉庫應用,需要將不同的數據源批量同步到數據倉庫,以及將經過數據倉庫處理的結果數據定時同步到業務系統。
- 數據倉庫系統是集成了各類數據源的地方,數據類型是統一的。
- 要實現各類數據庫系統與數據倉庫系統之間的批量雙向數據同步,需要先將數據轉換成中間狀態,統一數據格式。
- 由於各數據庫系統的數據都是結構化的,均支持標准的 SQL 語言查詢,所以所有的數據類型都可以轉換成字符串類型。因此,通過將各類數據庫系統的數據類型統一轉換為字符串類型的方式,實現數據格式的統一。
- 阿里集團的 DataX :一個能滿足多方向高自由度的異構數據交換服務產品。
- DataX 通過插件的形式提供支持,將數據從數據源讀出並轉換為中間狀態,同時維護好數據的傳輸、緩存等工作。
- 數據在 DataX 中以中間狀態存在,並在目標數據系統中將中間狀態的數據轉換為對應的數據格式后寫入。
- DataX 通過分布式模式,批量同步處理數據。
- DataX 采用 Framework + Plugin 的開放式框架實現,Framework 處理緩沖、流程控制、並發、上下文加載等高速數據減緩的大部分技術問題,並提供鑒定的接口與插件介入。
- 數據傳輸在單進程(單機模式)/多進程(分布式模式)下完成,傳輸過程全內存操作,不讀寫磁盤,也沒有進程間通信,實現了在異構數據庫或文件系統之間的高速數據交換。
- Job:數據同步作業。
- Spliter:作業切分模塊。(將一個大任務分解成多個可以並發行的小任務)
- Sub-Job:數據同步作業切分后的小任務,或稱之為 Task。
- Reader:數據讀入模塊。(負責運行切分后的小任務,將數據從源系統裝載到 DataX)
- Channel:Reader 與 Writer 通過 Channel 交換數據。
- Writer:數據寫入模塊。(負責將數據從 DataX 導入到目標數據系統)
2、實時數據同步
- 天貓“雙 11” 的數據大屏為例:對所產生的交易數據需要實時匯總,實現秒級的數據刷新。
- 方法:通過解析 MySQL 的binlog 日志(相當於 Oracle 的歸檔日志)來實時獲得增量的數據更新,並通過消息訂閱模式來實現數據的實時同步。
- 具體操作:建立數據交易中心,通過專門的模塊從每台服務器源源不斷地讀取日志數據,或者解析業務數據庫系統的 binlog 或歸檔日志,將增量數據以數據流的方式不斷同步到日志交換中心,然后通知所有訂閱了這些數據的數據倉庫系統來獲取。
- 阿里集團的 Time Tunnel(TT)系統就是這樣的實時數據傳輸平台:是一種基於生產者、消費者和 Topic 消息標識的消息中間件,將消息數據持久化到 HBase 的高可用、分布式數據交互系統。
- 生產者(Client):消息數據的產生端,向 Time Tunnel 集群發送消息數據。
- 消費者(Client):消息數據的接收端,從 Time Tunnel 集群中獲取數據進行業務處理。
- Topic:消息類型的標識,如淘寶 acookie 日志的 Topic 為 tao_acookie,生成 Client 和消費 Client 均需要知道對應的Topic 名字。
- Broker 模塊:負責處理客戶端手法消息數據的請求,然后往 HBase 取發數據。
三、數據同步遇到的問題與解決方案
1、分庫分表處理
- 問題及要求:業務不斷增長,數據量飛速增加,需要系統具備靈活的擴展能力和高並發大數據量的處理能力;
- 解決方案:數據庫系統采用分布式分庫分表方案。(見下圖)
- 例:阿里集團的 TDDL(Taobao Distributed Data Layer)就是這樣一個分布式數據庫的訪問引擎:通過建立中間狀態的邏輯表來整合統一分庫分表訪問。
- TDDL 是在持久層框架之下、JDBC 驅動之上的中間件,與 JDBC 規范保持一致,有效解決了分庫分表的規則引擎問題,實現了 SQL 解析、規則計算、表名替換、選擇執行單元並合並結果集的功能,同時解決了數據庫表的讀寫分離、高性能主備切換的問題,實現了數據庫配置信息的統一管理。
2、高效同步和批量同步
- 數據同步的方法通常是:先創建目標表,再通過同步工具的填寫數據庫連接、表、字段等各種配置信息后測試完成數據同步。
- 問題:
- 業務發展,數據量增大,傳統方法完成工作的工作量增大,而且,相似並重復的操作降低開發人員的工作熱情;
- 數據倉庫的數據源種類特別豐富,遇到不同類型的數據同步,開發人員需要去了解其特色配置;
- 部分真正的數據需求方,如 Java 開發和業務運營,由於存在相關數據同步的專業技能部門,往往需要將需求提交給數據開發方來完成,額外增加了溝通和流程成本;
- 解決方案以阿里集團為例:數據倉庫研發了 OneClick 產品;
- 對不同數據源的數據同步配置透明化,可以通過庫名和表名唯一定位,通過 IDB 接口獲取源數據信息自動生成配合信息;
- 簡化了數據同步操作步驟,實現了與數據同步相關的建表、配置任務、發布、測試操作一鍵化處理,並且封裝成 Web 接口進一步達到批量化的效果;
- 降低了數據同步的技能門檻,讓數據需求方更加方便地獲取和使用數據。
- OneClick:實現了數據的一鍵化和批量化同步,一鍵完成 DDL 和 DML 的生成、數據的冒煙測試以及在生成環境中測試等。
- IDB:阿里集團用於統一管理 MySQL、OceanBase、PostgreSQL、Oracle、SQL Server等關系型數據庫的平台,是一種集數據管理、結構管理、診斷優化、實時監控和系統管理與一體的數據管理服務。(在對集團數據庫表統一管服務過程中,IDB 產出了數據庫、表、字段各個級別元數據信息,並提供了元數據接口服務)
3、增量與全量同步的合並
- 問題:傳統數據同步方式為周期全量數據同步,但隨着業務發展數據量的急劇增加,周期全量同步的效率太低了。
- 解決方案:每個周期只同步增量數據,然后與上一個同步周期獲取的全量數據進行合並,獲取最新版本的全量數據。
- 傳統數據整合方案:merge 方式(update + insert );
- 當前大數據平台不支持 update 操作,而采用:全外連接(full outer join) + 數據全覆蓋重新加載(insert overwrite);(即如日調度,則將當天的增量數據和前一天的全量數據做全外連接,重新加載最新的全量數據)
- 如果擔心數據更新錯誤:每條保持一個最新的全量版本,保留較短的事件周期。(另外,當業務系統的表有物理刪除數據的操作,而數據倉庫需要保留所有歷史數據時,也可以選擇這種方式,在數據倉庫中永久保留最新的全量數據快照。)
- 例:淘寶訂單表
4、同步性能的處理
- 數據同步任務是針對不同數據看系統之間的數據同步問題而創建的一些列周期調度的任務。在代行的數據調度工作台上,每條會運行大量的數據同步任務。針對數據同步任務,一般首先需要設定首輪同步的線程數,然后運行同步任務。這樣的數據同步模式存在以下幾個問題:
- 有些數據同步任務的總線程數達不到用戶設置的首輪同步的線程數時,如果同步控制器將這些同步縣城分發到 CPU 比較繁忙的機器上,將導致這些同步任務的平均同步速度非常低,數據同步速度非常慢;
- 用戶不清楚該如何設置首輪同步的線程數,基本都會設置成一個固定的值,導致同步任務因得不到合理的 CPU 資源而影響同步效率;
- 不同的數據同步任務的重要程度是不一樣的,但是同步控制器平等對待接收到的同步線程,導致重要的同步線程因得不到 CPU 資源而無法同步;
- 上述三種情況可能會導致同步任務不穩定。
- 阿里集團的解決思路:通過目標數據庫的元數據估算同步任務的總線程數,以及通過系統預先定義的期望同步速度估算首輪同步的線程數,同時通過數據同步任務的業務優先級決定同步線程的優先級,最終提升同步任務的執行效率和穩定性。
- 具體實現步驟:
- 用戶創建數據同步任務,並提交該同步任務;
- 根據系統提前獲知及設定的數據,估算該同步任務需要同步的數據量、平均同步速度、首輪運行期望的線程數、需要同步的總線程數;
- 根據需要同步的總線程數將待同步的數據拆分成等數據量的數據塊,一個線程處理一個數據塊,並將該任務對應的所有線程提交至同步控制器;
- 同步控制器判斷需要同步的總線程數是否大於首輪運行期望的線程數,若大於,則跳轉至(5);若不大於,則跳轉至(6);
- 同步控制器采用多機多線程的數據同步模式,准備該任務的第一輪線程的調度,優先發送等待時間最長、優先級最高且同一任務的線程;
- 同步控制器准備一定數量(期望首輪線程數 - 總線程數)的虛擬線程,采用單機多線程的數據同步模式,准備該任務響應實體線程和虛擬線程的調度,優先發送等待時間最長、優先級最高且單機 CPU 剩余資源可以支持首輪所有線程數且同一任務的線程,如果沒有滿足條件的機器,則選擇 CPU 剩余資源最多的機器進行首輪發送。
- 數據任務同步開始,並等待完成;
- 數據任務同步結束。
5、數據漂移的處理
- ODS:通常把從源系統同步進入數據倉庫的第一層數據成為 ODS 或者 staging 層數據。
- 數據漂移:指 ODS 表的同一個業務日期數據中包含前一天或后一天凌晨附近的數據,或者丟失當天的變更數據。
- 由於 ODS 需要承接面向歷史的細節數據查詢需求,這就需要物理落地到數據倉庫的 ODS 表按時間段來切分進行分區存儲;(通常的做法是按某些時間戳字段來切分,而實際上往往由於時間戳字段的准確性問題導致發生數據漂移)
- 問題:
- 通常,時間戳分為四類(根據其中的某一個字段來切分 ODS 表,導致產生數據漂移):
- 數據庫表中用來表示數據記錄更新時間的時間戳字段(假設這類字段叫 modified_time);
- 數據庫日志中用來表示數據記錄更新時間的時間戳字段(假設這類字段叫 log_time);
- 數據庫表中用來記錄具體業務過程發生時間的時間戳字段(假設這類字段叫 proc_time);
- 標識數據記錄被抽取到時間的時間戳字段(假設這類字段叫 extract_time);
- 理論上以上幾個時間應該是一致的,但實際生產中,這幾個時間往往會出現差異,可能的原因有一下幾點:
- 由於數據抽取是需要時間的,extract_time 往往會晚於前三個時間;
- 前台業務系統手工訂正數據時未更新 modified_time;
- 由於網絡或者系統壓力問題,log_time 或者 modified_time 會晚於 proc_time;
- 數據漂移場景:
- 根據 extract_time 來獲取數據(ODS 數據)。(數據漂移問題最明顯)
- 根據 modified_time 限制。(最常見,但是往往會發生不更新 modified_time 而導致的數據遺漏,或者凌晨事件產生的數據記錄漂移到前一天。)
- 根據 log_time 限制。(由於網絡或者系統壓力問題,log_time 會晚於 proc_time,從而導致凌晨時間產生的數據記錄漂移到后一天。)(例:天貓“雙11”大促期間,凌晨時間產生的數據非常大,用戶支付需要調用多個接口,導致 log_time 晚於實際的支付時間)
- 根據 proc_time 限制。(得到的 ODS 表只是包含一個業務過程所產生的記錄,會遺漏很多其他過程的變化記錄,違背了 ODS 和業務系統保持一致的設計原則)
- 兩種解決方法:
- (1)多獲取后一天的數據
- ODS 每個時間分區中,向前、向后多冗余一些數據,保障數據智慧多不會少,而具體的數據切分讓下游根據自身不同的業務場景用不同的業務時間 proc_time 來限制。
- 缺點:產生數據誤差。(例:一個訂單是當天支付的,但是第二條凌晨申請退款關閉了該訂單,那么這條記錄的訂單狀態會被更新,下游在統計支付訂單狀態時會出現錯誤)
- (2)通過多個時間戳字段限制時間來獲取相對准確的數據
- 首先,根據 log_time 分別冗余前一天最后 15 分鍾的數據和后一天凌晨開始 15 分鍾的數據,並用 modified_time 過濾非當天數據,確保數據不會因為系統問題而遺漏;
- 然后,根據 log_time 獲取后一天 15分鍾的數據;針對此數據,按照主鍵根據 log_time 做升級排列去重。(因為最終需要獲取的是最接近當天記錄變化的數據(數據庫日志將保留所有變化的數據,但是落地到 ODS 表的是根據主鍵去重獲取最后狀態變化的數據))
- 最后,將前兩步的結果數據做全外連接,通過限制業務時間 proc_time 來獲取所需要的數據。
- 例:淘寶交易訂單的數據漂移;
- “雙 11”的交易訂單中,有一大批在 11 月 11 日 23:59:59 左右支付的交易訂單漂移到了 12 日。主要原因是用戶下單支付后系統需要調用支付寶的接口而有所延遲,從而導致這些訂單最終生成的時間咵天了。即 modified_time 和 log_time 都晚於 proc_time。
- 難點:
- 如果訂單只有一個支付業務過程,則可以用支付時間類限制就能獲取到正確的數據。但是往往實際訂單有多個業務過程:下單、支付、成功,每個業務過程都有響應的時間戳字段,並不只有支付數據會漂移。
- 若果直接通過多獲取后一天的數據,然后限制這些時間,則可以獲取到相關數據,倒數后一天的數據可能已經更新多次,直接獲取到的那條記錄已經是更新多次后的狀態,數據的准確性存在問題。
- 解決方法:
- 根據實際情況獲取后一天 15 分鍾的數據,並限制多個業務過程的時間戳字段(下單、支付、成功)都是 “雙 11” 當天的,然后對這些數據按照訂單的 modified_time 做升序排列,獲取每個訂單首次數據變更的那條記錄;
- 根據 log_time 分別冗余前一天最后 15 分鍾的數據和后一天凌晨開始 15 分鍾的數據,並用 modified_time 過濾非當天數據,針對每個訂單按照 log_time 進行降序排列,去每個訂單當天最后一次數據變更的那條記錄。
- 最后將兩份數據根據訂單做全外連接,精漂移數據回補到當天數據中。