datax二次開發


從hive抽取數據,寫入hbase

一、datax插件hbase12xwriter開發   

查看datax源碼,發現有hbase094xwriter和hbase11xwriter源碼,說明datax支持hbase寫入,再查看測試和生產環境使用的hbase版本是:hbase-1.2.0-cdh5.8.4

自己寫一個hbase12xwriter插件包

開發流程:

1、搭建項目模塊module

datax-all項目上右擊->New->other->Maven->Maven Module選中, Next

Module Name:hbase12xwriter

Parent Project:datax-all

Next 下一步

Next下一步:

Finish完成,項目中就多了一個模塊hbase12xwriter,並且 datax-all的pom.xml中<modules>結點最后會自動加一行<module>hbase12xwriter</module>

 

2、把原來hbase11xwriter包中代碼和pom文件拷貝到新建的模塊中

修改拷貝過來的pom.xml

hbase版本號修改為:1.2.0   hadoop版本號修改為:2.6.0

 

3.hbase12xwriter項目右擊->build path->configure build path->source添加 Folder 

src/main/resources

將hbase11xwriter/src/main/resources中

plugin_job_template.json

plugin.json拷貝過來

 

4.將src/main/下的assembly文件夾及package.xml拷貝過來,不然打包不成功

拷貝過來修改里面內容為hbase12xwriter

 

5.hbase12xwriter項目右擊->run as->Maven build->install

BUILD SUCCESS就表示編譯成功

得到jar包:hbase12xwriter-0.0.1-SNAPSHOT.jar

 

6.拷貝doc/hbase11xwriter.md過來hbase12xwriter修改

 

7.在hive/tmp庫下建測試表:
create table tmp.test_hive_to_hbase
(
id         int          comment 'ID',
code    string     comment '訂單編碼',
amt      float       comment '訂單金額',
time     string     comment '發貨時間'
)
comment '訂單表'  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE ; 

插入數據到表:

在目錄下cd /home/zallds/users/wcy

vi 000000_0

寫入2行數據:

10001,order-001,189.25,2018-01-09
10002,order-002,3900.80,2018-01-10

進入hadoop命令目錄:

    cd /data/hadoop-2.6.0-cdh5.8.4/bin

從本地文件系統中復制單個或多個源路徑到hdfs目標文件系統。

    hadoop fs -put /home/zallds/users/wcy/000000_0  /user/hive/warehouse/tmp.db/test_hive_to_hbase

這個時候在hive里查表:select * from tmp.test_hive_to_hbase發現有數據了

 

8.在hbase中創建表:

create  'test_hbase_write',{NAME=>'cf'}

create  'temp_pm_price',{NAME=>'cf'}

 

9.構造json文件

{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [{
"reader": {
"parameter": {
"fileType": "text",
"fieldDelimiter": ",",
"column": [{
"index": "0",
"type": "int"
}, {
"index": "1",
"type": "string"
}, {
"index": "2",
"type": "float"
}, {
"index": "3",
"type": "string"
}],
"encoding": "UTF-8",
"path": "/user/hive/warehouse/tmp.db/test_hive_to_hbase",
"defaultFS": "hdfs://cluster"
},
"name": "hdfsreader"
},
"writer": {
"name": "hbase12xwriter",
"parameter": {
"hbaseConfig": {
"hbase.rootdir": "hdfs://cluster/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master"
},
"table": "test_hbase_write",
"mode": "normal",
"rowkeyColumn": [{
"index": 0,
"type": "id"
}],
"column": [{
"index": 0,
"name": "cf:id",
"type": "int"
},
{
"index": 1,
"name": "cf:code",
"type": "string"
},
{
"index": 2,
"name": "cf:amt",
"type": "float"
},
{
"index": 3,
"name": "cf:time",
"type": "string"
}
],
"encoding": "utf-8"
}
}
}]
}
}
 
 

其中hbaseConfig的配置要查看/data/hbase-1.2.0-cdh5.8.4/conf/hbase-site.xml里的配置項,

    "hbase.rootdir": "hdfs://cluster/hbase",
    "hbase.cluster.distributed": "true",
    "hbase.zookeeper.quorum": "master"

上測試環境查看hbase配置文件:/data/hbase-1.2.0-cdh5.8.4/conf/hbase-site.xml

在datax/job/下

vi 290.json

把json內容粘貼進去

 

10.在/home/datax/plugin/writer下創建目錄

