DataX


 

1. What is DataX

DataX is an offline data synchronization tool for heterogeneous data sources, open-sourced by Alibaba. It aims to provide stable and efficient data synchronization between a wide range of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, FTP, and more.

  https://github.com/kris-2018/DataX

2. DataX Design

To solve the problem of synchronizing heterogeneous data sources, DataX turns the complex mesh of point-to-point synchronization links into a star-shaped topology, with DataX acting as the intermediate transport hub that connects all data sources. When a new data source needs to be integrated, it only has to be connected to DataX to synchronize seamlessly with every data source already supported.

3. Framework Design

As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted into Reader and Writer plugins that are plugged into the synchronization framework.

Reader: the data collection module. It reads data from the source and sends it to the Framework.

Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.

Framework: connects the Reader and the Writer, acts as the data transfer channel between them, and handles core concerns such as buffering, flow control, concurrency, and data conversion. A minimal job skeleton illustrating this layout is shown below.
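
Every DataX job configuration mirrors this architecture directly: each entry in the "content" array pairs one Reader plugin with one Writer plugin, and a framework-level "setting" block controls concerns such as concurrency. The skeleton below is only an illustration of that layout (plugin names and parameter values are left empty, as in the templates generated later in this article):

{
    "job": {
        "content": [
            {
                "reader": { "name": "", "parameter": {} },
                "writer": { "name": "", "parameter": {} }
            }
        ],
        "setting": {
            "speed": { "channel": "" }
        }
    }
}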

4. How It Works

1)  A single data synchronization job in DataX is called a Job. When DataX receives a Job, it starts a process to carry out the entire synchronization. The DataX Job module is the central management node of a single job: it handles data cleanup, sub-task splitting (converting a single job into multiple sub-Tasks), TaskGroup management, and so on.

2)  After the DataX Job starts, it splits the Job into multiple small Tasks (sub-tasks) according to a source-specific splitting strategy, so that they can run concurrently. A Task is the smallest unit of a DataX job; each Task is responsible for synchronizing a portion of the data.

3)  After the Tasks have been split out, the DataX Job calls the Scheduler module, which regroups them into TaskGroups according to the configured concurrency. Each TaskGroup runs all of its assigned Tasks with a fixed degree of concurrency; by default a single TaskGroup runs 5 Tasks concurrently (see the worked example after this list).

4)  Each Task is started by its TaskGroup. Once started, a Task runs a fixed Reader -> Channel -> Writer thread pipeline to carry out its share of the synchronization.

5)  Once the job is running, the Job module monitors and waits for all TaskGroups to finish. When every TaskGroup has completed, the Job exits successfully; otherwise it exits abnormally with a non-zero process exit code.
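
As a worked example (the scheduling scenario given in the official DataX documentation): suppose a job synchronizes a MySQL database sharded into 100 tables and is configured with a concurrency of 20. The Job splits the work into 100 Tasks; since a TaskGroup runs 5 Tasks concurrently by default, 20 / 5 = 4 TaskGroups are allocated, and each TaskGroup runs its share of 25 Tasks with a concurrency of 5. The overall concurrency is set in the job configuration, for example:

    "setting": {
        "speed": {
            "channel": 20
        }
    }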

5. Quick Start

  Prerequisites (a quick environment check follows this list)
    - Linux
    - JDK (1.8 or later; 1.8 recommended)
    - Python (Python 2.6.x recommended)
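      A quick check of the prerequisites (a sketch; exact version output differs by machine):
      [kris@hadoop101 ~]$ java -version
      [kris@hadoop101 ~]$ python -V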
   Installation
    1) Upload the downloaded datax.tar.gz to /opt/software on hadoop101
      [kris@hadoop101 software]$ ls
        datax.tar.gz
    2) Extract datax.tar.gz to /opt/module
      [kris@hadoop101 software]$ tar -zxvf datax.tar.gz -C /opt/module/
    3) Run the self-check script
      [kris@hadoop101 bin]$ cd /opt/module/datax/bin/
      [kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json

[kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json 
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2019-07-14 07:51:25.661 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-07-14 07:51:25.676 [main] INFO  Engine - the machine info  => 

        osInfo: Oracle Corporation 1.8 25.144-b01
        jvmInfo:        Linux amd64 2.6.32-642.el6.x86_64
        cpu num:        8

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2019-07-14 07:51:25.708 [main] INFO  Engine - 
{
        "content":[
                {
                        "reader":{
                                "name":"streamreader",
                                "parameter":{
                                        "column":[
                                                {
                                                        "type":"string",
                                                        "value":"DataX"
                                                },
                                                {
                                                        "type":"long",
                                                        "value":19890604
                                                },
                                                {
                                                        "type":"date",
                                                        "value":"1989-06-04 00:00:00"
                                                },
                                                {
                                                        "type":"bool",
                                                        "value":true
                                                },
                                                {
                                                        "type":"bytes",
                                                        "value":"test"
                                                }
                                        ],
                                        "sliceRecordCount":100000
                                }
                        },
                        "writer":{
                                "name":"streamwriter",
                                "parameter":{
                                        "encoding":"UTF-8",
                                        "print":false
                                }
                        }
                }
        ],
        "setting":{
                "errorLimit":{
                        "percentage":0.02,
                        "record":0
                },
                "speed":{
                        "byte":10485760
                }
        }
}


2019-07-14 07:51:35.892 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2019-07-14 07:51:35.896 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2019-07-14 07:51:35.897 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 07:51:35.898 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.018s |  All Task WaitReaderTime 0.047s | Percentage 100.00%
2019-07-14 07:51:35.899 [job-0] INFO  JobContainer - 
任務啟動時刻                    : 2019-07-14 07:51:25
任務結束時刻                    : 2019-07-14 07:51:35
任務總計耗時                    :                 10s
任務平均流量                    :          253.91KB/s
記錄寫入速度                    :          10000rec/s
讀出記錄總數                    :              100000
讀寫失敗總數                    :                   0

 

6. Usage Examples

① Read data from a stream and print it to the console

1) View the configuration template

[kris@hadoop101 bin]$ python datax.py -r streamreader -w streamwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
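
The template above has its fields left empty. Below is a filled-in sketch of this case (the file name stream2stream.json and the column values are illustrative choices, not part of the template); it is saved as a JSON file and run with datax.py just like the self-check job, and each channel prints sliceRecordCount records to the console:

[kris@hadoop101 job]$ vim stream2stream.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {"type": "string", "value": "hello,DataX"},
                            {"type": "long", "value": 10}
                        ],
                        "sliceRecordCount": 10
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py stream2stream.json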

② MySQL import and export

  ②.1 Read data from MySQL and store it in HDFS

    View the official template

[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

 Write the configuration file and run the job:

[kris@hadoop101 job]$ vim mysql2hdfs 
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop101:3306/test"
                                ], 
                                "table": [
                                    "stu1"
                                ]
                            }
                        ], 
                        "username": "root", 
                        "password": "123456"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            }
                        ],  
                        "defaultFS": "hdfs://hadoop101:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "student.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}
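
Run the job with datax.py (a sketch, assuming the configuration was saved as the file mysql2hdfs edited above); the tail of the run's log output follows:

[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py mysql2hdfs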

 

2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.071s             | 0.071s             | 0.071s             
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.072s             | 0.072s             | 0.072s             

2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 09:02:55.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-07-14 09:02:55.927 [job-0] INFO  JobContainer - 
任務啟動時刻                    : 2019-07-14 09:02:43
任務結束時刻                    : 2019-07-14 09:02:55
任務總計耗時                    :                 12s
任務平均流量                    :                3B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   3
讀寫失敗總數                    :                   0

     Result: the data has been successfully written from MySQL into HDFS.

Note: when HdfsWriter actually runs, it appends a random suffix to the configured file name, and each writing thread uses the resulting name as its actual output file.

 

View the official mongodbreader -> hdfswriter template (it is used for case ③.1 below):

[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mongodbreader document:
     https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": [], 
                        "collectionName": "", 
                        "column": [], 
                        "dbName": "", 
                        "userName": "", 
                        "userPassword": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

  ②.2 Read data from HDFS and write it to MySQL

  Rename the file uploaded in the previous case

  [kris@hadoop101 datax]$ hadoop fs -mv /student.txt* /student.txt

  View the official template

  [kris@hadoop101 datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter

  Create the configuration file (saved as job/hdfs2mysql.json, matching the run command below)

{ "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "column": ["*"], "defaultFS": "hdfs://hadoop102:9000", "encoding": "UTF-8", "fieldDelimiter": "\t", "fileType": "text", "path": "/student.txt" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [ "id", "name" ], "connection": [ { "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", "table": ["student2"] } ], "password": "000000", "username": "root", "writeMode": "insert" } } } ], "setting": { "speed": { "channel": "1" } } } }

  Create the student2 table in the datax database in MySQL

  mysql> use datax;

  mysql> create table student2(id int,name varchar(20));

 Run the job and check that the data has been written to the database:

  [kris@hadoop101 datax]$ bin/datax.py job/hdfs2mysql.json
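
  To verify, query the target table (mirroring the check used in case ③.2 below):

  mysql> select * from student2;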

③ DataX import and export cases

  ③.1 Read data from MongoDB and import it into HDFS

  Write the configuration file

[kris@hadoop101 datax]$ vim job/mongdb2hdfs.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": ["127.0.0.1:27017"], 
                        "collectionName": "atguigu", 
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "dbName": "test", 
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "mongo.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

mongodbreader parameter reference

  address: the address of the MongoDB instance. Because MongoDB may be a cluster, the ip:port entries must be given as a JSON array. [Required]

  userName: the MongoDB user name. [Optional]

  userPassword: the MongoDB password. [Optional]

  collectionName: the MongoDB collection name. [Required]

  column: the document fields (columns) to read from MongoDB. [Required]

  name: the column's name. [Required]

  type: the column's type. [Optional]

  splitter: MongoDB supports array types, but the DataX framework itself does not, so arrays read from MongoDB are joined into a single string using this separator. [Optional] (see the sketch after this list)
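
For illustration, a column entry that uses splitter might look like the following (a hypothetical array field named tags; it is not part of the configurations above):

    {
        "name": "tags",
        "type": "array",
        "splitter": ","
    }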

Run the job:
[kris@hadoop101 datax]$ bin/datax.py job/mongdb2hdfs.json
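
To confirm the data landed in HDFS, list and view the output file (a sketch; as noted earlier, HdfsWriter appends a random suffix to mongo.txt):

[kris@hadoop101 datax]$ hadoop fs -ls /
[kris@hadoop101 datax]$ hadoop fs -cat /mongo.txt*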

  ③.2 Read data from MongoDB and import it into MySQL

  Create the table in MySQL: mysql> create table kris(name varchar(20),url varchar(20));

Write the DataX configuration file

  [kris@hadoop101 datax]$ vim job/mongodb2mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": ["127.0.0.1:27017"], 
                        "collectionName": "kris", 
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "dbName": "test", 
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop101:3306/test", 
                                "table": ["kris"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

  Run the job:

[kris@hadoop101 datax]$ bin/datax.py job/mongodb2mysql.json

     Check the result:

mysql> select * from kris;

   ③.3 Read data from MongoDB and import it into HBase

   vim mongodb2hbase.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": ["x.x.x.x:28888"],
                        "userName": "root",
                        "userPassword": "xxxxt",
                        "collectionName": "shop",
                        "column": [
                            {
                                "name":"_id",
                                "type":"string"
                            },
                            {
                                "name":"shopNo",
                                "type":"string"
                            }
                        ],
                        "dbName": "posB",
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.rootdir": "hdfs://hadoop101:9000/hbase",
                            "hbase.cluster.distributed": "true",
                            "hbase.zookeeper.quorum": "hadoop101,hadoop102,hadoop103"
                        },
                        "table": "NS:shop",
                        "mode": "normal",
                        "rowkeyColumn": [
                           {
                              "index":0,
                              "type":"string"
                            }
                        ],
                        "column": [
                           {
                            "index":0,
                            "name": "cf1:id",
                            "type": "string"
                          },
                          {
                            "index":1,
                            "name": "cf1:shopNo",
                            "type": "string"
                          }
                        ],
                        "versionColumn":{
                            "index": "-1",
                            "value":"123456789"
                        },
                        "encoding": "utf-8"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
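
hbase11xwriter writes into an existing HBase table, so the target namespace, table, and column family referenced in the config (NS:shop with column family cf1) are assumed to have been created beforehand. A sketch of the preparation and run, under that assumption:

hbase(main):001:0> create_namespace 'NS'
hbase(main):002:0> create 'NS:shop', 'cf1'

[kris@hadoop101 datax]$ bin/datax.py job/mongodb2hbase.json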

 

