Datax3.0使用說明


原文鏈接:https://github.com/alibaba/DataX/blob/master/introduction.md

一、datax3.0介紹

1、DataX 是一個異構數據源離線同步工具,致力於實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。

2、DataX3.0框架設計

DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。

1. Reader:數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
2. Writer:數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
3. Framework:用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。

3、DataX3.0核心架構

1. 核心模塊介紹:
- DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
- DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
- 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
- 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
- DataX作業運行起來之后, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0
2. DataX調度流程:
舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:
- DataXJob根據分庫分表切分成了100個Task。
- 根據20個並發,DataX計算共需要分配4個TaskGroup。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。

二、Datax3.0安裝部署

1、環境准備

Linux
jdk 1.8
python 2.7.5(datax是由python2開發的)

2、datax下載地址

https://github.com/alibaba/DataX?spm=a2c4e.11153940.blogcont59373.11.7a684c4fvubOe1

查看安裝成功:在bin目錄下執行 python datax.py ../job/job.json

3、查看配置文件

在bin目錄下已經給出了樣例配置,但不同的數據源配置文件不一樣。通過命令查看配置模板
# python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
示例:[xxx@xxxbin]$ python datax.py -r mysqlreader -w hdfswriter

4、Reader插件和Writer插件

DataX3.0版本提供的Reader插件和Writer插件,每種讀插件都有一種和多種切分策略

"reader": {
    "name": "mysqlreader", #從mysql數據庫獲取數據(也支持sqlserverreader,oraclereader)
    "name": "txtfilereader", #從本地獲取數據
    "name": "hdfsreader", #從hdfs文件、hive表獲取數據
    "name": "streamreader", #從stream流獲取數據(常用於測試)
    "name": "httpreader", #從http URL獲取數據
    }
"writer": {
    "name":"hdfswriter", #向hdfs,hive表寫入數據
    "name":"mysqlwriter ", #向mysql寫入數據(也支持sqlserverwriter,oraclewriter)
    "name":"streamwriter ", #向stream流寫入數據。(常用於測試)   
    }

5、json配置文件模板

1. 整個配置文件是一個job的描述;
2. job下面有兩個配置項,content和setting,其中content用來描述該任務的源和目的端的信息,setting用來描述任務本身的信息;
3. content又分為兩部分,reader和writer,分別用來描述源端和目的端的信息;
4. setting中的speed項表示同時起幾個並發去跑該任務。
1. mysql_to_hive示例
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "querySql": "", #自定義sql,支持多表關聯,當用戶配置querySql時,直接忽略table、column、where條件的配置。
                        "fetchSize": "", #默認1024,該配置項定義了插件和數據庫服務器端每次批量數據獲取條數,該值決定了DataX和服務器端的網絡交互次數,能夠較大的提升數據抽取性能,注意,該值過大(>2048)可能造成DataX進程OOM
                        "splitPk": "db_id", #僅支持整形型數據切分;如果指定splitPk,表示用戶希望使用splitPk代表的字段進行數據分片,如果該值為空,代表不切分,使用單通道進行抽取
                        "column": [], #"*"默認所有列,支持列裁剪,列換序
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://IP:3306/database?useUnicode=true&characterEncoding=utf8"], 
                                "table": [] #支持多張表同時抽取
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": "" #指定的column、table、where條件拼接SQL,可以指定limit 10,也可以增量數據同步,如果該值為空,代表同步全表所有的信息
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [], #必須指定字段名,字段類型,{"name":"","tpye":""}
                        "compress": "", #hdfs文件壓縮類型,默認不填寫意味着沒有壓縮。其中:text類型文件支持壓縮類型有gzip、bzip2;orc類型文件支持的壓縮類型有NONE、SNAPPY(需要用戶安裝SnappyCodec)。 
                        "defaultFS": "", #Hadoop hdfs文件系統namenode節點地址。
                        "fieldDelimiter": "", #需要用戶保證與創建的Hive表的字段分隔符一致
                        "fileName": "", #HdfsWriter寫入時的文件名,需要指定表中所有字段名和字段類型,其中:name指定字段名,type指定字段類型。 
                        "fileType": "", #目前只支持用戶配置為”text”或”orc”
                        "path": "", #存儲到Hadoop hdfs文件系統的路徑信息,hive表在hdfs上的存儲路徑
                        "hadoopConfig": {} #hadoopConfig里可以配置與Hadoop相關的一些高級參數,比如HA的配置。 
                        "writeMode": "" #append,寫入前不做任何處理,文件名不沖突;nonConflict,如果目錄下有fileName前綴的文件,直接報錯。 
                    }
                }
            }
        ],
        "setting": {
            "speed": { #流量控制
                "byte": 1048576, #控制傳輸速度,單位為byte/s,DataX運行會盡可能達到該速度但是不超過它
                "channel": ""  #控制同步時的並發數
                    }
            "errorLimit": { #臟數據控制
                "record": 0 #對臟數據最大記錄數閾值(record值)或者臟數據占比閾值(percentage值,當數量或百分比,DataX Job報錯退出
            }
        }
    }
}

2. hive_to_mysql示例
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [], #"*"默認所有列,指定Column信息時,type必須填寫,index/value必須選擇其一。 
                        "defaultFS": "", #hdfs文件系統namenode節點地址
                        "encoding": "UTF-8", #默認UTF-8
                        "nullFormat": "", #文本文件中無法使用標准字符串定義null(空指針),例如:nullFormat:”\N”,那么如果源頭數據是”\N”
                        "compress": "", #orc文件類型下無需填寫
                        "hadoopConfig": {}, #hadoopConfig里可以配置與Hadoop相關的一些高級參數,比如HA的配置。 
                        "fieldDelimiter": ",", #默認",";讀取textfile數據時,需要指定字段分割符,HdfsReader在讀取orcfile時,用戶無需指定字段分割符
                        "fileType": "orc", #文件的類型,目前只支持用戶配置為”text”、”orc”、”rc”、”seq”、”csv”。 
                        "path": "" #文件路徑,支持多文件讀取,可以使用"*",也可以指定通配符遍歷多文件,單文件只能單線程,多文件可以多線程,線程並發數通過通道數指定
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [], #必須指定,不能留空;如果要依次寫入全部列,使用表示, 例如: "column": [""],強烈不建議
                        "batchSize": "", #默認值1024 一次性批量提交的記錄數大小,該值可以極大減少DataX與Mysql的網絡交互次數,並提升整體吞吐量。但是該值設置過大可能會造成DataX運行進程OOM情況。
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://IP:3306/database?useUnicode=true&characterEncoding=utf8",
                                "table": [] #支持寫入一個或者多個表。當配置為多張表時,必須確保所有表結構保持一致。
                            }
                        ],
                        "password": "",
                        "preSql": [], #寫入數據到目的表前,會先執行這里的標准語句。例在導入表前先進行刪除操作:["delete from 表名"]
                        "postSql":[], #寫入數據到目的表后,會執行這里的標准語句。(原理同 preSql )
                        "session": [], #DataX在獲取Mysql連接時,執行session指定的SQL語句,修改當前connection session屬性
                        "username": "",
                        "writeMode": "" #默認insert ,可選insert/replace/update 
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
            "errorLimit": { #臟數據控制
                "record": 0 #對臟數據最大記錄數閾值(record值)或者臟數據占比閾值(percentage值,當數量或百分比,DataX Job報錯退出
            }
        }
    }
}

三、Datax3.0使用

# trail_pigeon導入hive

#hive里面建表

CREATE TABLE ods_db_bidata.trail_pigeon (
order_id int ,
order_apply_time string
)
stored as orc tblproperties ("orc.compress"="ZLIB");

 

#建shell腳本,執行python腳本前先清空目標表

#!/bin/bash

hive_db=ods_db_bidata
hive_table=trail_pigeon

 
         

hive -e "truncate table ${hive_db}.${hive_table}"

python /opt/app/datax/bin/datax.py /opt/app/datax/job/mysql2hive/trail_pigeon.json

 

#寫json配置文件
{
"job": {
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"splitPk": "order_id",
"connection": [{
"jdbcUrl": ["jdbc:mysql://ip:3306/bidata?useUnicode=true&characterEncoding=utf8"],
"table": ["trail_pigeon"]
}],
"password": "password",
"username": "username",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [{"name": "order_id","type": "int"},
{"name": "order_apply_time","type": "string"}
],
"compress": "SNAPPY",
"defaultFS": "hdfs://192.168.0.127:8020",
"fieldDelimiter": "\u0001",
"fileName": "trail_pigeon",
"fileType": "orc",
"path": "/hive/warehouse/ods_db_bidata.db/trail_pigeon",
"writeMode": "nonConflict"
}
}
}],
"setting": {
"speed": {
"channel": "5"
}
}
}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM