一 在HIVE中創建ETL數據庫
->create database etl;
二 在工程目錄下新建MysqlToHive.py 和conf文件夾
在conf文件夾下新建如下文件,最后的工程目錄如下圖

三 源碼
Import.xml
<?xml version="1.0" encoding="UTF-8"?> <root> <importtype> <value>add</value> <!-- 增量導入或者全導入 --> </importtype> <task type="all"> <table>user_all</table> <!-- 數據庫中需要增量導入的第一張表名 --> <table>oder_all</table> <!-- 數據庫中需要增量導入的第一張表名 --> </task> <task type="add"> <table>user_add</table> <!-- 數據庫中需要增量導入的第一張表名 --> <table>oder_add</table> <!-- 數據庫中需要增量導入的第一張表名 --> </task> </root>
oder_add.xml
<?xml version="1.0" encoding="UTF-8"?> <root> <sqoop-shell type="import"> <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 數據庫連接地址 --> <param key="username">root</param> <!-- 數據庫用戶名 --> <param key="password">123456</param> <!-- 數據庫密碼 --> <param key="table">oderinfo</param><!-- 數據庫中待導出的表名 --> <param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 --> <param key="hive-partition-key">dt</param> <!-- 通過時間分區 --> <param key="hive-partition-value">$dt</param> <param key="hive-import"></param> <param key="check-column">crt_time</param> <!-- 增量導入檢查的列 --> <param key="incremental">lastmodified</param> <!-- 按照時間簇來進行增量導入 --> <param key="last-value">23:59:59</param> <!-- 增量導入時間划分點 --> <param key="num-mappers">1</param> <!-- 使用map任務個數 --> <param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理 --> </sqoop-shell> </root>
oder_all.xml
<?xml version="1.0" encoding="UTF-8"?> <root> <sqoop-shell type="import"> <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 數據庫連接地址 --> <param key="username">root</param><!-- 數據庫用戶名 --> <param key="password">123456</param><!-- 數據庫密碼 --> <param key="table">oderinfo</param><!-- 數據庫中待導出的表名 --> <param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 --> <param key="hive-partition-key">dt</param> <!-- 通過時間分區 --> <param key="hive-partition-value">$dt</param> <param key="hive-import"></param> <param key="create-hive-table"></param> <!-- 在hive中新建一張同名同結構的表 --> <param key="hive-overwrite"></param> <!-- 覆蓋原來以存在的表 --> <param key="num-mappers">1</param> <!-- 使用map任務個數 --> <param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理 --> </sqoop-shell> </root>
user_add.xml
<?xml version="1.0" encoding="UTF-8"?> <root> <sqoop-shell type="import"> <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 數據庫連接地址 --> <param key="username">root</param> <!-- 數據庫用戶名 --> <param key="password">123456</param> <!-- 數據庫密碼 --> <param key="table">userinfo</param><!-- 數據庫中待導出的表名 --> <param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 --> <param key="hive-partition-key">dt</param> <!-- 通過時間分區 --> <param key="hive-partition-value">$dt</param> <param key="hive-import"></param> <param key="check-column">crt_time</param> <!-- 增量導入檢查的列 --> <param key="incremental">lastmodified</param> <!-- 按照時間簇來進行增量導入 --> <param key="last-value">23:59:59</param> <!-- 增量導入時間划分點 --> <param key="num-mappers">1</param> <!-- 使用map任務個數 --> <param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理 --> </sqoop-shell> </root>
user_all.xml
<?xml version="1.0" encoding="UTF-8"?> <root> <sqoop-shell type="import"> <param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 數據庫連接地址 --> <param key="username">root</param><!-- 數據庫用戶名 --> <param key="password">123456</param><!-- 數據庫密碼 --> <param key="table">userinfo</param><!-- 數據庫中待導出的表名 --> <param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 --> <param key="hive-partition-key">dt</param> <!-- 通過時間分區 --> <param key="hive-partition-value">$dt</param> <param key="hive-import"></param> <param key="create-hive-table"></param> <!-- 在hive中新建一張同名同結構的表 --> <param key="hive-overwrite"></param> <!-- 覆蓋原來以存在的表 --> <param key="num-mappers">1</param> <!-- 使用map任務個數 --> <param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理 --> </sqoop-shell> </root>
MysqlToHive.py
# _*_ coding:UTF-8 _*_
'''
Created on 2016��12��1��
@author: duking
'''
import datetime
import os
import xml.etree.ElementTree as ET
import collections
#獲取昨天時間
def getYesterday():
today=datetime.date.today()
oneday=datetime.timedelta(days=1)
yesterday=today-oneday
return yesterday
def Resolve_Conf(dt):
#獲取當前工程目錄
PROJECT_DIR = os.getcwd()
#獲得配置文件名
conf_file = PROJECT_DIR + "\conf\Import.xml"
#解析配置文件
xml_tree = ET.parse(conf_file)
#提取出本次導入的類型 全導入或者增量導入 通過配置import.xml中的plan標簽的value值設定
import_types = xml_tree.findall('./importtype')
for import_type in import_types:
aim_types = import_type.findall('./value')
for i in range(len(aim_types)):
aim_type = aim_types[i].text
#獲得task元素
tasks = xml_tree.findall('./task')
#用來保存待執行的sqoop命令的集合
cmds = []
for task in tasks:
#獲得導入類型,增量導入或者全量導入
import_type = task.attrib["type"]
#如果task的標簽導入類型與設定類型不同,結束本次循環
if(import_type != aim_type):
continue
#獲得表名集合
tables = task.findall('./table')
#迭代表名集合,解析表配置文件
for i in range(len(tables)):
#表名
table_name = tables[i].text
#表配置文件名
table_conf_file = PROJECT_DIR + "\conf\\" + table_name + ".xml"
#解析表配置文件
xmlTree = ET.parse(table_conf_file)
#獲取sqoop-shell 節點
sqoopNodes = xmlTree.findall("./sqoop-shell")
#獲取sqoop 命令類型
sqoop_cmd_type = sqoopNodes[0].attrib["type"]
#首先組裝成sqoop命令頭
command = "sqoop " + sqoop_cmd_type
#獲取
praNodes = sqoopNodes[0].findall("./param")
#用來保存param的信息的有序字典
cmap = collections.OrderedDict()
#將所有param中的key-value存入字典中
for i in range(len(praNodes)):
#獲取key的屬性值
key = praNodes[i].attrib["key"]
#獲取param標簽中的值
value = praNodes[i].text
#保存到字典中
cmap[key] = value
#迭代字典將param的信息拼裝成字符串
for key in cmap:
value = cmap[key]
#如果不是鍵值對形式的命令 或者值為空,跳出此次循環
if(value == None or value == "" or value == " "):
value = ""
if(key == "hive-partition-value"):
value = value.replace('$dt',str(dt))
#合成前一天的時間
if(key == "last-value"):
value = '"' + str(dt) + " " + value + '"'
#拼裝為命令
command += " --" + key + " " + value + " "
#將命令加入至待執行的命令集合
cmds.append(command)
return cmds
#python 模塊的入口:main函數
if __name__ == '__main__':
dt = getYesterday();
#解析配置文件,生成相應的HQL語句
cmds = Resolve_Conf(dt)
#迭代集合,執行命令
for i in range(len(cmds)):
cmd = cmds[i]
print cmd
#執行導入過秤
os.system(cmd)
