關於大數據T+1執行流程


關於大數據T+1執行流程

前提: 搭建好大數據環境(hadoop hive hbase sqoop zookeeper oozie hue)

 

1.將所有數據庫的數據匯總到hive (這里有三種數據源 ORACLE  MYSQL SEQSERVER)

全量數據抽取示例:

ORACLE(注意表名必須大寫!!!)

sqoop import --connect jdbc:oracle:thin:@//10.11.22.33:1521/LPDR.china.com.hh --username root --password 1234 \
--table DATABASENAME.TABLENAME --hive-overwrite --hive-import --hive-database bgda_hw --hive-table lp_tablename \
--target-dir /user/hadouser_hw/tmp/lp_tablename --delete-target-dir \
--null-non-string '\\N' --null-string '\\N' \
--hive-drop-import-delims --verbose --m 1

  

MYSQL:

sqoop import --connect jdbc:mysql://10.33.44.55:3306/DATABASEBANE --username ROOT --password 1234 \
--query 'select * from DEMO t where t.DATE1 < current_date and $CONDITIONS' \
--hive-overwrite --hive-import --hive-database bgda_hw --hive-table DEMO \
--target-dir /user/hadouser_hw/tmp/DEMO --delete-target-dir \
--null-non-string '\\N' --null-string '\\N' \
--hive-drop-import-delims --verbose --m 1

  

SQLSERVER:

sqoop import --connect 'jdbc:sqlserver://10.55.66.15:1433;username=ROOT;password=ROOT;database=db_DD' \
--query 'select * from TABLE t where t.tasktime < convert(varchar(10),getdate(),120) and $CONDITIONS' \
--hive-overwrite --hive-import --hive-database bgda_hw --hive-table TABLENAME \
--target-dir /user/hadouser_hw/tmp/TABLENAME --delete-target-dir \
--null-non-string '\\N' --null-string '\\N' \
--hive-drop-import-delims --verbose --m 1

 

2.  編寫hive腳本,對數據進行處理

說明:

data 存儲T+1跑出來的數據信息,只存一天的數據量

data_bak : 存儲所有的數據信息

 

(初始化腳本)

use bgda_hw;
set hive.auto.convert.join=false;

drop table data_bak;
create table data_bak(
 scanopt   string 
 ,scanoptname   string 
 ,statisdate   string 
) row format delimited fields terminated by '\001'; 



insert overwrite table data_bak
SELECT 
a.scanopt 
,x0.name  as scanoptname
,to_date(a.scandate) as statisdate
from bgda_hw.scan  a 
left outer join bgda_hw.user x0 on x0.userid = a.scanopt 
where 1=1  
and datediff(a.scandate,'2019-01-01' )>=0
and datediff(a.scandate,'2019-09-20' )<0
GROUP BY  a.scanopt,x0.name,a.scandate
order by a.scandate
;

  

(t+1腳本)

use bgda_hw; 
set hive.auto.convert.join=false;

drop table data;
create table data(
 scanopt   string 
 ,scanoptname   string 
 ,statisdate   string 
) row format delimited fields terminated by '\001'; 


insert overwrite table data
SELECT 
a.scanopt 
,x0.name  as scanoptname
,to_date(a.scandate) as statisdate
from bgda_hw.scan  a 
left outer join bgda_hw.user x0 on x0.userid = a.scanopt 
where 1=1 
and a.scandate<date_add(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),0)
and a.scandate>=date_add(from_unixtime(unix_timestamp(),'yyyy-MM-dd'),-1)
GROUP BY  a.scanopt,x0.name,a.scandate
order by a.scandate
;

insert into table data_bak
select * from data
;

  

3.將結果數據抽取到結果庫里

sqoop export \
--connect jdbc:mysql://10.6.11.11:3306/report \
--username root \
--password 1234 \
--table data \
--export-dir /user/hive/warehouse/bgda_hw.db/data \
--columns scanopt,scanoptname,statisdate \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--input-null-string '\\N' \
--input-null-non-string '\\N'

 

抽數腳本示例 (腳本中的insert.hql 則是上方定義的hive腳本信息)

#!/bin/bash

export CDH_PARCEL=/var/opt/cloudera/parcels/CDH/bin/
export PATH=${PATH}:${CDH_PARCEL}
export PYTHON_EGG_CACHE=~/.python-eggs

#kinit to user hadouser_hw
kinit -kt hadouser_hw.keytab hadouser@HADOOP-AD-ROOT.DC

echo "$CDH_PARCEL: {CDH_PARCEL} "
echo "$PATH: {PATH} "
echo "$PYTHON_EGG_CACHE: {PYTHON_EGG_CACHE} "

#sqoop import full data from mssql database to hdfs
set -x

beeline -u "jdbc:hive2://10.20.33.44:10000/default;principal=hive/sssssss012@HADOOP-AD-ROOT.DC" -f insert.hql

# 將數據抽取到mysql 結果數據 原數據
sqoop export \
--connect jdbc:mysql://10.6.11.15:3306/report \
--username root \
--password 1234 \
--table rs_kpitime_psdata \
--export-dir /user/hive/warehouse/bgda_hw_stg.db/rs_kpitime_psdata_bak \
--columns aplcustno,isapprv,statisdate,statisyear,statisquarter,statismonth,countdate \
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--input-null-string '\\N' \
--input-null-non-string '\\N'

ret=$?
set +x

if [[ $ret -eq 0 ]];then
  echo "insert table OK"
else
  echo "insert table failed!!!Please check!!!"
  exit $ret
fi

  

4.定義調度信息(oozie),每天定時跑出結果數據,自動抽取到結果庫中

HUE的基本使用

 

 

 

 

定義工作流信息

先進入workflow

 

 

開始定義

 

 

選定要執行的腳本 (圖片中提到的keytab 是一個認證文件)

 

 

定義定時任務

先進入定時任務頁面

 

 新建定時任務

 

定時任務詳細定義(點擊Options ,選擇ShangHai時區,然后定義任務執行時長(例如 從2019年到2099年,最后保存,保存好后記得點擊執行!!!))

 

5.配置可視化組件展示數據 saiku

這部分詳細教程請參考  https://www.cnblogs.com/DFX339/tag/saiku/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM