Data Warehouse: A Case Study (Basics)


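I. Steps in the sales case study

(i) ODS layer

  1. Create the source database and generate the initial data
  2. Create the staging-area (source data) tables and the data warehouse tables in Hive
  3. Load the date dimension data
  4. ETL the data into the dwd layer; this case is simple, so no cleansing is needed

(ii) DW layer

  1. dwd layer: ETL cleansing, not needed in this case
  2. dws layer: modeling plus light aggregation; this case only needs the modeling, because the data is too simple to require aggregation.
    •  Model after light aggregation => star schema [note: the star schema is formed after the light aggregation]

(iii) DM layer

  1. dm layer: -> wide tables
    1. Stored in Hive -> slow, but suited to complex computation such as machine learning and data mining
    2. Stored in a relational database such as MySQL or Oracle -> fast; used for data analysis
  2. Exposing an interface: expose the API with Spring Boot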

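Data warehouse layers

  1. ODS (Operational Data Store): source data layer
  2. DW (Data Warehouse): data warehouse layer
  3. DM (Data Mart): data mart layer

II. Building the data warehouse

(i) ODS layer

(1) Create the MySQL source database and generate the initial data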

/*****************************************************
            create database sales_source
******************************************************/
drop database  if exists sales_source;
create database sales_source default charset utf8 collate utf8_general_ci;
use sales_source;

/*****************************************************
            create table 
******************************************************/
-- Table:Customer
drop table if exists customer;
create table customer(
    customer_number int primary key not null auto_increment,
    customer_name varchar(32) not null,
    customer_street_address varchar(256) not null,
    customer_zip_code int not null,
    customer_city varchar(32) not null,
    customer_state varchar(32) not null
);

-- Table:Product
drop table if exists product;
create table product(
    product_code int primary key not null auto_increment,
    product_name varchar(128) not null,
    product_category varchar(32) not null
);

-- Table:Sales_order
drop table if exists sales_order;
create table sales_order(
    order_number int primary key not null auto_increment,
    customer_number int not null,
    product_code int not null,
    order_date date not null,
    entry_date date not null,
    order_amount int not null
);

-- add constraint
alter table sales_order add constraint fk_cust_order 
    foreign key (customer_number) references customer(customer_number);
alter table sales_order add constraint fk_product_order 
    foreign key (product_code) references product(product_code);
    
/*************************************************
                insert data
***********************************************/
-- insert customer
insert into customer
(
    customer_name,customer_street_address,customer_zip_code,
    customer_city,customer_state
)
values
('Big Customers','7500 Louise Dr.',17050,'Mechanicsburg','PA'),
('Small Stores','2500 Woodland St.',17055,'Pittsburgh','PA'),
('Medium Retailers','1111 Ritter Rd.',17055,'Pittsburgh','PA'),
('Good Companies','9500 Scott St.',17050,'Mechanicsburg','PA'),
('Wonderful Shops','3333 Rossmoyne Rd.',17050,'Mechanicsburg','PA'),
('Loyal Clients','7070 Ritter Rd.',17055,'Mechanicsburg','PA');

-- insert product
insert into product (product_name,product_category) values
('Hard Disk','Storage'),
('Floppy Drive','Storage'),
('LCD Panel','Monitor');
-- insert sales_order rows
-- columns: customer_number int, product_code int, order_date, entry_date, order_amount
drop procedure if exists proc_generate_saleorder;
delimiter $$
create procedure proc_generate_saleorder()
begin
    -- create temp table 
    drop table if exists temp;
    create table temp as select * from sales_order where 1=0;
    -- declare var 
    set @begin_time := unix_timestamp('2018-1-1');
    set @over_time := unix_timestamp('2018-11-23');
    set @i :=1;
    while @i <= 100000 do
        set @cust_number := floor(1+rand()*6);
        set @product_code := floor(1+rand()*3);
        set @tmp_data := from_unixtime(@begin_time+rand()*(@over_time-@begin_time));
        set @amount := floor(1000+rand()*9000);
        insert into temp values(@i,@cust_number,@product_code,@tmp_data,@tmp_data,@amount);
        set @i := @i+1;
    end while;
    -- clear sales_orders
    truncate table sales_order;
    insert into sales_order select null,customer_number,product_code,order_date,entry_date,order_amount from temp;
    commit;
    drop table temp;
end$$
delimiter ;
call proc_generate_saleorder();

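PS:

  1. Why use constraints? See https://www.cnblogs.com/sabertobih/p/13966709.html

  2. Why does the stored procedure use a temporary staging table for the bulk insert?

    • Given that a commit is what moves data from memory to the physical table, what difference does the staging table make?
    • Answer: the key is that temp is a freshly created table, and the author's explanation is that inserts into a freshly created table are completed in memory;
      • for a table that already exists, MySQL by default (autocommit on) treats every insert statement as its own commit, so inserting row by row into sales_order would commit 100000 times rather than once.
    • Note that temp is created with create table ... as select, which copies neither the primary key nor the foreign keys of sales_order, so the row-by-row inserts also skip index maintenance and constraint checks; the final insert ... select then loads sales_order in a single statement.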

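(2) Create the staging-area and data warehouse tables in Hive

A note on primary keys in Hive for this case:

>>>   The data comes from a database and already carries primary keys, which are used as the surrogate keys (sk), so none need to be generated

>>>   Otherwise, generate the keys with row_number / uuid / md5; see https://www.cnblogs.com/sabertobih/p/14031047.html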
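
For sources that carry no usable key, the md5 option mentioned above derives a deterministic key from the business-key fields. A minimal sketch of the idea in shell, assuming the fields are joined with a '|' separator; the function name `md5_key` and the separator are hypothetical choices for illustration, and inside Hive the built-in `md5()` function would play the same role:

```shell
#!/bin/sh
# Hypothetical helper: derive a deterministic surrogate key from business-key fields.
# The fields are joined with '|' so that ("a","b") and ("ab") hash differently.
# The function body runs in a subshell, so the IFS change does not leak out.
md5_key() (
    IFS='|'
    printf '%s' "$*" | md5sum | cut -d' ' -f1
)

# Example: a key built from a product code and a product name
md5_key 1001 "Hard Disk"
```

The same input always yields the same 32-character key, so a reload of unchanged rows keeps its keys, which row_number cannot guarantee.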

 inithive.sql => creates the Hive tables

drop database if exists ods_sales_source cascade;
create database ods_sales_source;

use ods_sales_source;

drop table if exists ods_product;
create table ods_product(
product_code string,
product_name string,
product_category string,
version string,
ods_start_time string,
ods_end_time string
)
row format delimited fields terminated by '\u0001';

drop table if exists ods_customer;
create table ods_customer(
customer_number string,
customer_name string,
customer_street_address string,
customer_zip_code string,
customer_city string,
customer_state string,
version string,
ods_start_time string,
ods_end_time string
)
row format delimited fields terminated by '\u0001';

drop table if exists ods_origin_sales_order;
create table ods_origin_sales_order(
order_number string,
customer_number string,
product_code string,
order_date string,
order_amount string
);

drop table if exists ods_dynamic_sales_order;
create table ods_dynamic_sales_order(
order_number string,
customer_number string,
product_code string,
order_date string,
order_amount string
)
partitioned by (ymd string);

import_hive.sh => run it directly; it creates the tables and then imports the MySQL data into Hive automatically

#! /bin/bash
if [ $# = 0 ];then
hive -f /opt/data/inithive.sql
fi

echo "import ods_product...";
#global import product
sqoop import \
    --connect jdbc:mysql://192.168.56.111:3306/sales_source \
    --driver com.mysql.jdbc.Driver \
    --username root \
    --password root \
    --query "select product_code,product_name,product_category,'1.0' as version,'2018-1-1' as ods_start_time,'9999-12-31' as ods_end_time from product where 1=1 and \$CONDITIONS" \
    --target-dir /mytmp/ods/pro \
    --hive-import \
    --hive-database ods_sales_source \
    --hive-table ods_product \
    --hive-overwrite \
    -m 1
    
echo "import ods_customer...";
#global import customer
sqoop import \
    --connect jdbc:mysql://192.168.56.111:3306/sales_source \
    --driver com.mysql.jdbc.Driver \
    --username root \
    --password root \
    --query "select customer_number,customer_name,customer_street_address,customer_zip_code,customer_city,customer_state,'1.0' as version,'2018-1-1' as ods_start_time,'9999-12-31' as ods_end_time from customer where 1=1 and \$CONDITIONS" \
    --hive-import \
    --target-dir /mytmp/ods/cust \
    --hive-database ods_sales_source \
    --hive-table ods_customer \
    --hive-overwrite \
    -m 1

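(3) Loading the date dimension data

How can data be loaded into a partitioned table automatically?

Method 1 (not recommended): partition manually with sqoop. Note that a sqoop partition value must not contain special characters, so the date can only be formatted as %Y%m%d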

echo "import sales_order..."
# incremental import of sales_order, one partition per day
# day is the offset from 2018-01-01; start at 0 so that January 1 is included
day=0
md=`date -d '2018-10-23' +%j`
while [ $day -lt $md ]
do
    mdd=`date -d "2018-1-1 +$day day" +%Y%m%d`
    # the partitioned table created by inithive.sql is ods_dynamic_sales_order
    hive -e "use ods_sales_source;alter table ods_dynamic_sales_order add if not exists partition(ymd='$mdd')"
    sqoop import \
    --connect jdbc:mysql://192.168.56.111:3306/sales_source \
    --driver com.mysql.jdbc.Driver \
    --username root \
    --password root \
    --query "select order_number,customer_number,product_code,order_date,order_amount from sales_order where date_format(order_date,'%Y%m%d')='$mdd' and \$CONDITIONS" \
    --target-dir /mytmp/so \
    --delete-target-dir \
    --hive-import \
    --hive-database ods_sales_source \
    --hive-table ods_dynamic_sales_order \
    --hive-partition-key "ymd" \
    --hive-partition-value "$mdd" \
    -m 1
    # plain arithmetic expansion; "let day=$day +1" splits into two arguments and never increments
    day=$((day + 1))
done
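
Before running a loop like this against Hive and sqoop, it can help to print the partition values it would generate. The following is only a dry-run sketch, assuming GNU date as the script above does; the function name `partition_days` is hypothetical:

```shell
#!/bin/sh
# Print one yyyymmdd partition value per day from $1 to $2, both inclusive.
# Assumes GNU date (the -d option); -u keeps the arithmetic free of time zone effects.
partition_days() {
    start=$1
    end_ymd=$(date -u -d "$2" +%Y%m%d)
    offset=0
    while :; do
        ymd=$(date -u -d "$start +$offset day" +%Y%m%d)
        # stop once the generated day is past the end date
        [ "$ymd" -gt "$end_ymd" ] && break
        echo "$ymd"
        offset=$((offset + 1))
    done
}

partition_days 2018-01-01 2018-01-03
```

The call at the end prints 20180101, 20180102 and 20180103, one per line, which makes it easy to confirm that the first and the last day are both covered.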

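Method 2: use Hive dynamic partitioning. First import a full (unpartitioned) table into Hive, then load the partitioned table from it with a dynamic-partition insert

# full import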
echo "import ods_origin_sales_order..."
sqoop import \
    --connect jdbc:mysql://192.168.56.111:3306/sales_source \
    --driver com.mysql.jdbc.Driver \
    --username root \
    --password root \
    --query "select order_number,customer_number,product_code,order_date,order_amount from sales_order where \$CONDITIONS" \
    --hive-import \
    --target-dir /mytmp/ods/so \
    --hive-database ods_sales_source \
    --hive-table ods_origin_sales_order \
    --hive-overwrite \
    -m 1
    
echo "import dynamic..."
hive -f /opt/data/dynamic.sql

dynamic.sql

-- load the partitions automatically with dynamic partitioning
use ods_sales_source;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=10000;
set hive.exec.max.created.files=10000;
insert into ods_dynamic_sales_order partition(ymd) select order_number,customer_number,product_code,order_date,order_amount,
date_format(order_date,'yyyyMMdd') as ymd from ods_origin_sales_order;
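
The partition limits only need to exceed the number of partitions that a single insert creates, and here every calendar day becomes one ymd partition. A rough sketch for estimating that number from the date range used by the data generator, assuming GNU date; the function name `count_days` is hypothetical:

```shell
#!/bin/sh
# Count the calendar days from $1 to $2, both inclusive.
# UTC is used so that daylight-saving shifts cannot distort the division by 86400.
count_days() {
    start_s=$(date -u -d "$1" +%s)
    end_s=$(date -u -d "$2" +%s)
    echo $(( (end_s - start_s) / 86400 + 1 ))
}

# Date range of the generated sales orders
count_days 2018-01-01 2018-11-23
```

For this range the result is 327. As far as the Hive configuration reference states, the defaults are 1000 partitions in total and 100 per node, so it is the per-node limit that most plausibly has to be raised for this load; 10000 leaves ample headroom.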

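(4) Data ETL

This case is simple, so no cleansing is needed and the data goes into the dwd layer as it is.

(ii) DW layer

(1) SQL: DDL for the dw-layer tables

There is only one customer table and one product table, which shows that the data supplied is already aggregated.

createdwdinit.sql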

-- DDL
-- the date table is an external table stored as textfile, which makes it easy to map the generated data file
drop database if exists dw_sales_source cascade;
create database dw_sales_source;
use dw_sales_source;

drop table if exists dwd_dim_customer;
create table dwd_dim_customer(
customer_sk int,
customer_number int,
customer_name string,
customer_street_address string,
customer_zip_code string,
customer_city string,
customer_state string,
version string,
effective_date string,
expiry_date string
)
row format delimited fields terminated by ','
stored as parquetfile;

drop table if exists dwd_dim_product;
create table dwd_dim_product(
product_sk int,
product_code int,
product_name string,
product_category string,
version string,
effective_date string,
expiry_date string
)
row format delimited fields terminated by ','
stored as parquetfile;

drop table if exists dwd_dim_order;
create table dwd_dim_order(
order_sk int,
order_number int,
version string,
effective_date string,
expiry_date string
)
row format delimited fields terminated by ','
stored as parquetfile;

drop table if exists dwd_dim_date;
create external table dwd_dim_date(
date_sk int,
d_date string,
d_month int,
d_month_name string,
d_quarter int,
d_year int
)
row format delimited fields terminated by ','
stored as textfile
location '/opt/dwdate';

drop table if exists dwd_fact_sales_order;
create table dwd_fact_sales_order(
order_sk int,
customer_sk int,
product_sk int,
date_sk int,
order_amount float
)
row format delimited fields terminated by ','
stored as parquetfile;

(2) SQL: load the dw layer from the ods layer: customer/product/order

dwd_import.sql

insert into dw_sales_source.dwd_dim_customer
select 
customer_number as customer_sk,
customer_number,
customer_name ,
customer_street_address ,
customer_zip_code ,
customer_city ,
customer_state ,
version ,
ods_start_time as effective_date,
ods_end_time as expiry_date
from ods_sales_source.ods_customer;
    
insert into dw_sales_source.dwd_dim_product
select
product_code as product_sk,
product_code ,
product_name ,
product_category ,
version,
ods_start_time as effective_date,
ods_end_time as expiry_date
from
ods_sales_source.ods_product;
    
insert into dw_sales_source.dwd_dim_order
select
order_number as order_sk,
order_number,
'1.0' as version,
order_date as effective_date,
'9999-12-31' as expiry_date
from
ods_sales_source.ods_dynamic_sales_order;

(3) Script: generate the date data and load the dwd_fact_sales_order table

#!/bin/bash

# create the dw layer

echo '***********************************'
echo 'create dw layer data tables...'
echo '***********************************'
hive -f /opt/data/dw/createdwdinit.sql

echo '***********************************'
echo 'import data...'
echo '***********************************'
hive -f /opt/data/dw/dwd_import.sql

echo '***********************************'
echo 'generating dwd_date data...'
echo '***********************************'

## check on HDFS whether the file or directory exists
targetfilename=/opt/dwdate
hdfs dfs -test -e $targetfilename
if [ $? -eq 0 ] ;then
    echo 'exist'
    hdfs dfs -rm -R $targetfilename
fi
## check on the local Linux file system whether the file exists
filename=/opt/data/tmp
if [ -e $filename ]; then
    rm -rf $filename
fi
touch $filename
num=0
while(( $num<365 ))
do
    dt=`date -d "2018-01-01 $num days" +%Y-%m-%d`
    mtname=`date -d "${dt}" +%B`
    mt=`date -d "${dt}" +%m`
    if [ $mt -le 3 ]; then
        qt=1
    elif [ $mt -le 6 ]; then
        qt=2
    elif [ $mt -le 9 ]; then
        qt=3
    else
        qt=4
    fi
    let num=$num+1
    echo "${num},${dt},${dt:5:2},$mtname,$qt,${dt:0:4}" >> $filename
    #hive -e'insert into dw_sales_source.dwd_dim_date values($num+1,${dt},${dt:5:2},$mtname,$qt,${dt:0:4})'
done
echo "move the date data from the local disk to hdfs"
hdfs dfs -rm -R /opt/dwdate
hdfs dfs -mkdir -p /opt/dwdate
hdfs dfs -put /opt/data/tmp /opt/dwdate
echo '***********************************'
echo 'import fact_sales_order data...'
echo '***********************************'
hive -e 'insert into dw_sales_source.dwd_fact_sales_order
select
oso.order_number as order_sk,
oso.customer_number as customer_sk,
oso.product_code as product_sk,
dss.date_sk as date_sk,
oso.order_amount as order_amount
from ods_sales_source.ods_dynamic_sales_order oso
inner join dw_sales_source.dwd_dim_date dss
on oso.order_date = dss.d_date'
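
The quarter in the date script is derived from the month with an if/elif chain. The same mapping can be written as one integer expression; this is only an alternative sketch, and the function name `quarter_of` is hypothetical:

```shell
#!/bin/sh
# Map a two-digit month ("01".."12") to its quarter (1..4).
quarter_of() {
    month=${1#0}    # strip one leading zero so that "08" and "09" are not read as octal
    echo $(( (month - 1) / 3 + 1 ))
}

for m in 01 03 04 08 09 12; do
    echo "month $m -> quarter $(quarter_of "$m")"
done
```

Stripping the leading zero matters only inside arithmetic expansion; the `[ $mt -le 3 ]` tests in the script compare in decimal and are not affected.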

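(iii) DM layer

(1) How is the wide table built?

① Requirement:

>>>

For each day -> customer, product, date, order count, daily amount; and for the last two days -> order count, amount

<<<

② For tuning, see https://www.cnblogs.com/sabertobih/p/14041854.html

③ Code:

dm_init.sql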

drop database if exists dm_sales_source cascade;
create database dm_sales_source;
use dm_sales_source;

dm_run.sql

drop table if exists dm_sales_source.dm_sales_order_count;
create table dm_sales_source.dm_sales_order_count as 
select 
dss.d_date,d.customer_sk,d.product_sk,
count(d.order_sk) as order_num,
sum(d.order_amount) as order_dailyamount,
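-- two-row window per customer and product, ordered by date, so that the frame is deterministic
sum(sum(d.order_amount)) over(partition by d.customer_sk,d.product_sk order by dss.d_date rows between 1 preceding and current row) as recent_amount,
sum(count(d.order_sk)) over(partition by d.customer_sk,d.product_sk order by dss.d_date rows between 1 preceding and current row) as recent_num
from 
dw_sales_source.dwd_fact_sales_order d
inner join dw_sales_source.dwd_dim_date dss 
on d.date_sk = dss.date_sk
group by 
dss.d_date,d.customer_sk,d.product_sk
order by dss.d_date;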
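
A frame such as rows between 1 PRECEDING and current row adds each row to the row immediately before it. The awk sketch below imitates that on a few made-up daily totals, under the assumption that the window is meant per key (for example per customer and product) and ordered by date; the sample rows and the function name `rolling_two_rows` are hypothetical:

```shell
#!/bin/sh
# Input lines: <key> <date> <daily_amount>, sorted by key and then by date.
# Output adds a fourth column: the current row plus the previous row of the same key,
# which mirrors a two-row frame evaluated inside one partition.
rolling_two_rows() {
    awk '{
        recent = $3
        if ($1 == prev_key) recent += prev_amount
        print $1, $2, $3, recent
        prev_key = $1
        prev_amount = $3
    }'
}

printf '%s\n' \
    'c1 2018-01-01 10' \
    'c1 2018-01-02 20' \
    'c1 2018-01-03 30' \
    'c2 2018-01-01 5' | rolling_two_rows
```

The fourth column comes out as 10, 30, 50 for c1 and 5 for c2. The frame counts rows rather than calendar days, so a key with no orders on some day would be paired with its previous available day instead.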

init.sh

#!/bin/bash

hive -f /opt/data/dm/dm_init.sql
hive -f /opt/data/dm/dm_run.sql

(2) Export from HDFS to MySQL with sqoop (for compressed formats such as ORC, just use Java)

① sqoop: suitable for textfile tables

How do you find where a table is stored on HDFS? show create table dm_sales_source.dm_sales_order_count; 

 !hdfs dfs -text /hive110/warehouse/dm_sales_source.db/dm_sales_order_count/000000_0 

In MySQL:

drop database if exists dmdb ;
create database dmdb;
use dmdb;
create table dm_sales_order_count(
d_date varchar(20), 
customer_sk int, 
product_sk int, 
order_num int, 
order_dailyamount double, 
recent_amount double, 
recent_num int 
);

Then export from HDFS to MySQL with sqoop

Both directions, MySQL to Hive and Hive -> HDFS -> MySQL, need the '\001' field delimiter

sqoop export \
--connect jdbc:mysql://192.168.56.111:3306/dmdb \
--username root \
--password root \
--table dm_sales_order_count \
--export-dir /hive110/warehouse/dm_sales_source.db/dm_sales_order_count \
--input-fields-terminated-by '\001' \
-m 1
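
The '\001' delimiter is a non-printing byte, so the fields of a warehouse file appear to run together on a terminal. A small sketch for making the field boundaries visible before exporting; the sample row and the function name `ctrl_a_to_csv` are hypothetical:

```shell
#!/bin/sh
# Turn Hive's default field delimiter (byte 0x01, written '\001') into commas
# so that a text-format warehouse file can be inspected by eye.
ctrl_a_to_csv() {
    tr '\001' ','
}

# A made-up row in the default Hive text layout
printf '%s\001%s\001%s\n' 2018-01-01 1 2 | ctrl_a_to_csv
```

The sample prints 2018-01-01,1,2. Piping the output of the hdfs dfs -text command shown earlier through the same filter would show whether the column count matches the MySQL table.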

② The Java approach: works for any format

 See: https://www.cnblogs.com/sabertobih/p/14043929.html

(3) How is the interface exposed?

 See: https://www.cnblogs.com/sabertobih/p/14043895.html

III. Updating the data warehouse

 

 

 

 

