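I. Steps of the sales case
(I) ODS layer
- Create the source database and generate the initial data
- Create the staging-area tables and the data warehouse tables in Hive
- Load the date dimension data
- ETL the data => into the dwd layer; this case is simple, so no cleansing is needed
(II) DW layer
- dwd layer: ETL cleansing; not needed in this case
- dws layer: modeling plus light aggregation; this case only needs the model, because the data is too simple to need aggregation.
- Model after light aggregation => star schema [note: the star schema is formed after the light aggregation]
(III) DM layer
- dm layer: -> wide table
- Stored in Hive -> too slow! Suited to complex computation, such as machine learning / data mining
- Stored in an analytical database such as MySQL/Oracle -> fast! Used for data analysis
- Exposing the interface: Spring Boot exposes the API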
Data warehouse layers
- ODS (Operational Data Store): source data layer
- DW (Data Warehouse): data warehouse layer
- DM (Data Mart): data mart layer
II. Building the data warehouse
(I) ODS layer
(1) Create the source database in MySQL and generate the initial data
/*****************************************************
            create database sales_source
******************************************************/
drop database if exists sales_source;
create database sales_source default charset utf8 collate utf8_general_ci;
use sales_source;

/*****************************************************
            create table
******************************************************/
-- Table: customer
-- (lower case throughout: table names are case-sensitive on most Linux MySQL installs)
drop table if exists customer;
create table customer(
    customer_number int primary key not null auto_increment,
    customer_name varchar(32) not null,
    customer_street_address varchar(256) not null,
    customer_zip_code int not null,
    customer_city varchar(32) not null,
    customer_state varchar(32) not null
);

-- Table: product
drop table if exists product;
create table product(
    product_code int primary key not null auto_increment,
    product_name varchar(128) not null,
    product_category varchar(32) not null
);

-- Table: sales_order
drop table if exists sales_order;
create table sales_order(
    order_number int primary key not null auto_increment,
    customer_number int not null,
    product_code int not null,
    order_date date not null,
    entry_date date not null,
    order_amount int not null
);

-- add constraints
alter table sales_order add constraint fk_cust_order
    foreign key (customer_number) references customer(customer_number);
alter table sales_order add constraint fk_product_order
    foreign key (product_code) references product(product_code);
/*************************************************
            insert data
***********************************************/
-- insert customer
insert into customer (
    customer_name, customer_street_address, customer_zip_code,
    customer_city, customer_state
) values
    ('Big Customers','7500 Louise Dr.',17050,'Mechanicsbrg','PA'),
    ('Small Stroes','2500 Woodland St.',17055,'Pittsubtgh','PA'),
    ('Medium Retailers','1111 Ritter Rd.',17055,'Pittsubtgh','PA'),
    ('Good Companies','9500 Scott St.',17050,'Mechanicsbrg','PA'),
    ('Wonderful Shops','3333 Rossmoyne Rd.',17050,'Mechanicsbrg','PA'),
    ('Loyal Clients','7070 Ritter Rd.',17055,'Mechanicsbrg','PA');

-- insert product
insert into product (product_name, product_category) values
    ('Hard Disk','Storage'),
    ('Floppy Driver','Storage'),
    ('Icd panel','monitor');

-- insert sales_order
-- columns: customer_number int, product_code int, order_date, entry_date, order_amount
drop procedure if exists proc_generate_saleorder;
delimiter $$
create procedure proc_generate_saleorder()
begin
    -- create the staging table with the same structure as sales_order
    drop table if exists temp;
    create table temp as select * from sales_order where 1=0;
    -- session variables: date range and loop counter
    set @begin_time := unix_timestamp('2018-1-1');
    set @over_time := unix_timestamp('2018-11-23');
    set @i := 1;
    while @i <= 100000 do
        set @cust_number := floor(1+rand()*6);
        set @product_code := floor(1+rand()*3);
        set @tmp_data := from_unixtime(@begin_time+rand()*(@over_time-@begin_time));
        set @amount := floor(1000+rand()*9000);
        insert into temp values(@i,@cust_number,@product_code,@tmp_data,@tmp_data,@amount);
        set @i := @i+1;
    end while;
    -- clear sales_order, then load it from temp in a single statement
    truncate table sales_order;
    insert into sales_order
        select null,customer_number,product_code,order_date,entry_date,order_amount from temp;
    commit;
    drop table temp;
end$$
delimiter ;
call proc_generate_saleorder();
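PS:
1. Why use constraints? See => https://www.cnblogs.com/sabertobih/p/13966709.html
2. Why does the stored procedure use a temporary table for the bulk insert?
- Given that a commit is the step that moves data from memory to the physical table, what difference does the temporary table make?
- Answer (the author's understanding): the key is that temp is a newly created table, and for a newly created table the insert into is completed in memory;
- for the existing sales_order table, MySQL with its default autocommit setting commits after every insert statement, so the picture of a single commit is wrong: inserting row by row would mean 100000 commits. Loading temp first lets the final insert into ... select fill sales_order in one statement.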
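(2) Create the staging-area tables and the data warehouse tables in Hive
About primary keys in Hive for this case:
>>> The data comes from a database, so it already carries primary keys that can serve as the surrogate key (sk); there is no need to generate one
>>> Otherwise, generate the key with row_number / uuid / md5; see https://www.cnblogs.com/sabertobih/p/14031047.html
inithive.sql => creates the Hive tables!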
drop database if exists ods_sales_source cascade;
create database ods_sales_source;
use ods_sales_source;

drop table if exists ods_product;
create table ods_product(
    product_code string,
    product_name string,
    product_category string,
    version string,
    ods_start_time string,
    ods_end_time string
)
row format delimited fields terminated by '\u0001';

drop table if exists ods_customer;
create table ods_customer(
    customer_number string,
    customer_name string,
    customer_street_address string,
    customer_zip_code string,
    customer_city string,
    customer_state string,
    version string,
    ods_start_time string,
    ods_end_time string
)
row format delimited fields terminated by '\u0001';

drop table if exists ods_origin_sales_order;
create table ods_origin_sales_order(
    order_number string,
    customer_number string,
    product_code string,
    order_date string,
    order_amount string
);

drop table if exists ods_dynamic_sales_order;
create table ods_dynamic_sales_order(
    order_number string,
    customer_number string,
    product_code string,
    order_date string,
    order_amount string
)
partitioned by (ymd string);
import_hive.sh => run it directly; it covers everything from creating the tables to importing the MySQL data into Hive automatically
#!/bin/bash
# Run without arguments to (re)create the Hive tables first; pass any argument to skip that step.
if [ $# = 0 ]; then
    hive -f /opt/data/inithive.sql
fi

echo "import ods_product..."
# full import of product
sqoop import \
--connect jdbc:mysql://192.168.56.111:3306/sales_source \
--driver com.mysql.jdbc.Driver \
--username root \
--password root \
--query "select product_code,product_name,product_category,'1.0' as version,'2018-1-1' as ods_start_time,'9999-12-31' as ods_end_time from product where 1=1 and \$CONDITIONS" \
--target-dir /mytmp/ods/pro \
--hive-import \
--hive-database ods_sales_source \
--hive-table ods_product \
--hive-overwrite \
-m 1

echo "import ods_customer..."
# full import of customer
sqoop import \
--connect jdbc:mysql://192.168.56.111:3306/sales_source \
--driver com.mysql.jdbc.Driver \
--username root \
--password root \
--query "select customer_number,customer_name,customer_street_address,customer_zip_code,customer_city,customer_state,'1.0' as version,'2018-1-1' as ods_start_time,'9999-12-31' as ods_end_time from customer where 1=1 and \$CONDITIONS" \
--hive-import \
--target-dir /mytmp/ods/cust \
--hive-database ods_sales_source \
--hive-table ods_customer \
--hive-overwrite \
-m 1
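The two imports above differ only in the query, the target directory and the Hive table, so the shared flags could be factored out. The following is a minimal sketch of that idea, not the author's script: `SQOOP_CONN`, `import_dim` and the `SQOOP_BIN` dry-run switch are hypothetical names, while every Sqoop flag is taken from the script above.

```shell
#!/bin/bash
# Connection flags copied from import_hive.sh.
SQOOP_CONN=(
  --connect jdbc:mysql://192.168.56.111:3306/sales_source
  --driver com.mysql.jdbc.Driver
  --username root
  --password root
)

# import_dim QUERY TARGET_DIR HIVE_TABLE  (hypothetical wrapper)
# SQOOP_BIN defaults to sqoop; setting it to "echo" prints the command instead of running it.
import_dim() {
  "${SQOOP_BIN:-sqoop}" import "${SQOOP_CONN[@]}" \
    --query "$1" \
    --target-dir "$2" \
    --hive-import \
    --hive-database ods_sales_source \
    --hive-table "$3" \
    --hive-overwrite \
    -m 1
}

# Dry run: only prints the assembled command line.
SQOOP_BIN=echo import_dim \
  "select product_code,product_name,product_category from product where \$CONDITIONS" \
  /mytmp/ods/pro ods_product
```

Keeping the flags in a bash array preserves each argument as one word, so a query containing spaces is passed through intact.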
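(3) Loading the date dimension data
How can the partitioned table be loaded automatically?
Method 1 (not recommended): partition manually with Sqoop. Note that a Sqoop partition value must not contain special characters, so the date can only be formatted as %Y%m%d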
echo "import sales_order..."
# incremental import of sales_order, one partition per day
# day starts at 0 so that 2018-01-01 itself is included
day=0
md=`date -d '2018-10-23' +%j`
while [ $day -lt $md ]
do
    mdd=`date -d "2018-1-1 +$day day" +%Y%m%d`
    hive -e "use ods_sales_source;alter table ods_sales_order add if not exists partition(ymd='$mdd')"
    sqoop import \
    --connect jdbc:mysql://192.168.56.111:3306/sales_source \
    --driver com.mysql.jdbc.Driver \
    --username root \
    --password root \
    --query "select order_number,customer_number,product_code,order_date,order_amount from sales_order where date_format(order_date,'%Y%m%d')='$mdd' and \$CONDITIONS" \
    --target-dir /mytmp/so \
    --delete-target-dir \
    --hive-import \
    --hive-database ods_sales_source \
    --hive-table ods_sales_order \
    --hive-partition-key "ymd" \
    --hive-partition-value "$mdd" \
    -m 1
    day=$((day + 1))
done
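The loop above boils down to producing one %Y%m%d partition value per day. A small sketch of just that step is given below so the values can be checked before any Sqoop job runs; `list_partitions` is a hypothetical helper and GNU `date` is assumed.

```shell
#!/bin/bash
# list_partitions START END: print one yyyymmdd value per day, both ends inclusive.
list_partitions() {
  local cur end
  cur=$(date -d "$1" +%F)
  end=$(date -d "$2" +%F)
  # ISO dates sort lexicographically, so a string comparison is enough
  while [[ ! "$cur" > "$end" ]]; do
    echo "${cur//-/}"          # strip the dashes: 2018-01-01 -> 20180101
    cur=$(date -d "$cur +1 day" +%F)
  done
}

list_partitions 2018-01-01 2018-01-03
```

Each printed value could then be fed to `--hive-partition-value` in a `while read` loop.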
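Method 2: use Hive dynamic partitioning. First import a full table into Hive, then load the partitioned table from the full table == dynamic partitioning ==>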
# full import
echo "import ods_origin_sales_order..."
sqoop import \
--connect jdbc:mysql://192.168.56.111:3306/sales_source \
--driver com.mysql.jdbc.Driver \
--username root \
--password root \
--query "select order_number,customer_number,product_code,order_date,order_amount from sales_order where \$CONDITIONS" \
--hive-import \
--target-dir /mytmp/ods/so \
--hive-database ods_sales_source \
--hive-table ods_origin_sales_order \
--hive-overwrite \
-m 1

echo "import dynamic..."
hive -f /opt/data/dynamic.sql
-- dynamic.sql: load the partitioned table through dynamic partitioning
use ods_sales_source;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=10000;
set hive.exec.max.created.files=10000;

-- the last select column (ymd) supplies the partition value for each row
insert into ods_dynamic_sales_order partition(ymd)
select order_number,customer_number,product_code,order_date,order_amount,
       date_format(order_date,'yyyyMMdd') as ymd
from ods_origin_sales_order;
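(4) ETL of the data
(II) DW layer
(1) SQL: DDL for the dw-layer tables
There is only one customer table and one product table, which shows that the supplied data is already aggregated!
createdwdinit.sql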
-- DDL
-- The date table is an external table stored as textfile, which makes it easy to map the data file onto it
drop database if exists dw_sales_source cascade;
create database dw_sales_source;
use dw_sales_source;

drop table if exists dwd_dim_customer;
create table dwd_dim_customer(
    customer_sk int,
    customer_number int,
    customer_name string,
    customer_street_address string,
    custom_zip_code string,
    customer_city string,
    customer_state string,
    version string,
    effectice_date string,
    expiry_date string
)
row format delimited fields terminated by ','
stored as parquetfile;

drop table if exists dwd_dim_product;
create table dwd_dim_product(
    product_sk int,
    product_code int,
    product_name string,
    product_category string,
    version string,
    effectice_date string,
    expiry_date string
)
row format delimited fields terminated by ','
stored as parquetfile;

drop table if exists dwd_dim_order;
create table dwd_dim_order(
    order_sk int,
    order_number int,
    version string,
    effectice_date string,
    expiry_date string
)
row format delimited fields terminated by ','
stored as parquetfile;

drop table if exists dwd_dim_date;
create external table dwd_dim_date(
    date_sk int,
    d_date string,
    d_month int,
    d_month_name string,
    d_quarter int,
    d_year int
)
row format delimited fields terminated by ','
stored as textfile
location '/opt/dwdate';

drop table if exists dwd_fact_sales_order;
create table dwd_fact_sales_order(
    order_sk int,
    customer_sk int,
    product_sk int,
    date_sk int,
    order_amount float
)
row format delimited fields terminated by ','
stored as parquetfile;
(2) SQL: load the dw layer from the ods layer: customer/product/order
dwd_import.sql
insert into dw_sales_source.dwd_dim_customer
select
    customer_number as customer_sk,
    customer_number,
    customer_name,
    customer_street_address,
    customer_zip_code,
    customer_city,
    customer_state,
    version,
    ods_start_time as effectice_date,
    ods_end_time as expiry_date
from ods_sales_source.ods_customer;

insert into dw_sales_source.dwd_dim_product
select
    product_code as product_sk,
    product_code,
    product_name,
    product_category,
    version,
    ods_start_time as effectice_date,
    ods_end_time as expiry_date
from ods_sales_source.ods_product;

insert into dw_sales_source.dwd_dim_order
select
    order_number as order_sk,
    order_number,
    '1.0' as version,
    order_date as effectice_date,
    '9999-12-31' as expiry_date
from ods_sales_source.ods_dynamic_sales_order;
(3) Script: generate the date data and load the dwd_fact_sales_order table
#!/bin/bash
# create the dw layer
echo '***********************************'
echo 'create dw layer data tables...'
echo '***********************************'
hive -f /opt/data/dw/createdwdinit.sql

echo '***********************************'
echo 'import data...'
echo '***********************************'
hive -f /opt/data/dw/dwd_import.sql

echo '***********************************'
echo 'generating dwd_date data...'
echo '***********************************'

## check whether the file or directory exists on HDFS
targetfilename=/opt/dwdate
hdfs dfs -test -e $targetfilename
if [ $? -eq 0 ]; then
    echo 'exist'
    hdfs dfs -rm -R $targetfilename
fi

## check whether the local file exists
filename=/opt/data/tmp
if [ -e $filename ]; then
    rm -rf $filename
fi
touch $filename

# one row per day of 2018: date_sk,d_date,d_month,d_month_name,d_quarter,d_year
num=0
while (( num < 365 ))
do
    dt=`date -d "2018-01-01 $num days" +%Y-%m-%d`
    mtname=`date -d "${dt}" +%B`
    mt=`date -d "${dt}" +%m`
    if [ $mt -le 3 ]; then
        qt=1
    elif [ $mt -le 6 ]; then
        qt=2
    elif [ $mt -le 9 ]; then
        qt=3
    else
        qt=4
    fi
    num=$((num + 1))
    echo "${num},${dt},${dt:5:2},$mtname,$qt,${dt:0:4}" >> $filename
    #hive -e'insert into dw_sales_source.dwd_dim_date values($num+1,${dt},${dt:5:2},$mtname,$qt,${dt:0:4})'
done

echo "move the date data from local disk to hdfs"
# -f keeps this quiet when the directory was already removed above
hdfs dfs -rm -r -f /opt/dwdate
hdfs dfs -mkdir -p /opt/dwdate
hdfs dfs -put /opt/data/tmp /opt/dwdate

echo '***********************************'
echo 'import fact_sales_order data...'
echo '***********************************'
hive -e 'insert into dw_sales_source.dwd_fact_sales_order
select
    oso.order_number as order_sk,
    oso.customer_number as customer_sk,
    oso.product_code as product_sk,
    dss.date_sk as date_sk,
    oso.order_amount as order_amount
from ods_sales_source.ods_dynamic_sales_order oso
inner join dw_sales_source.dwd_dim_date dss
on oso.order_date = dss.d_date'
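The quarter logic and the row layout of the date dimension can be checked in isolation before writing 365 rows to HDFS. The sketch below is one way to do that; `quarter_of` and `date_dim_row` are hypothetical helper names, the column order follows dwd_dim_date, and GNU `date` is assumed (with `LC_ALL=C` so month names come out in English).

```shell
#!/bin/bash
# quarter_of MONTH: map a month (1-12, leading zero allowed) to its quarter.
quarter_of() {
  # 10# forces base 10 so that 08 and 09 are not read as invalid octal numbers
  echo $(( (10#$1 + 2) / 3 ))
}

# date_dim_row SK DATE: print one CSV row date_sk,d_date,d_month,d_month_name,d_quarter,d_year
date_dim_row() {
  local sk=$1 dt=$2 month month_name year
  month=$(date -d "$dt" +%m)
  month_name=$(LC_ALL=C date -d "$dt" +%B)
  year=$(date -d "$dt" +%Y)
  echo "${sk},${dt},${month},${month_name},$(quarter_of "$month"),${year}"
}

date_dim_row 1 2018-01-01
date_dim_row 365 2018-12-31
```

The arithmetic form replaces the if/elif chain with a single expression; both give the same quarter for every month.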
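(III) DM layer
(1) How is the wide table formed?
① Requirement:
Per day -> customer, product, date, number of orders, amount for the day && last two days -> number of orders, amount over the last two days
② For tuning, see: https://www.cnblogs.com/sabertobih/p/14041854.html
③ Code:
dm_init.sql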
drop database if exists dm_sales_source cascade;
create database dm_sales_source;
use dm_sales_source;
dm_run.sql
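drop table if exists dm_sales_source.dm_sales_order_count;
create table dm_sales_source.dm_sales_order_count as
select
    dss.d_date,
    d.customer_sk,
    d.product_sk,
    count(d.order_sk) as order_num,
    sum(d.order_amount) as order_dailyamount,
    -- The window is partitioned by customer and product and ordered by date, so that
    -- "1 preceding" is the previous day's row of the same customer/product
    -- (this holds only when that combination has orders on every day).
    sum(sum(d.order_amount)) over(
        partition by d.customer_sk, d.product_sk
        order by dss.d_date
        rows between 1 preceding and current row) as recent_amount,
    sum(count(d.order_sk)) over(
        partition by d.customer_sk, d.product_sk
        order by dss.d_date
        rows between 1 preceding and current row) as recent_num
from dw_sales_source.dwd_fact_sales_order d
inner join dw_sales_source.dwd_dim_date dss
on d.date_sk = dss.date_sk
group by dss.d_date, d.customer_sk, d.product_sk
order by dss.d_date;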
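To see what the frame `rows between 1 preceding and current row` computes, the same rolling sum can be reproduced on a few sample rows with awk. This is only an illustrative sketch: the function name `rolling_two_rows` and the sample figures are made up, and the input is assumed to be date-ordered CSV rows of d_date,customer_sk,product_sk,amount, with the sum kept separately per customer and product.

```shell
#!/bin/bash
# rolling_two_rows: append, for each row, the amount of this row plus the previous
# row of the same customer/product (the 2-row window frame).
rolling_two_rows() {
  awk -F',' -v OFS=',' '{
    key = $2 FS $3               # one running window per customer/product
    recent = $4 + prev[key]      # prev[key] is empty (0) for the first row of a key
    print $1, $2, $3, $4, recent
    prev[key] = $4               # remember this row for the next one of the same key
  }'
}

# Sample rows, invented for illustration only.
printf '%s\n' \
  '2018-01-01,1,1,100' \
  '2018-01-02,1,1,50' \
  '2018-01-02,2,1,10' \
  '2018-01-03,1,1,25' | rolling_two_rows
```

For customer 1 / product 1 the last column comes out as 100, 150 and 75, which is how recent_amount relates to order_dailyamount in the wide table.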
init.sh
#!/bin/bash
hive -f /opt/data/dm/dm_init.sql
hive -f /opt/data/dm/dm_run.sql
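As written, init.sh runs the second script even if the first one fails. One possible way to stop at the first failure is sketched below; `run_step` is a hypothetical helper, not part of the original scripts, and the hive calls are shown only as comments so the block has no side effects.

```shell
#!/bin/bash
# run_step: run one command, report it, and return non-zero if it fails.
run_step() {
  echo "running: $*"
  if ! "$@"; then
    echo "failed: $*" >&2
    return 1
  fi
}

# Intended use with the paths from init.sh (not executed here):
#   run_step hive -f /opt/data/dm/dm_init.sql &&
#   run_step hive -f /opt/data/dm/dm_run.sql
```

Chaining the steps with `&&` means dm_run.sql is only attempted after dm_init.sql succeeded.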
(2) Export from HDFS to MySQL with Sqoop (for compressed formats such as ORC, just use Java!)
① Sqoop: suitable for textfile
How do you find where a table is stored on HDFS?
show create table dm_sales_source.dm_sales_order_count;
!hdfs dfs -text /hive110/warehouse/dm_sales_source.db/dm_sales_order_count/000000_0
In MySQL:
drop database if exists dmdb;
create database dmdb;
use dmdb;
create table dm_sales_order_count(
    d_date varchar(20),
    customer_sk int,
    product_sk int,
    order_num int,
    order_dailyamount double,
    recent_amount double,
    recent_num int
);
Then export from HDFS to MySQL with Sqoop
Both directions, MySQL to Hive and Hive -> HDFS -> MySQL, need '\001' as the field delimiter
sqoop export \
--connect jdbc:mysql://192.168.56.111:3306/dmdb \
--username root \
--password root \
--table dm_sales_order_count \
--export-dir /hive110/warehouse/dm_sales_source.db/dm_sales_order_count \
--input-fields-terminated-by '\001' \
-m 1
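The '\001' delimiter is a non-printing control character, so fields appear to run together when a file is printed with `hdfs dfs -text`. A small filter like the one below makes the fields visible before an export; `show_fields` is a hypothetical name and the sample row is made up for illustration.

```shell
#!/bin/bash
# show_fields: replace the \001 field delimiter with a comma for display only.
show_fields() {
  tr '\001' ','
}

# Simulated \001-delimited row; on a cluster the input would come from
# `hdfs dfs -text <file>` piped into show_fields.
printf '%s\001%s\001%s\n' 2018-01-01 1 2 | show_fields
```

Counting the commas in the output is a quick check that the file has the seven columns the MySQL table expects.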
② Java approach: works for everything!
See: https://www.cnblogs.com/sabertobih/p/14043929.html
(3) How to expose the API?
See: https://www.cnblogs.com/sabertobih/p/14043895.html
III. Updating the data warehouse