[root@master writer]$ mkdir hbase12xwriter

把hbase12xwriter-0.0.1-SNAPSHOT.jar包上傳到hbase12xwriter 目錄下

 

在hbase12xwriter目錄下創建libs目錄,將本地項目的jar包全部上傳

把plugin_job_template.json和plugin.json文件拷貝進去

 

12.在datax/bin目錄下執行任務

python datax.py ../job/290.json

沒有報錯信息,但數據沒有進入hbase

再細看日志,發現報了錯:

 

原來是hdfsreader讀取數據時,用到了datax-common或plugin-unstructured-storage-util包里的工具類,沒有定義int類型和IntColumn類型,把hdfsreader中配置字段為int的改成long,hbase12xreader對應的reader還是配置int

就可以執行成功,並且hbase表中有數據,可都是缺少第一條數據

從http接口取抽取數據,到hive

問題梳理:

    1.http請求拿到返回結果,是json串

        輸入參數:

                1)http請求地址:URL   

                2)http請求參數:parameters  eg:  name='zs' & 

        返回結果:json串,解析成一條條記錄,

            將第條記錄解析成一個個字段值,

            寫入hive表:要知道 庫,表,表的字段名

    2.tbl_transfer表結構已經不適合該需求了,離線平台任務配置也不適合該需求了,要重新開發頁面來配置?表可否共用呢?擴展字段?

    target_db_type:hdfs  

    target_db_name:tmp

    target_table:tmp_user_http

    query_sql:不用填

    columns:同其他一樣,不過列的順序要按hdfs表中順序一致,eg:name:name,type:string;name:age,type:int;name:mobile,type:string;name:address,type:string

    需要增加字段:

        1)類型:標識是http接口到hdfs抽取 ;  source_db_type:http

        2)http接口URL地址;

        3)http接口參數

    3.dataxJson如何修改:

        根據抽取類型:http接口  來生成對應的json,源相關的字段用不到,要用到新增加字段httpUrl和params,和列columns來生成json

    4.zeus上任務該如何配呢?

        同其他任務一樣配置

 

zeus調度任務配置:

/home/bin/dump 內容:

#! /usr/bin/env bash
. /etc/profile
export DATAX_HOME=/data/datax

if [ $# == 2 ]; then
today=$2
else 
today=`date -d -1days '+%Y-%m-%d'`
fi

java -jar /home/software/dataxJson_product.jar $1 $today
if test $? -ne 0
then 
exit 11
fi


cd $DATAX_HOME/bin
python datax.py ../job/$1.json

 

/home/bin/hiveF 內容:

#! /usr/bin/env bash
. /etc/profile
command=`java -jar /home/lib/hiveF.jar $*`
echo "$command"
hive -e "$command"

 

 

 搭建httpreader模塊,加入datax-all的module中

1、開發流程:

 

2.部署流程:

1)在datax/plugin/reader/目錄下創建httpreader目錄

httpreader目錄下四個文件如下:

 

httpreader.jar包deploy拷貝進來,eclipse中libs下所有jar文件拷貝到libs目錄,兩個.json是模板文件,修改下

 

3.開發column列配置

 

4.考慮http到hdfs增量抽取,默認load_type為空,即是truncate,可選擇update和partUpdate  測試環境需要把ssh2連接的包ganymed-ssh2-build210.jar上傳到hdfswriter的libs下

 

mysql到hdfs支持增量導入

datax支持hive增量同步思想:

1.truncate 全量

2.復蓋:

    全量復蓋

    部分分區復蓋

3.更新

    全表更新

    分區更新

4.追加

    

分類:
    1.load_type==1 全量同步,無分區;(適用於小表,參數表,全刪全插)--->對應我們的truncate
    2.load_type==2 增量或者全量同步合並全量,按天分區;(適用於類似用戶訪問明細表,新增不修改,直接插入)
    3.load_type==3 增量同步合並全量,無分區(適用於類似用戶表,直接插入)--->對應我們的update
    4.load_type==4 增量同步,有分區,抽取數據為目標表多個分區全量數據;(適用於類似支付訂單表,按照分區字段覆蓋分區插入)
    5.load_type==5 增量同步,有分區;(適用於類似訂單表,按創建時間分區,最后修改時間抽取,按照非分區字段覆蓋分區插入)
 
 
 
        w_partition_name不為空    '''hive -e "use %s;alter table %s add if not exists partition(%s='%s') location '%s';"''' 
