datax實戰


一、全量同步

1.簡單字段同步

本文以mysql -> mysql為示例:

   本次測試的表為mysql的系統庫-sakila中的actor表,由於不支持目的端自動建表,此處預先建立目的表:

CREATE TABLE `actor_copy` (
  `actor_id` smallint(5) unsigned NOT NULL AUTO_INCREMENT,
  `first_name` varchar(45) NOT NULL,
  `last_name` varchar(45) NOT NULL,
  `last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`actor_id`),
  KEY `idx_actor_last_name` (`last_name`)
) ENGINE=InnoDB AUTO_INCREMENT=201 DEFAULT CHARSET=utf8;

  通過官方快速開始提供的命令,可以查看配置模板:

 python datax.py -r {YOUR_READER} -w {YOUR_WRITER} python datax.py -r streamreader -w streamwriter

  打開dataX的mysqlreader以及mysqlwriter文檔,編寫JSON配置文件:(此處經過試驗,即使是自增主鍵,同樣需要配置,否則會報輸入輸出不匹配的錯),加上JSON配置文件的x權限

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "Zcc170821#",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update"
                        ],
                        "splitPk": "actor_id",
                        "connection": [
                            {
                                "table": [
                                    "actor"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://192.168.19.129:3306/sakila"
                                ]
                            }
                        ]
                    }
                },
              "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "Zcc170821#",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update"
                        ],
                        "preSql": [
                            "truncate table actor_copy"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.19.129:3306/sakila",
                                "table": [
                                    "actor_copy"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

這樣,單表的最基本全量同步就完成了!

  通過python 命令運行即可:

python datax.py ../job/mysqltest.json

  2.增加常量與插入時間字段

    原表正常字段,目標表多出兩列:來源部門,插入時間。json配置如下:

      常量使用單引號,時間暫時未摸索到變量如何使用(以下通過啟動腳本已更新方式),通過時間函數實現

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "'自動生成'",
                "NOW()"
                        ],
                        "splitPk": "actor_id",
                        "connection": [
                            {
                                "table": [
                                    "actor"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://hadoop01:3306/sakila"
                                ]
                            }
                        ]
                    }
                },
              "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "src",
                "load_time"
                        ],
                        "preSql": [
                            "truncate table actor_copy"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop01:3306/sakila",
                                "table": [
                                    "actor_copy"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

   2020.1.11,更新通過啟動腳本控制時間戳:

    首先Json配置更改為變量:(注意變量有個單引號!

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "'${src}'",
                "'${systime}'"
                        ],
                        "splitPk": "actor_id",
                        "connection": [
                            {
                                "table": [
                                    "actor"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://hadoop01:3306/sakila"
                                ]
                            }
                        ]
                    }
                },
              "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "actor_id",
                            "first_name",
                            "last_name",
                            "last_update",
                "src",
                "load_time"
                        ],
                        "preSql": [
                            "truncate table actor_copy"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop01:3306/sakila",
                                "table": [
                                    "actor_copy"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

    在datax的srcipts文件下新建一個啟動腳本:

#coding:UTF-8
from datetime import datetime import os import sys configFilePath = sys.argv[1] src = '自動生成' currentTime = format(datetime.now(), '%Y-%m-%d %H:%M:%S') script2execute = "python /opt/datax/bin/datax.py {0} -p \"-Dsrc='{1}' -Dsystime='{2}'\"".format( configFilePath, src, currentTime) os.system(script2execute)

  在srcipts下的啟動命令為:

    

python ./datax_start.py '/opt/datax/job/mysql_actor_copy_arg.json'

 

 二、增量同步

  增量同步的核心思路是時間戳,需要同步的表中要有Update_time字段:

  參考實現:https://www.jianshu.com/p/34b3a084d7d8

      https://blog.csdn.net/quadimodo/article/details/82186788

  增量數據和全量數據如何合並?使用full join

    https://blog.csdn.net/kx306_csdn/article/details/89508323

  當然如果有例如更新時間,修改時間字段,可以直接將增量表INTO入昨日全量,然后根據ID去重,取最新時間也是可以的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM