使用 MySQL 實現拉鏈表操作


 

拉鏈表的適用場景:

  1.數據量比較大;

  2.數據變化的比例和頻率比較小,例如客戶的住址、聯繫方式等。假設有 1 千萬用戶,如果每天全量存儲一份快照,絕大部分記錄都是重複的不變數據,非常浪費存儲空間;拉鏈表只在數據發生變化時新增一條記錄,從而節省存儲空間;

  3.拉鏈歷史表既能反映每個客戶在不同時間的不同狀態,也可以查詢任意時間點的全量快照。

拉鏈表設計

  

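設計的拉鏈歷史表(ods_account):每條記錄表示客戶在 [eff_date, end_date) 區間內的狀態,eff_date 為生效日期(包含當天),end_date 為失效日期(不包含當天);當前仍然有效的記錄,其 end_date 為 '99991231'。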

查詢 A 客戶的全部歷史狀態:

-- Full version history of customer A, oldest version first.
select cst_id, bal, eff_date, end_date, job_seq_id
from ods_account
where cst_id = 'A'
order by eff_date;

 

 

查詢 20190601 當天的全量快照:

 -- Point-in-time snapshot as of 20190601: a version is visible when
-- eff_date <= day < end_date (end_date is exclusive).
select cst_id, bal, eff_date, end_date, job_seq_id
from ods_account
where eff_date <= '20190601'
  and end_date > '20190601'
order by cst_id;

 

查詢 20190602 當天的全量快照:

 -- Point-in-time snapshot as of 20190602: a version is visible when
-- eff_date <= day < end_date (end_date is exclusive).
select cst_id, bal, eff_date, end_date, job_seq_id
from ods_account
where eff_date <= '20190602'
  and end_date > '20190602'
order by cst_id;

 

 建表:

-- Tables for the zipper (history chain) demo.
-- Balances are DECIMAL rather than FLOAT: FLOAT keeps only about 7
-- significant digits, so e.g. 1234567.81 and 1234567.82 are stored as the
-- same value and the ETL's balance comparison would miss that change.
-- NOTE(review): scale 2 assumes cent-level balances; confirm with the source.
use edw;

-- Source snapshot: one row per customer per date_id.
drop table if exists src_account;
create table if not exists src_account (
    cst_id  varchar(64)    not null comment '客戶唯一編號',
    bal     decimal(18, 2)          comment '余額',
    date_id varchar(16)    not null comment '日期',
    primary key (cst_id, date_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Per-batch change set; at most one row per (customer, etl_flag).
drop table if exists delta_account;
create table if not exists delta_account (
    cst_id   varchar(64)    not null comment '客戶唯一編號',
    bal      decimal(18, 2)          comment '余額',
    etl_flag varchar(16)    not null comment 'ETL標記',
    primary key (cst_id, etl_flag)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Backup copy of ods_account rows, tagged with the batch (new_job_seq_id)
-- that copied them; at most one row per customer per batch.
drop table if exists odshis_account;
create table if not exists odshis_account (
    cst_id         varchar(64)    not null comment '客戶唯一編號',
    bal            decimal(18, 2)          comment '余額',
    eff_date       varchar(16)             comment '生效日期',
    end_date       varchar(16)             comment '失效日期',
    job_seq_id     varchar(16)             comment '批次號',
    new_job_seq_id varchar(16)    not null comment '最新批次號',
    primary key (cst_id, new_job_seq_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Zipper history table: one row per version of a customer, valid over the
-- half-open range [eff_date, end_date). Dates are 'YYYYMMDD' strings, which
-- compare correctly as text; '99991231' marks the currently open version.
drop table if exists ods_account;
create table if not exists ods_account (
    cst_id     varchar(64)    not null comment '客戶唯一編號',
    bal        decimal(18, 2)          comment '余額',
    eff_date   varchar(16)    not null comment '生效日期',
    end_date   varchar(16)    not null comment '失效日期',
    job_seq_id varchar(16)             comment '批次號',
    primary key (cst_id, eff_date, end_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

加載源系統數據(src_account 按 date_id 保存每天的全量快照):

  

-- Sample source data: four daily full snapshots. Day-over-day changes:
--   20190602: B 30->50, D 35->20, C removed, E added
--   20190603: A 20->30, C re-added, E removed
--   20190604: B 50->40, C 50->30, E re-added, F and G added
truncate table src_account;

insert into src_account (cst_id, bal, date_id) values
    ('A', 20, '20190601'),
    ('B', 30, '20190601'),
    ('C', 50, '20190601'),
    ('D', 35, '20190601');

insert into src_account (cst_id, bal, date_id) values
    ('A', 20, '20190602'),
    ('B', 50, '20190602'),
    ('D', 20, '20190602'),
    ('E', 50, '20190602');

insert into src_account (cst_id, bal, date_id) values
    ('A', 30, '20190603'),
    ('B', 50, '20190603'),
    ('C', 50, '20190603'),
    ('D', 20, '20190603');

insert into src_account (cst_id, bal, date_id) values
    ('A', 30, '20190604'),
    ('B', 40, '20190604'),
    ('C', 30, '20190604'),
    ('D', 20, '20190604'),
    ('E', 20, '20190604'),
    ('F', 20, '20190604'),
    ('G', 20, '20190604');

 

開始拉鏈過程(其中 ${job_date_id} 為本批次日期, ${before_job_date_id} 為上一批次日期,執行前需替換為實際日期):

 

 -- Zipper (SCD type 2) load for one batch.
-- ${job_date_id} is the batch date and ${before_job_date_id} the previous
-- batch date; both must be substituted before the script is executed.
-- Rerunnable: steps 1-3 undo an earlier run of the same ${job_date_id}
-- (this only works for the most recent batch).

-- Rebuild the change set. TRUNCATE commits implicitly in MySQL, so it runs
-- before the transaction is opened.
truncate table delta_account;

-- Everything below runs as one unit. A failure part-way (e.g. after the
-- rerun restore in step 2 but before step 3) would otherwise leave
-- ods_account/odshis_account inconsistent, and the next rerun would fail
-- on duplicate primary keys.
start transaction;

-- Delta 'I' (insert): customers present today but not in the previous batch.
insert into delta_account (cst_id, bal, etl_flag)
select cur.cst_id, cur.bal, 'I' as etl_flag
from src_account as cur
left join src_account as prev
    on prev.cst_id = cur.cst_id
   and prev.date_id = '${before_job_date_id}'
where cur.date_id = '${job_date_id}'
  and prev.cst_id is null;

-- Delta 'D' (delete): customers in the previous batch but missing today;
-- carries their last known balance.
insert into delta_account (cst_id, bal, etl_flag)
select prev.cst_id, prev.bal, 'D' as etl_flag
from src_account as prev
left join src_account as cur
    on cur.cst_id = prev.cst_id
   and cur.date_id = '${job_date_id}'
where prev.date_id = '${before_job_date_id}'
  and cur.cst_id is null;

-- Delta 'A' (after change): today's balance for customers whose balance
-- differs from the previous batch. `<=>` is MySQL's NULL-safe equality, so
-- a balance changing to or from NULL is also detected.
insert into delta_account (cst_id, bal, etl_flag)
select cur.cst_id, cur.bal, 'A' as etl_flag
from src_account as cur
inner join src_account as prev
    on prev.cst_id = cur.cst_id
   and prev.date_id = '${before_job_date_id}'
where cur.date_id = '${job_date_id}'
  and not (cur.bal <=> prev.bal);

-- Delta 'B' (before change): previous balance for the same customers.
-- Informational only: steps 5-7 do not read 'B' rows.
insert into delta_account (cst_id, bal, etl_flag)
select cur.cst_id, prev.bal, 'B' as etl_flag
from src_account as cur
inner join src_account as prev
    on prev.cst_id = cur.cst_id
   and prev.date_id = '${before_job_date_id}'
where cur.date_id = '${job_date_id}'
  and not (cur.bal <=> prev.bal);

-- 1. Rerun: remove rows an earlier run of this batch inserted (steps 6, 7)
--    or closed (step 5 stamps job_seq_id with the batch date).
delete from ods_account
where job_seq_id = '${job_date_id}';

-- 2. Rerun: restore the open versions that were backed up before step 5.
insert into ods_account (cst_id, bal, eff_date, end_date, job_seq_id)
select cst_id, bal, eff_date, end_date, job_seq_id
from odshis_account
where new_job_seq_id = '${job_date_id}';

-- 3. Rerun: drop that backup; step 4 takes a fresh one.
delete from odshis_account
where new_job_seq_id = '${job_date_id}';

-- 4. Back up the open versions of every customer in the change set.
insert into odshis_account (cst_id, bal, eff_date, end_date, job_seq_id, new_job_seq_id)
select o.cst_id, o.bal, o.eff_date, o.end_date, o.job_seq_id, '${job_date_id}'
from ods_account as o
where o.end_date = '99991231'
  and exists (select 1 from delta_account as d where d.cst_id = o.cst_id);

-- 5. Close the chain: end the open version of every inserted, deleted or
--    changed customer as of this batch date.
update ods_account o
set end_date = '${job_date_id}',
    job_seq_id = '${job_date_id}'
where o.end_date = '99991231'
  and exists (
      select 1
      from delta_account as d
      where d.cst_id = o.cst_id
        and d.etl_flag in ('I', 'D', 'A')
  );

-- 6. Extend the chain: open a new version for new and changed customers.
insert into ods_account (cst_id, bal, eff_date, end_date, job_seq_id)
select cst_id,
       bal,
       '${job_date_id}' as eff_date,
       '99991231'       as end_date,
       '${job_date_id}' as job_seq_id
from delta_account
where etl_flag in ('A', 'I');

-- 7. Deleted customers that have no row at all in ods_account: record them
--    with a zero-length range [job_date_id, job_date_id). Such a row never
--    matches a point-in-time query (eff_date <= day < end_date).
insert into ods_account (cst_id, bal, eff_date, end_date, job_seq_id)
select d.cst_id,
       d.bal,
       '${job_date_id}' as eff_date,
       '${job_date_id}' as end_date,
       '${job_date_id}' as job_seq_id
from delta_account as d
where d.etl_flag = 'D'
  and not exists (select 1 from ods_account as o where o.cst_id = d.cst_id);

commit;
 
         
 
        


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM