hadoop項目實戰--ETL--(三)實現mysql表到HIVE表的全量導入與增量導入


一 在HIVE中創建ETL數據庫

  ->create database etl;

 

二 在工程目錄下新建MysqlToHive.py 和conf文件夾

  在conf文件夾下新建如下文件,最后的工程目錄如下圖

  

 

三 源碼

  Import.xml

<?xml version="1.0" encoding="UTF-8"?>
<root>
	<importtype>
		<value>add</value>    <!-- 增量導入或者全導入 -->
	</importtype>

	<task type="all">
		<table>user_all</table> <!-- 數據庫中需要增量導入的第一張表名 -->
		<table>oder_all</table> <!-- 數據庫中需要增量導入的第一張表名 -->
	</task>
	
	<task type="add">
		<table>user_add</table> <!-- 數據庫中需要增量導入的第一張表名 -->
		<table>oder_add</table> <!-- 數據庫中需要增量導入的第一張表名 -->
	</task>
	
</root>

  oder_add.xml

<?xml version="1.0" encoding="UTF-8"?>

<root>
	<sqoop-shell type="import">
		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 數據庫連接地址 -->
		<param key="username">root</param> <!-- 數據庫用戶名 -->
		<param key="password">123456</param> <!-- 數據庫密碼 -->
		<param key="table">oderinfo</param><!-- 數據庫中待導出的表名 -->
		<param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 -->
		<param key="hive-partition-key">dt</param>   <!-- 通過時間分區 -->
		<param key="hive-partition-value">$dt</param>
		<param key="hive-import"></param>
		<param key="check-column">crt_time</param> <!-- 增量導入檢查的列 -->
		<param key="incremental">lastmodified</param> <!-- 按照時間簇來進行增量導入 -->
		<param key="last-value">23:59:59</param> <!-- 增量導入時間划分點 -->
		<param key="num-mappers">1</param>   <!-- 使用map任務個數 -->
		<param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理  -->
	</sqoop-shell>
</root>

  oder_all.xml

<?xml version="1.0" encoding="UTF-8"?>

<root>
	<sqoop-shell type="import">
		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 數據庫連接地址 -->
		<param key="username">root</param><!-- 數據庫用戶名 -->
		<param key="password">123456</param><!-- 數據庫密碼 -->
		<param key="table">oderinfo</param><!-- 數據庫中待導出的表名 -->
		<param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 -->
		<param key="hive-partition-key">dt</param>   <!-- 通過時間分區 -->
		<param key="hive-partition-value">$dt</param>
		<param key="hive-import"></param>                  
		<param key="create-hive-table"></param>   <!-- 在hive中新建一張同名同結構的表 -->
		<param key="hive-overwrite"></param> <!-- 覆蓋原來以存在的表 -->
		<param key="num-mappers">1</param>   <!-- 使用map任務個數 -->
		<param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理  -->
	</sqoop-shell>
</root>

  user_add.xml

<?xml version="1.0" encoding="UTF-8"?>

<root>
	<sqoop-shell type="import">
		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 數據庫連接地址 -->
		<param key="username">root</param> <!-- 數據庫用戶名 -->
		<param key="password">123456</param> <!-- 數據庫密碼 -->
		<param key="table">userinfo</param><!-- 數據庫中待導出的表名 -->
		<param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 -->
		<param key="hive-partition-key">dt</param>   <!-- 通過時間分區 -->
		<param key="hive-partition-value">$dt</param>
		<param key="hive-import"></param>
		<param key="check-column">crt_time</param> <!-- 增量導入檢查的列 -->
		<param key="incremental">lastmodified</param> <!-- 按照時間簇來進行增量導入 -->
		<param key="last-value">23:59:59</param> <!-- 增量導入時間划分點 -->
		<param key="num-mappers">1</param>   <!-- 使用map任務個數 -->
		<param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理  -->
	</sqoop-shell>
</root>

  user_all.xml

  

<?xml version="1.0" encoding="UTF-8"?>

<root>
	<sqoop-shell type="import">
		<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 數據庫連接地址 -->
		<param key="username">root</param><!-- 數據庫用戶名 -->
		<param key="password">123456</param><!-- 數據庫密碼 -->
		<param key="table">userinfo</param><!-- 數據庫中待導出的表名 -->
		<param key="hive-database">etl</param> <!-- 指定導入到HIVE的哪個數據庫中 -->
		<param key="hive-partition-key">dt</param>   <!-- 通過時間分區 -->
		<param key="hive-partition-value">$dt</param>
		<param key="hive-import"></param>                  
		<param key="create-hive-table"></param>   <!-- 在hive中新建一張同名同結構的表 -->
		<param key="hive-overwrite"></param> <!-- 覆蓋原來以存在的表 -->
		<param key="num-mappers">1</param>   <!-- 使用map任務個數 -->
		<param key="split-by">id</param> <!-- 將表按照id水平切分交給map處理  -->
	</sqoop-shell>
</root>

  MysqlToHive.py

# _*_ coding:UTF-8 _*_
'''
Created on 2016��12��1��

@author: duking
'''
import datetime
import os
import xml.etree.ElementTree as ET
import collections

#獲取昨天時間
def getYesterday(): 
    today=datetime.date.today() 
    oneday=datetime.timedelta(days=1) 
    yesterday=today-oneday  
    return yesterday


def Resolve_Conf(dt):
    
    #獲取當前工程目錄
    PROJECT_DIR = os.getcwd()
    #獲得配置文件名
    conf_file = PROJECT_DIR + "\conf\Import.xml"
    #解析配置文件
    xml_tree = ET.parse(conf_file)
    
    #提取出本次導入的類型  全導入或者增量導入  通過配置import.xml中的plan標簽的value值設定
    import_types = xml_tree.findall('./importtype')
    for import_type in import_types:
        aim_types = import_type.findall('./value')
        for i in range(len(aim_types)):
            aim_type = aim_types[i].text
            
    #獲得task元素
    tasks = xml_tree.findall('./task')
    
    #用來保存待執行的sqoop命令的集合
    cmds = []
    
    for task in tasks:
        #獲得導入類型,增量導入或者全量導入
        import_type = task.attrib["type"]
        
        #如果task的標簽導入類型與設定類型不同,結束本次循環
        if(import_type != aim_type):
            continue

        #獲得表名集合
        tables = task.findall('./table')
        
        #迭代表名集合,解析表配置文件
        for i in range(len(tables)):
            #表名
            table_name = tables[i].text
            #表配置文件名
            table_conf_file = PROJECT_DIR + "\conf\\" + table_name + ".xml"
            
            #解析表配置文件
            xmlTree = ET.parse(table_conf_file)
            
            #獲取sqoop-shell 節點
            sqoopNodes = xmlTree.findall("./sqoop-shell")
            #獲取sqoop 命令類型
            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
            
            #首先組裝成sqoop命令頭
            command = "sqoop " + sqoop_cmd_type
                
            #獲取
            praNodes = sqoopNodes[0].findall("./param")
            
            #用來保存param的信息的有序字典
            cmap = collections.OrderedDict()            
            #將所有param中的key-value存入字典中
            for i in range(len(praNodes)):
                #獲取key的屬性值
                key = praNodes[i].attrib["key"]
                #獲取param標簽中的值
                value = praNodes[i].text
                #保存到字典中
                cmap[key] = value
             
            #迭代字典將param的信息拼裝成字符串
            for key in cmap:
                  
                value = cmap[key]
                 
                #如果不是鍵值對形式的命令 或者值為空,跳出此次循環
                if(value == None or value == "" or value == " "):
                    value = ""
                    
                if(key == "hive-partition-value"):
                    value = value.replace('$dt',str(dt))  
                #合成前一天的時間
                if(key == "last-value"):
                    value = '"' + str(dt) + " " + value + '"'
                        
                #拼裝為命令
                command += " --" + key + " " + value + " " 
                    
            #將命令加入至待執行的命令集合
            cmds.append(command)
        
    return cmds   
        

#python 模塊的入口:main函數
if __name__ == '__main__':
    
    dt = getYesterday();
    
    #解析配置文件,生成相應的HQL語句
    cmds = Resolve_Conf(dt)
    
    #迭代集合,執行命令
    for i in range(len(cmds)):
        cmd = cmds[i]
        print cmd
        #執行導入過秤
        os.system(cmd)

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM