基於 DolphinScheduler 的數據質量檢查實踐


今天給大家帶來的分享是基於 Apache DolphinScheduler 的數據質量檢查實踐,分享的內容主要為以下四點:

  • 為什么要做數據質量檢查?

  • 為什么要基於 DolphinScheduler 做數據質量檢查?

  • 基於 DolphinScheduler 的數據質量服務的設計和實現

  • 不足和規划

1 為什么要做數據質量檢查

在今天,數據已經成為企業的新型資產,有效的數據能夠支撐企業的分析和決策,而錯誤的數據卻可能會帶來負面的影響,我們一起來看下數據質量差會帶來什么問題:

  • 數據可信度低

  • 影響數據分析和數據挖掘的准確性

  • 可能會導致錯誤的決策

  • 數據開發層面的工作越來越多,鏈路也越來越長,如果沒有在一些關鍵的節點配置好相應的檢查,一旦出現數據錯誤的問題就比較難定位。

由此可知做好數據質量監控是數據開發工作的重中之重,做好數據質量檢查可以帶來提高數據的可信度、 及時發現數據錯誤問題,更好地定位問題和提高工作效率的好處

2 為什么基於 DS 來做?

在探討完數據數據質量檢查的重要性以后,我們接下來一起看看我們為什么要基於 DolphinScheduler 來開發數據質量檢查。

2.1 DolphinScheduler 是什么?

DolphinScheduler是一個分布式易擴展的可視化DAG工作流任務調度系統。致力於解決數據處理流程中錯綜復雜的依賴關系,使調度系統在數據處理流程中開箱即用

它具備以下的特性:

  • 高可靠性,去中心化的多 Master 和多 Worker 服務架構, 避免單 Master 壓力過大,另外采用任務緩沖隊列來避免任務過載

  • 簡單易用,所有流程定義都是可視化,通過拖拽任務可完成定制 DAG ,也可通過 API 方式與第三方系統集成, 一鍵部署

  • 具有豐富的應用場景,支持多租戶,支持暫停恢復操作. 緊密貼合大數據生態,提供 Spark, Hive, M/R, Python, Sub_process, Shell 等近20種任務類型

  • 高擴展性,支持自定義任務類型,調度器使用分布式調度,調度能力隨集群線性增長,Master 和 Worker 支持動態上下線

2.2 數據質量檢查的目標

我們想要的的數據質量檢查的樣子:

  • 具備豐富的內置規則,能夠滿足我們日常的檢查需求

  • 能夠無縫接入到工作流當中,當出現嚴重數據錯誤問題時能及時進行告警和阻斷

  • 有較為完善的結果處理機制

  • 能夠查看數據檢查結果和錯誤數據,方便排查問題

在確定我們的目標以后,我們調研了現有的開源方案是否能夠滿足我們的需求,目前開源方案中使用 Java 語言開發的較為知名的有兩個:

  • Griffin:

    • Apache 頂級項目,是一個優秀並且完備的數據質量檢查系統,

    • 具有獨立的UI、調度和內置規則,依賴於 Apache Livy 來提交 Spark 作業

    • 一個獨立的系統,較難無縫地接入到工作流當中來實現當出現嚴重數據質量問題時的阻斷。

  • Qualitis:

    • 微眾開源的數據質量系統,具備較豐富的內置規則,界面簡潔容易使用

    • 依賴於 Linkis 作為執行Spark作業的引擎

    • 如果想要實現無縫接入工作流需要依賴DataSphere Studio,不夠輕量級

2.3 優勢

在調研完現有的開源方案以后,我們決定基於 DolphinScheduler 來開發我們的數據質量檢查服務。那么基於DolphinScheduler來開發數據質量檢查有什么優勢呢?

  • DolphinScheduler作為一個任務調度系統,具備了執行任務的基礎,不需要引入新的組件來提交任務

  • 數據質量檢查可以作為一種任務類型無縫接入到工作流當中

  • 無需新增其他服務來增加運維的難度

  • 可以很好地與社區共建開源

3 設計和實現

我們來看一下整體的運行流程

運行流程
  • 用戶側

    • 用戶創建數據質量檢查任務時會在前端頁面選擇一個規則,填入所需要的參數,保存任務

  • 系統側

    • 數據質量任務開始執行,由 Master 下發任務到 Worker ,Master 是DolphinScheduler中的調度工作流和任務的組件,Worker 是DolphinSchduler中負責實際執行任務的組件。

    • Worker 接收到任務以后會進行參數的解析和構造,執行 DQCApplicaiton,DQCApplicaiton 執行完成后會將結果寫入到相應的存儲引擎,Worker 會發送 TaskResponse 給 Master。

    • Master 接收到 TaskResponse 后會對任務類型進行判斷,如果是數據質量檢查任務的話會進行數據質量結果判斷,一旦發現數據異常就會進行告警或者阻斷。

從上面的流程中我們可以看到下面幾個核心的組成部分:

  • 規則管理:規則定義以及規則的使用

  • 數據處理:Worker 將規則定義轉化為 DQCAppliccation 所需的參數以及 DQCApplicaiton 執行數據處理

  • 結果處理:結果檢查以及異常后的處理

3.1 規則管理

3.1.1 規則的定義

規則是整個數據質量檢查的核心部分,每一條規則對應着一個檢查指標,那么一條完整的規則應該包含什么內容呢?規則主要是由兩個部分組成。

  • 參數輸入項:在我們的數據質量檢查規則中核心的輸入項包括:

    • 統計值指的是我們對要檢驗的數據執行一系列操作后得到的值,例如表總行數或者為某字段為空的行數。

    • 比對值指的是用來作為比較目標的值,比對值可以是固定值,也可以是由定義好的計算邏輯計算出來的值

    • 數據源輸入項,定義了要檢查哪些數據

    • 統計值參數比對值參數

    • 結果判斷相關的參數,包括檢查的方式、比較符和閾值以及失敗,這部分的參數主要是用來定義怎樣判斷數據是否異常和異常如何處理。

  • SQL定義:需要定義SQL來計算得到統計值、比對值以及獲取錯誤數據

在設計規則的時候做了如下考慮:

  • 后續新增規則不希望頻繁修改前端頁面

  • 保證用戶良好的體驗包括輸入項的聯動比如數據源、表和列的選擇聯動等等。

我們決定使用前端表單自動生成技術form-create,后端讀取規則參數輸入項轉換成form-create所規定的JSON字符串傳輸給前端,由前端去自動生成表單輸入項,實現規則靈活的增刪改,同時也保證前端代碼簡潔和用戶體驗。

下圖是我們定義數據質量任務時所展示的表單,選擇不同的規則前端就會生成不同的表單輸入項

數據質量任務前端表單輸入項

我們按照准確性、完整性、及時性、唯一性、規范性、一致性和自定義監控七種分類定義了十幾個規則。下面簡單講下幾種分類的定義和對應的一些規則。

  • 准確性檢查指的是檢查數據是否存在反常或者錯誤的情況,例如數值反常地過大或者過小,或者超過記錄的波動值,對應的規則包括數值波動檢查,最大值、最小值檢查,跨表准確性等。

  • 完整性檢查指的是檢查數據是否存在缺失的情況,這里的缺失可以是整個數據集的記錄缺失,也可以是某個字段記錄的缺失,對應的規則包括空值檢查,表總行數檢查。

  • 及時性檢查指的是檢查數據是否能夠按時地產生。

  • 唯一性檢查指的是檢查哪些數據是重復數據或者數據的哪些屬性是重復的,對應的規則例如主鍵唯一性檢查。

  • 規范性檢查指的是檢查數據是否符合規范,是否按照規定的格式存儲,對應的規則包括正則表達式檢查,字段長度檢查,枚舉集合檢查,數值范圍等。

  • 一致性檢查指的是檢查數據是否符合邏輯,數據內單項或多項數據間存在邏輯關系,例如pv>uv 對應的規則是表邏輯檢查。

  • 自定義監控指的是用戶可以通過自定義SQL的方式來定義要檢查的邏輯,支持的規則包括單表自定義SQL和跨表值比對。

3.1.2 比對值的管理

比對值在規則中也是相對比較重要的組成部分,我們對目標數據進行計算統計以后得到的統計值去和比對值進行一定方式的比較才能得到一個校驗結果。

比對值類型和內置規則將決定了我們數據質量校驗方式的豐富性,我們系統里面內置較為豐富的比較值:

  • 固定值

  • 最近7天波動

  • 最近30天波動

  • 日波動

  • 周波動

  • 月波動

  • 源表總行數

  • 目標表總行數

同時也支持用戶去自定義比對值,只需要定義好以下參數:

  • execute_sql:該sql是用來計算出比對值

  • output_table_name:將execute_sql執行的結果注冊到臨時視圖所用的表名

  • comparison_name:是比對值的名稱,結合output_table_name可用於其他SQL中

3.2 數據處理

規則定義好了,用戶也填好參數,這個時候就需要一個可以執行計算任務的組件來完成實際數據處理,DQCApplication 就是這么一個組件,它主要是由兩個部分組成:

  • 執行引擎:我們選擇了 Spark 作為任務的執行引擎。Spark 支持多種數據源類型的連接同時計算比較快。

  • 執行鏈路組件:我們設計了ReaderTransformerWriter三種類型的組件, Reader用於連接數據源,Transformer用於執行sql進行數據的處理,Writer用於將數據輸出到指定的存儲引擎中

3.2.1 執行流程

整個應用的執行流程是這樣的:

  • Worker 接收到任務以后,會將規則的參數輸入項和 SQL 定義轉換成 DQCApplication 所需要的參數傳給 DQCApplication 去執行

  • DQCApplication 解析參數選擇相應的引擎,構造出引擎對應的 RuntimeEnviroment 和 Execution

  • 同時也會根據參數創建一系列的 Reader、Transformer 和 Writer , Execution 會按照一定的順序去執行這些組件中的邏輯,來完成整個數據質量校驗任務

數據處理執行流程

在上圖里我們看到可以定義一個或者多個 Reader 來滿足不同場景的需求,最后通過 Writer 輸出的數據包括校驗結果、統計數據以及錯誤數據

當 DQCApplicaiton 執行完計算邏輯,把統計值、比對值計算出來寫到存儲引擎中以后,會把 TaskResponse 發送給 Master ,由 Master 來執行最后一步的結果處理。

當 Master 接收到 TaskResponse 以后,判斷任務類型為 DATA_QUALITY 后會進行結果判斷,如果判斷結果為異常的話,那么就會根據所選擇的失敗策略進行處理

3.2.2 結果處理

怎么去判斷我們所檢查的數據是否異常呢?

這里我們提供了一個校驗的公式,請看下圖

校驗公式

在這圖里面我們可以看這個校驗的公式由是三個部分組成,檢查方式,比較符和用戶定義的閾值,只要將統計值和比對值填入到由這三個組成的公式就可以得到檢查結果,接下來我們來看一個例子,校驗表總行數能否達標。

舉個例子

我們這里選擇的的檢查方式是比對值減去統計值,假設統計值是9800,比對值是10000,比較符我們選擇 大於等於,閾值設置為100。

這個表達式想要達到的意圖是當 比對值減去統計值的差 大於等於 100 時,檢查結果是失敗的。我們把統計值和比對值套入這個公式當中,發現10000-9800 = 200 是大於100 ,那么檢查結果為失敗。

一旦發現檢查結果為異常,那么就要執行相應的失敗策略,我們提供兩個等級的失敗策略:

  • 告警,告警等級是當檢查結果為異常時會進行告警,但不會將任務結果設置為false,不導致整個工作流的阻斷。

  • 阻斷:當檢查結果為異常時,首先是進行告警,同時會將任務的結構設置為false,對整個工作流進行阻斷。

3.2.3 結果數據查看

當有數據異常的時候我們可以在結果列表中去看相關的結果數據:

校驗結果

在這個列表中可以較為清晰地了解統計值比對值比較方式等信息,幫助我們了解數據異常的情況.

有時候光看統計值並不能很好地了解數據異常的原因,這個時候我們也可以查看錯誤數據,看看是哪些數據出了問題,幫我們更好地去修復數據。

我們支持兩種錯誤數據的輸出方式:

  • 寫入到 ElasticSearch 中,可以通過 Kibana 或者其他工具去查看.

  • 寫到 HDFS 中,這樣不用新增新的組件,也可以查看錯誤數據,只是沒那么方便。

4 不足和規划

不足:

  • 目前的數據質量校驗只支持離線數據的校驗

  • 只支持 Jdbc 和 Hive 兩種類型的數據源

  • 沒有提供頁面讓用戶去自定義規則

規划:

  • 我們會支持實時數據的校驗

  • 增加新的數據源類型,例如 Hdfs、ES 等

  • 支持用戶可以自定義單表檢查規則

結束語

目前,我們已經將數據質量校驗功能貢獻給社區,1.0 版本的代碼在 dolphinscheduler 的主倉庫的 data_quality_design 分支上,2.0 版本也會在近期完成pr的提交,也希望能有更多感興趣的小伙伴一起來參與,和社區一起共建開源,謝謝大家。

下面視頻是 ApacheCon Asia 上做的分享演講: 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM