導讀:傳統ETL方案讓企業難以承受數據集成之重,基於Kafka Connect構建的新型實時數據集成平台被寄予厚望。
在4月21日的Kafka Beijing Meetup第四場活動上,DataPipeline CTO陳肅分享了DataPipeline是如何基於Kafka Connect框架構建實時數據集成平台的應用實踐。以下內容是基於現場錄音整理的文字,供大家參考。
什么是數據集成?最簡單的應用場景就是:一個數據源,一個數據目的地,數據目的地可以一個數據倉庫,把關系型數據庫的數據同步到數據倉庫里,就形成了一次數據集成。
企業數據集成面臨的4個挑戰
我們先來看一個真實的數據集成案例。
G公司是DataPipeline的一個典型客戶,擁有近千個數據源,類型主要包括Oracle、SQL Server、MySQL等。根據業務的需要和現有的基礎設施情況,這些數據源分別需要同步到不同的目的端,類型主要包括MySQL、HDFS、Kafka等。基於以上背景,G公司的具體要求如下:
1. 需要支持約5TB日新增數據量的同步,今年將增長5-10倍。
2. 這些數據一部分數據源要求實時同步,另一部分可接受定時同步。
3. 缺乏強大的運維人才,現有數據源的業務承載壓力有限,對壓力非常的敏感,要求進行限流。
4. 從這些數據源到目的地的同步都是Kettle寫腳本實現的,管理起來比較混亂,要求通過一個管理平台對任務進行集中化的配置和管理。
5. 上游的數據源和下游的數據目的都不穩定,隨時可能出現各種問題,要求通過一個高可用的平台以減少數據傳輸中斷的影響。
6. 當數據同步任務被隨機的暫停/恢復時,要求可以保證數據的完整性。
7. 當數據源和目的地隨機出現故障和過載時,要求可以保證數據的完整性。
8. 當數據源Schema發生變化時,要求可以根據業務需求靈活配置目的地策略。
G公司的案例只是當前企業數據集成需求的一個典型應用場景。事實上,無論是互聯網企業還是傳統企業,在面臨數據集成的時候都會遇到以下4個挑戰:
1. 數據源的異構性:傳統ETL方案中,從數據源到目的地的同步都是腳本實現的,異構數據源就意味着企業要做大量的適配工作。
2. 數據源的動態性:在數據集成時,上游的數據源端經常會發生各種變化,有些數據源可能被刪掉一些結構,這可能會影響到后續數據分析的結果。
3. 任務的可伸縮性:當數據集成只有幾個數據源,系統壓力的問題不太突出。當數據集成面臨的是成百上千個數據源時,多任務並行就需要進行一些限速與緩沖的調度,讓讀寫速度相互匹配。
4. 任務的容錯性:當數據在傳輸過程中出現問題的時候,是否可以實現斷點重傳,且不產生重復的數據。
以上也是DataPipeline要為企業數據集成過程中解決的最關鍵的4個問題。
為什么選擇Kafka Connect作為底層框架
Kafka Connect是一種用於在Kafka和其他系統之間可擴展的、可靠的流式傳輸數據的工具,可以更快捷和簡單地將大量數據集合移入和移出Kafka的連接器。Kafka Connect為DataPipeline提供了一個相對成熟穩定的基礎框架,還提供了一些開箱即用的工具,大大地降低研發的投入和提升應用的質量。
下面,我們看一看Kafka Connect的具體優勢。
首先,Kafka Connect提供的是以數據管道為中心的業務抽象。在Kafka Connect里有兩個核心概念:Source和Sink。Source負責導入數據到Kafka,Sink負責從Kafka導出數據,它們都被稱為Connector。比如Source Connector,Sink Connector,其實就是提供了數據讀取和寫入的高度業務抽象,可以簡化很多生命周期的管理工作。
當然,Source Connector會去初始化Source Task,Sink Connector會去初始化Sink Task。這些都是標准的封裝。對於數據方面,通過Source & Sink Record把數據的結構進行標准化的抽象。另外,企業客戶在做數據集成的時候,數據在很多應用場景下都要求有一定的格式,所以在Kafka Connect里用Schema Registry & Projector來解決數據格式驗證和兼容性的問題。當數據源產生變化的時候,會生成新的Schema版本,通過不同的處理策略用Projector來完成對數據格式的兼容。
第二,Kafka Connect具有良好的可伸縮性、與容錯性。這些特性是與Kafka是一脈相承的。在流式處理和批量處理模式里,更多取決於Source端如何去讀取數據,Kafka Connect天然支持流式處理和批量傳輸方式。單節點和集群水平擴展功能都是由Kafka Connect框架直接支持。而任務恢復和狀態保持方面,目的端任務的寫入進度信息通過Kafka Connect框架自動管理、源端任務可以根據需要往Kafka里面放讀取進度信息,節省很多精力去管理任務重啟后的進度。
對於數據集成這樣一個通用的應用場景里,大家肯定都不希望重復發明輪子。目前,在Kafka Connect生態系統下,擁有可以直接使用的Connector共84個,絕大部分都是開源的。其中,一部分是Kafka官方提供的,另外一些是Confluent認證的,還有一些是第三方提供的。根據需求適當裁剪后,這些Connector都可以應用到自己的系統平台中。
DataPipeline解決哪些數據集成的核心難題
基於Kafka Connect 框架,DataPipeline已經完成了很多優化和提升工作,可以很好地解決當前企業數據集成面臨的很多核心難題。
1. 任務的獨立性與全局性。
從Kafka設計之初,就遵從從源端到目的的解耦性。下游可以有很多個Consumer,如果不是具有這種解耦性,消費端很難擴展。企業做數據集成任務的時候,需要源端到目的端的協同性,因為企業最終希望把握的是從源端到目的端的數據同步擁有一個可控的周期,並能夠持續保持增量同步。在這個過程中,源端和目的端相互獨立的話,會帶來一個問題,源端和目的端速度不匹配,一快一慢,造成數據堆積現象嚴重。所以,企業用戶在建立一個數據任務之后,我們希望對任務進行緩沖的控制,避免數據丟失。
2. 任務並行化的方式。
如果企業客戶有1000張數據表需要建立數據集成的任務,就要考慮用什么方式進行任務切分最佳。其中一種方式是把1000張表切分成若干個任務。這種情況下,Source Task的負載很難做到均衡,Sink Task可以消費多個Topics,依然存在負載不均的問題,每個任務負載多少張表其實是很難均衡的。每增加一個任務都會觸發Rebalance機制。可以想象,每一張表都通過Source Connector和Sink Connector初始化一個源端和目的端任務,會大大增加Rebalance的開銷。
3. 異構數據的映射。
在給企業客戶做數據集成的時候,50%幾率都會遇到一些臟活累活——異構數據源的映射(Mapping)。這個映射對很多互聯網公司來說不是那么嚴重什么事兒,因為數據庫設計的都比較符合規范,對字段的命名方式等都會比較“優雅”(統一)。但是在傳統企業里,由於很多業務系統都會外包,還有一些意識的原因,導致數據庫設計的沒有那么規范和統一。用Kafka Connect做數據集成的時候,需要盡可能做到異構數據精准的還原,尤其金融行業客戶對此要求比較高。另外,當確實遇到數據之間不匹配的情況時,可以在業務數據之間進行比較合理的映射。
另外,源端的Source Record包含了每一列的基本數據類型(INT16、STRING等)以及可選的meta信息(例如“name”)。目的端處理Sink Record的時候,需要依據基本數據類型以及meta信息決定映射關系。
4. Schema變化的處理策略。
給企業做數據集成的時候,需要根據數據源Schema的變化給出對應的處理策略。基於Kafka Connect框架,我們提供了以下幾種處理策略:
(1)Backward Compatibility:可使用最新的Schema一致訪問所有數據,e.g. 刪除列、添加具有默認值的列。
(2)Forward Compatibility:可使用最舊的Schema一致訪問所有數據,e.g. 刪除具有默認值的列。
(3)Full Compatibility:可任意使用新舊Schema訪問所有數據。
Kafka Connect推薦使用Backward Compatibility,這也是Schema Registry的默認值。另外,企業用戶還會提出源端刪除列,目的端需要忽略,源端添加具有默認值列,目的端需要跟隨等需求,都以Task為單位進行配置和實現。
DataPipeline基於Kafka Connect做了哪些提升
在不斷滿足當前企業客戶數據集成需求的同時,DataPipeline也基於Kafka Connect 框架做了很多非常重要的提升。
1. 系統架構層面。
DataPipeline引入DataPipeline Manager的概念,主要用於優化Source和Sink的全局化生命周期管理。當任務出現異常時,可以實現對目的端和全局生命周期的管理。例如,處理源端到目的端讀取速率不匹配以及暫停等狀態的協同。
為了加強系統的健壯性,我們把Connector任務的參數保存在ZooKeeper中,方便任務重啟后讀取配置信息。
DataPipeline Connector通過JMX Client將統計信息上報Dashboard。在Connector中在技術上進行一些封裝,把一些通用信息,比如說Connector歷史讀取信息,跟管理相關的信息都采集到Dashboard里面,提供給客戶。
2. 任務並行模式。
DataPipeline在任務並行方面做了一些加強。我們在具體服務客戶的時候也遇到這樣的問題,需要同步數十張表。在DataPipeline Connector中,我們允許每個Task內部可以定義和維護一個線程池,通過控制線程並發數,並且每個Task允許設置行級別的IO控制。而對於JDBC類型的Task,我們額外允許配置連接池的大小,減少上游和下游資源的開銷。
3. 規則引擎。
DataPipeline在基於Kafka Connect做應用時的基本定位是數據集成。數據集成過程中,不應當對數據進行大量的計算,但是又不可避免地要對一些字段進行過濾,所以在產品中我們也在考慮怎樣提供一種融合性。
雖然Kafka Connect提供了一個Transformation接口可以與Source Connector和Sink Connector進行協同,對數據進行基本的轉換。但這是以Connector為基本單位的,企業客戶需要編譯后部署到所有集群的節點,並且缺乏良好的可視化動態編譯調試環境支持。
基於這種情況,DataPipeline產品提供了兩種可視化配置環境:基本編碼引擎(Basic Code Engine)和高級編碼引擎(Advanced Code Engine)。前者提供包括字段過濾、字段替換和字段忽略等功能,后者基於Groovy可以更加靈活地對數據處理、並且校驗處理結果的Schema一致性。對於高級編碼引擎,DataPipeline還提供了數據采樣和動態調試能力。
4. 錯誤隊列機制。
我們在服務企業客戶的過程中也看到,用戶源端的數據永遠不會很“干凈”。不“干凈”的數據可能來自幾個方面,比如當文件類型數據源中的“臟記錄”、規則引擎處理特定數據產生未預期的異常、因為目的端Schema不匹配導致某些值無法寫入等各種原因。
面對這些情況,企業客戶要么把任務停下來,要么把數據暫存到某處后續再處理。而DataPipeline采取的是第二種方式,通過產品中錯誤隊列預警功能指定面對錯誤隊列的策略,支持預警和中斷策略的設置和實施等,比如錯誤隊列達到某個百分比的時候任務會暫停,這樣的設置可以保證任務不會因少量異常數據而中斷,被完整記錄下來的異常數據可以被管理員非常方便地進行追蹤、排查和處理。企業客戶認為,相比以前通過日志來篩查異常數據,這種錯誤隊列可視化設置功能大大提升管理員的工作效率。
在做數據集成的過程中,確實不應該對原始數據本身做過多的變換和計算。傳統ETL方案把數據進行大量的變換之后,雖然會產生比較高效的輸出結果,但是當用戶業務需求發生變化時,還需要重新建立一個數據管道再進行一次原始數據的傳輸。這種做法並不適應當前大數據分析的需求。
基於這種考慮,DataPipeline會建議客戶先做少量的清洗,盡量保持數據的原貌。但是,這並不是說,我們不重視數據質量。未來的重要工作之一,DataPipeline將基於Kafka Streaming將流式計算用於數據質量管理,它不對數據最終輸出的結果負責,而是從業務角度去分析數據在交換過程中是否發生了改變,通過滑動窗口去判斷到底數據發生了什么問題,判斷條件是是否超出一定比例歷史均值的記錄數,一旦達到這個條件將進一步觸發告警並暫停同步任務。
總結一下,DataPipeline經過不斷地努力,很好地解決了企業數據集成過程需要解決異構性、動態性、可伸縮性和容錯性等方面的問題;基於Kafka Connect的良好基礎支撐構建了成熟的企業級數據集成平台;基於Kafka Connect進行二次封裝和擴展,優化了應用Kafka Connect時面臨的挑戰:包括Schema映射和演進,任務並行策略和全局化管理等。未來,Datapipeline將會基於流式計算進一步加強數據質量管理。
PS.添加DataPipeline君微信:datapipeline2018,拉你進技術討論群。