load_data_cmd = load_data_cmd % (job_info["w_database"],job_info["w_table"],job_info["w_partition_name"],job_info["w_partition_value"],load_path)
流程:
    1.創建hive分區
        1)計算分區,拿到分區值:分區默認按時間【年月日分區】 date=2018-06-05
        2)判斷該分區是否已經存在,存在的先刪除
        3)組裝拼成sql,執行hive -e sql,創建分區表
     2.   刪除創建臨時表
        1)if(load_type>1)   即load_type=2,3,4,5時,
            a)先刪除臨時表:hive -e drop table if exists temp.temp_table;
            b)創建臨時表:hive -e  create_sql
        create_sql=use r_database;

                           create table table_name(  

  1. col1 string,  
  2. col2 string,  
  3. col3 string,  
  4. col4 string,  
  5. col5 string)ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'   
  6. WITH SERDEPROPERTIES ( "input.regex" = "正則表達式" ) STORED AS TEXTFILE;  
    3.構建路徑
        1)if(load_type==1) ,
            a)系統默認使用時間分區【年月日】,計算分區值,拼路徑load_path=hdfs://SAD-HDP-003:9000/user/hive/warehouse/庫名.db/表名/分區【date=2018-06-06】
            b)拼裝datax.py -p命令
                datax.py -p '-Dr_host=%s -Dr_port=%s -Dr_username=%s -Dr_password=%s -Dr_database=%s -Dr_table=%s -Dr_where=%s -Dr_sql=%s -Dr_columns=%s
                    -Dr_encoding=%s -Dr_params=%s -Dr_concurrency=%s -Dw_dir=load_path -Dprefix_filename=%s -Dw_encoding=%s -Dw_buffer_size=%s -Dw_del_mode=%s 
                    -Dw_concurrency=%s ' ROOT_PATH /job/js_mysql_to_hdfs_job.xml
        2)if(load_type>1),
            a)根據hive表名構建hdfs臨時表路徑
                load_path = "hdfs://nameservice1/user/hive/warehouse/temp.db/" + table_name
            b)拼裝datax.py -p命令
                datax.py -p '-Dr_host=%s -Dr_port=%s -Dr_username=%s -Dr_password=%s -Dr_database=%s -Dr_table=%s -Dr_where=%s -Dr_sql=%s -Dr_columns=%s
                    -Dr_encoding=%s -Dr_params=%s -Dr_concurrency=%s -Dw_dir=load_path -Dprefix_filename=%s -Dw_encoding=%s -Dw_buffer_size=%s -Dw_del_mode=%s 
                    -Dw_concurrency=%s ' ROOT_PATH /job/js_mysql_to_hdfs_job.xml
    4.執行命令,抽取數據 datax etl
        1)替換datax.py -p命令中replace("$START_TIME",date)
        2)替換datax.py -p命令中replace("$YM_TEXT",ym)
        3)替換datax.py -p命令中replace("$LAST_YM_TEXT",last_ym)
        4)替換datax.py -p命令中replace("$END_TIME",date_after)
       然后執行命令
    5.后序處理:
        1)指定表的location
            if(load_type<=1),
                a)默認分區,/date=年-月-日
                b)添加一個分區:
                    hive -e use w_database;alter table w_table add if not exists partition(date='w_partition_value') location load_path;
                c)執行命令
            if(load_type>1),
                a)loady_type>1,即是2,3,4,5都是增量,
                    hive -e use temp; ALTER table %s  set  location '%s'; ("temp",job_info_map["temp_table"],load_path)
                b)執行命令
        2)合並處理:merge
            a)if(load_type==2)
                i)處理columns拼接上date:即id,name,password,date這樣的
                ii)拼裝hive_sql:
                    hive -e set set hive.exec.dynamic.partition.mode=nonstrict;
                                set hive.exec.max.dynamic.partitions.pernode=1000;
                                INSERT OVERWRITE TABLE  w_database.w_table  PARTITION(w_partition_name)  select colums_str from temp.temp_table
                iii)執行hive_sql命令
            b)if(load_type==3,4,5) 暫時不考慮
                
                
 
 
mysql->hive
1.默認writeMode:truncate  全量導入(二次開發:先刪,后加,全量)
2.writeMode:partAppend   分區增量更新
3.writeMode:append                不分區增量追加(原始,不合並,不更新,只追加)
4.writeMode:update                不分區增量更新(合並,更新,追加)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM