DataX


DataX的安裝及使用

DataX 簡介

和 Sqoop 的功能類似,都是做離線采集的

Sqoop 是基於 MapReduce 的,分布式的

DataX 是基於 java 的,單機多線程的

DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。

DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。

開源地址

https://github.com/alibaba/DataX

DataX的安裝

DataX不需要依賴其他服務,直接上傳、解壓、安裝、配置環境變量即可

也可以直接在windows上解壓

隨便在哪個節點都行,只要有 Python 的環境即可,而 Linux 自帶 Python 環境

DataX的使用

stream2stream

本地測試是否安裝成功用的

編寫配置文件stream2stream.json
# stream2stream.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}
執行同步任務
datax.py stream2stream.json
執行結果

mysql2mysql

需要新建student2數據庫,並創建student2表

編寫配置文件mysql2mysql.json
{
    "job": {
        "content": [
            {
                "reader": { 	# 讀數據
                    "name": "mysqlreader", 	# 這個名字是固定的,不是自定義的
                    "parameter": {
                        "username": "root",	
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "splitPk": "age", 	# 多並行的時候可以指定一個分割的 key ,一般使用主鍵
                        "connection": [		# 連接
                            {
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            }
                        ]
                    }
                },
                "writer": {		# 寫數據
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "last_mod"
                        ],
                        "preSql": [		# 執行寫操作之前
                            "truncate student2"
                        ],                        
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student2"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6	# 並行度的意思,就像是啟動幾個管道的意思,多線程
            }
        }
    }
}
執行同步任務
datax.py mysql2mysql.json

mysql2hdfs

讀寫 hive 跟讀寫 hdfs 是一樣的

編寫配置文件mysql2hdfs.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz"
                        ],
                        "splitPk": "id",
                        "where":"age=23",	# 指定篩選條件,使用中文要注意編碼格式的問題
                        "connection": [
                            {
                                "table": [
                                    "student"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/user/data/student",
                        "fileName": "student",
                        "column": [
                            {
                                "name": "id",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "gender",
                                "type": "string"
                            },
                            {
                                "name": "clazz",
                                "type": "string"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": ","
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

需要先在hdfs中創建對應的目錄

hadoop dfs -mkdir -p /user/data/student

hbase2mysql

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hbase11xreader",	# 11x表示版本
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                        },
                        "table": "student",
                        "encoding": "utf-8",
                        "mode": "normal",
                        "column": [
                            {
                                "name": "rowkey",
                                "type": "string"
                            },
                            {
                                "name": "info:name",
                                "type": "string"
                            },
                            {
                                "name": "info:age",
                                "type": "int"
                            },
                            {
                                "name": "info:gender",
                                "type": "string"
                            },
                            {
                                "name": "info:clazz",
                                "type": "string"
                            }
                        ],
                        "range": {
                            "startRowkey": "",
                            "endRowkey": "",
                            "isBinaryRowkey": false
                        }
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz"
                        ],
                        "preSql": [
                            "truncate student2"
                        ],                        
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student2"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

mysql2hbase

mysql中的score表需將cource_id改為course_id,並將student_id、course_id設為主鍵

hbase需先創建score表:create 'score','cf1'

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "student_id",
                            "course_id",
                            "score"
                        ],
                        "splitPk": "student_id",
                        "connection": [
                            {
                                "table": [
                                    "score"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                      "hbaseConfig": {
                        "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                      },
                      "table": "score",
                      "mode": "normal",
                      "rowkeyColumn": [	# 將 student_id 和 course_id 作為 rowkey 
                          {
                            "index":0,	# student_id
                            "type":"string"
                          },
                          {
                            "index":-1,	# 分割方式
                            "type":"string",
                            "value":"_"
                          },
                          {
                            "index":1,	# course_id
                            "type":"string"
                          }
                      ],
                      "column": [
                        {
                          "index":2,
                          "name": "cf1:score",
                          "type": "int"
                        }
                      ],
                      "encoding": "utf-8"
                    }
                  }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

HDFSToHBase

將students.txt數據上傳至HDFS的/data/student1/目錄

在HBase中創建datax表:create 'datax','cf1'

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {		# 指定錯誤限制,超過這個限制任務就報錯結束了,出錯的條數不得超過總條數的0.02
                "record": 0,	# 出錯的行
                "percentage": 0.02	# 錯誤數據的百分比
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/data/student1/",
                        "defaultFS": "hdfs://master:9000",
                        "column": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "type": "string"
                            },
                            {
                                "index": 5,
                                "type": "string"
                            }
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                        },
                        "table": "datax",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": -1,
                                "type": "string",
                                "value": "_"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            }
                        ],
                        "column": [
                            {
                                "index": 2,
                                "name": "cf1:age",
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "name": "cf1:gender",
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "name": "cf1:clazz",
                                "type": "string"
                            },
                            {
                                "index": 5,
                                "name": "cf1:ts",
                                "type": "string"
                            }
                        ],
                        "versionColumn": {	# 指定數據版本字段
                            "index": 5 
                        },
                        "encoding": "utf-8"	# 指定編碼
                    }
                }
            }
        ]
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM