datax將mysql數據導入hive表


環境:CDH 5.12.1版本 ,mysql 5.7

1、mysql表結構(原文此處為截圖;下文配置讀取的是 test 庫 user 表的 id、name、age、create_time 四個字段)

 2、mysql表數據(user)

 3、下載datax

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

4、在datax的job目錄編寫一個mysql2hive.json文件

a) 下面是全量導入

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.75.101:3306/test"
                                ],
                                "table": [
                                    "user"
                                ]
                            }
                        ],
                        "password": "yang156122",
                        "username": "root",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "create_time",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.168.75.101:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "user",
                        "fileType": "text",
                        "path": "/user/datax/data/ceshi",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
View Code

b) 下面是按指定的時間,增量導入

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.75.101:3306/test"
                                ],
                                "table": [
                                    "user"
                                ]
                            }
                        ],
                        "password": "yang156122",
                        "username": "root",
                        "where": "create_time >= '2020-10-21'"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "create_time",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.168.75.101:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "user",
                        "fileType": "text",
                        "path": "/user/datax/data/ceshi",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
View Code

c)動態傳參,增量導入(推薦看這個)

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.75.101:3306/test"
                                ],
                                "table": [
                                    "user"
                                ]
                            }
                        ],
                        "password": "yang156122",
                        "username": "root",
                        "where": "create_time >= '$date'"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "create_time",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.168.75.101:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "user",
                        "fileType": "text",
                        "path": "/user/datax/data/ceshi",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

5、創建hive的表

-- Recreate the Hive target table from scratch.
-- Fields are tab-delimited, the same delimiter the hdfswriter configs use.
DROP TABLE IF EXISTS default.user;

CREATE TABLE default.user (
    id          INT,
    name        STRING,
    age         INT,
    create_time TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

6、如果是增量導入(包括動態傳參),每執行一次datax,都要進行load data

load data inpath '/user/datax/data/ceshi' into table default.user ;

7、這一步僅針對動態傳參的增量導入(腳本內已包含 load data,因此可以忽略步驟6)。vim start.sh,內容如下:

#! /bin/bash
# Incremental import driver: runs the DataX job with yesterday's date as the
# lower bound, then loads the files DataX wrote to HDFS into the Hive table.

echo "獲取前一天的時間,時間格式為2020-10-21"
# Yesterday's date formatted as YYYY-MM-DD.
# NOTE(review): -u computes the date in UTC; confirm this matches the time
# zone used by the create_time column in MySQL.
a=$(date -d yesterday -u +%Y-%m-%d)

echo "開始啦"
# -p "-Ddate=..." supplies the value for the $date placeholder in the job
# file's "where" clause. Stop here if the job fails so that Hive does not
# load from a failed run.
if ! python /root/data/soft/datax/datax/bin/datax.py -p "-Ddate=${a}" /root/data/soft/datax/datax/job/mysql2hive.json; then
    echo "DataX job failed; skipping the Hive load" >&2
    exit 1
fi

sleep 10

echo "開始將數據入hive表"
hive -e "load data inpath '/user/datax/data/ceshi' into table default.user;"

8、執行 sh start.sh 

9、查看數據

hive 
use default;
select * from user;

 

僅供參考.....如有問題,請留言....

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM