異構數據庫遷移——DATAX


背景

      在最近接觸到的一個case里面,需要把db2的數據遷移至oracle,客戶可接收的停機時間為3小時。


同步方式的比較

      一說到停機時間,大家第一時間想到Oracle公司的GoldenGate實時同步工具。但在測試過程中發現,由於無法提前檢查,而且初始化時間很久等問題,導致我們最后不得不放棄使用這一神器。

      既然OGG不能使用,那能傳統導出文本再用sql load導入,那是否可行呢?根據以往的經驗,只要數據一落地就存在亂碼,數據錯位等問題,由於無法進行hash對賬,數據質量根本無法保證。

      我司的某平台軟件,直接從內存中進行數據轉換,讓我們看到一大希望。由於列的順序不一,無法滿足部分需求,只能又放棄這一神器。

      就在此時,提出了一個開源軟件——DATAX。


什么是DATAX

        DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。

       注意,這里說的是離線數據同步工具,就是必須得停機之后再進行數據同步。那DATAX究竟支持哪些數據庫進行數據同步呢?以下是GitHub提供的列表:

類型

數據源

Reader(讀)

Writer(寫)

文檔

RDBMS 關系型數據庫

MySQL

Oracle   

    √   

    √   

SQLServer

PostgreSQL

DRDS

通用RDBMS(支持所有關系型數據庫)

阿里雲數倉數據存儲

ODPS

ADS

OSS

OCS

NoSQL數據存儲

OTS

Hbase0.94

Hbase1.1

MongoDB

Hive

無結構化數據存儲

TxtFile

FTP

HDFS

Elasticsearch


DataX3.0核心架構

        在這個case里面,我們使用datax3.0的核心架構。DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。

image

核心模塊介紹:
  • DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  • DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
  • 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
  • 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
  • DataX作業運行起來之后, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0

  • JSON配置

    JOB全局配置:

           JOB通過配置對同步整個過程進行管控。

        "job": {
            "setting": {
                "speed": {
                    "byte": 1048576
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },


    speed為同步速度限制參數,這里有三個參數channel、record和byte。
    channel:管道數,可以理解為並行數,需與splitPk一同使用,否則無效果。
    record:每次同步多少條數據,取record和byte中的最小值
    byte:每次同步多少字節數據,取record和byte中的最小值

    errorLimit為錯誤數據限制,這里有兩個參數record和percentage,指當異常數據達到多少時同步取消,取record和percentage的最小值。


    Reader線程配置:

          RDBMSReader通過JDBC連接器連接到遠程的RDBMS數據庫,並根據用戶配置的信息生成查詢SELECT SQL語句並發送到遠程RDBMS數據庫,並將該SQL執行返回結果使用DataX自定義的數據類型拼裝為抽象的數據集,並傳遞給下游Writer處理。

          對於用戶配置Table、Column、Where的信息,RDBMSReader將其拼接為SQL語句發送到RDBMS數據庫;對於用戶配置querySql信息,RDBMS直接將其發送到RDBMS數據庫。

          DB2被阿里分為通用RDBMS的范疇。配置一個從RDBMS數據庫同步抽取數據作業樣例:

            "content": [
                {
                    "reader": {
                        "name": "rdbmsreader",
                        "parameter": {
                            "username": "xxx",
                            "password": "xxx",
                            "column": [
                                "id",
                                "name"
                            ],
                            "splitPk": "pk",
                            "connection": [
                                {
                                    "table": [
                                        "table"
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:dm://ip:port/database"
                                    ]
                                }
                            ],
                            "fetchSize": 1024,
                            "where": "1 = 1"
                        }
                    },
    
    

    username
    源數據庫的用戶名

    password
    源數據庫的密碼 

    encoding

    數據庫字符集,GBK,UTF8等


    column
    所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息。用戶使用代表默認使用所有列配置,例如['']。
    支持列裁剪,即列可以挑選部分列進行導出。

    支持列換序,即列可以不按照表schema信息進行導出。

    支持常量配置,用戶需要按照JSON格式: ["id", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id為普通列名,1為整形數字常量,'bazhen.csy'為字符串常量,null為空指針,to_char(a + 1)為表達式,2.3為浮點數,true為布爾值。

    Column必須顯示填寫,不允許為空!


    splitPk

    RDBMSReader進行數據抽取時,如果指定splitPk,表示用戶希望使用splitPk代表的字段進行數據分片,DataX因此會啟動並發任務進行數據同步,這樣可以大大提供數據同步的效能。

    推薦splitPk用戶使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分片也不容易出現數據熱點。

    目前splitPk僅支持整形數據切分,不支持浮點、字符串型、日期等其他類型。如果用戶指定其他非支持類型,RDBMSReader將報錯!

    注意:這里並非只能是主鍵,擁有唯一約束的列也可。


    table

    所選取的需要同步的表名


    jdbcUrl

    描述的是到對端數據庫的JDBC連接信息,jdbcUrl按照RDBMS官方規范,並可以填寫連接附件控制信息。請注意不同的數據庫jdbc的格式是不同的,DataX會根據具體jdbc的格式選擇合適的數據庫驅動完成數據讀取。

  • db2格式 jdbc:db2://ip:port/database

  • fetchSize

    該配置項定義了插件和數據庫服務器端每次批量數據獲取條數,該值決定了DataX和服務器端的網絡交互次數,能夠較大的提升數據抽取性能。

    注意,該值最大建議值為2048。


    where

    篩選條件,RDBMSReader根據指定的column、table、where條件拼接SQL,並根據這個SQL進行數據抽取。例如在做測試時,可以將where條件指定為limit 10;在實際業務場景中,往往會選擇當天的數據進行同步,可以將where條件指定為gmt_create > $bizdate 。


    小結:
       我司某產品不被我們選用主要是配置中不存在column和where兩個條件的參數,無法對部分數據進行過濾。


    Writer線程配置:

           OracleWriter 插件實現了寫入數據到 Oracle 主庫的目的表的功能。在底層實現上, OracleWriter 通過 JDBC 連接遠程 Oracle 數據庫,並執行相應的 insert into ... sql 語句將數據寫入 Oracle,內部會分批次提交入庫。

           OracleWriter 面向ETL開發工程師,他們使用 OracleWriter 從數倉導入數據到 Oracle。同時 OracleWriter 亦可以作為數據遷移工具為DBA等用戶提供服務。

           OracleWriter 通過 DataX 框架獲取 Reader 生成的協議數據,根據你配置生成相應的SQL語句

  • insert into...(當主鍵/唯一性索引沖突時會寫不進去沖突的行)
  • 對於使用datax同步到oracle的表,建議刪除主鍵和唯一索引,數據校驗完成后再進行重建。


                    "writer": {
                        "name": "oraclewriter",
                        "parameter": {
                            "username": "root",
                            "password": "root",
                            "column": [
                                "id",
                                "name"
                            ],
                            "preSql": [
                                "delete from test"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",
                                    "table": [
                                        "test"
                                    ]
                                }
                            ]
                        }

    username

    目的數據庫的用戶名


    password

    目的數據庫的密碼


    encoding

    數據庫字符集


    batchSize

    一次性批量提交的記錄數大小,該值可以極大減少DataX與Oracle的網絡交互次數,並提升整體吞吐量。最大只能設置為1024


    column

    的表需要寫入數據的字段,字段之間用英文逗號分隔。例如: "column": ["id","name","age"]。如果要依次寫入全部列,使用表示, 例如: "column": [""]


    preSql

    執行語句時會先檢查是否存在若存在,根據條件刪除


    jdbcUrl

    目的數據庫的 JDBC 連接信息


    table

    目的表的表名稱。支持寫入一個或者多個表。當配置為多張表時,必須確保所有表結構保持一致。


    同步測試記錄

          數據轉換最高速度一條管道13000rec/s+,通過where手工對部分字段拆分進程進行同步,同步536250880rec大約需要90分鍾(容量300GB)的時間。


    免責聲明!

    本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



     
    粵ICP備18138465號   © 2018-2025 CODEPRJ.COM