一致性測試
在做集市遷移時,甲方比較看重數據的一致性測試,一般會要求做新表與舊表的數據量比對,以及部分金額字段的字段級比對。
下面給出的參考SQL都可以先在Excel中做好拼接語句,再整合到Python腳本框架中。一般整合之前可以先調試好SQL語句,也就是直接在SQL查詢平台sqldbx,先跑一遍SQL,調試完畢再整合為Python腳本。
數據量比對
/* Row-count comparison between table 1 (new) and table 2 (old).
   Writes one row into the static partition
   (data_dt = '2021-11-11', type_name = '表1與表2數據量比對'). */
insert into 庫名.結果表名
(
    table_name  -- name of the table being checked
    ,new_cnt    -- row count of the new table
    ,org_cnt    -- row count of the old table
    ,res        -- difference: new count minus old count
)
partition
(
    data_dt = cast('2021-11-11' as varchar(10))
    ,type_name = cast('表1與表2數據量比對' as varchar(50))
)
select
    'base_info' as table_name
    ,t1.cnt as new_cnt
    ,t2.cnt as org_cnt
    ,t1.cnt - t2.cnt as res
from
    (select count(*) as cnt from 庫名.表名1 where data_dt = '2021-11-11') t1
left join
    -- each side is a single aggregate row, so "on 1 = 1" simply pairs them
    (select count(*) as cnt from 庫名.表名2 where data_dt = '2021-11-11') t2 on 1 = 1
金額字段SUM比對
/* SUM comparison of an important amount column between table 1 (new)
   and table 2 (old).
   Writes one row into the static partition
   (data_dt = '2021-11-11', type_name = '表1與表2重要金額字段sum值比對').
   Note: sum() returns NULL when no row matches the filter, in which case
   res is NULL as well. */
insert into 庫名.結果表名
(
    table_name    -- name of the table being checked
    ,column_name  -- name of the column being summed
    ,new_sum_cnt  -- sum of the column in the new table
    ,org_sum_cnt  -- sum of the column in the old table
    ,res          -- difference: new sum minus old sum
)
partition
(
    data_dt = cast('2021-11-11' as varchar(10))
    -- own partition value, so SUM results are not mixed with the row-count check
    ,type_name = cast('表1與表2重要金額字段sum值比對' as varchar(50))
)
select
    'base_info' as table_name
    ,'amt' as column_name
    ,t1.sum_amt as new_sum_cnt
    ,t2.sum_amt as org_sum_cnt
    ,t1.sum_amt - t2.sum_amt as res
from
    (select sum(amt) as sum_amt from 庫名.表名1 where data_dt = '2021-11-11') t1
left join
    -- each side is a single aggregate row, so "on 1 = 1" simply pairs them
    (select sum(amt) as sum_amt from 庫名.表名2 where data_dt = '2021-11-11') t2 on 1 = 1
主鍵唯一
/* Primary-key duplicate check.
   pk_res = total rows minus distinct (主鍵1, 主鍵2) combinations for the
   given data_dt; 0 means no duplicates were counted.
   Writes one row into the static partition
   (data_dt = '2021-11-11', type_name = '主鍵重復'). */
insert into 庫名.結果表名
(
    table_name  -- name of the table being checked
    ,pk_name    -- primary-key column name(s)
    ,pk_res     -- result value (number of surplus rows)
)
partition
(
    data_dt = cast('2021-11-11' as varchar(10))
    ,type_name = cast('主鍵重復' as varchar(50))
)
select
    'base_info' as table_name
    ,'主鍵1,主鍵2' as pk_name
    ,count(1) - count(distinct 主鍵1, 主鍵2) as pk_res
from 庫名.表名 t1
where t1.data_dt = '2021-11-11'
空值校驗
/* Empty-value check for one column.
   Counts rows of the given data_dt whose ident_id is NULL or an empty
   string, and writes a result row only when at least one such row exists.
   Both partition columns are dynamic, so their values must be the LAST
   columns of the select list, in the order given in the partition clause. */
insert into 庫名.結果表名
(
    nb             -- sequence number of the check
    ,table_name    -- name of the table being checked
    ,column_name   -- name of the column being checked
    ,column_count  -- number of offending rows
)
partition (data_dt, type_name)
select
    'kn001' as nb
    ,'base_info' as table_name
    ,'ident_id' as column_name
    ,cast(count(*) as bigint) as column_count
    ,'2021-11-11' as data_dt        -- dynamic partition value
    ,'字段空值檢查' as type_name     -- dynamic partition value
from 庫名.表名 t1
where t1.data_dt = '2021-11-11'
    and (
        t1.ident_id is null         -- a NULL never compares equal to ''
        or cast(t1.ident_id as string) = ''
    )
-- no group by: the select list holds only constants and one aggregate
having count(*) > 0
枚舉值
/* Enum-value check.
   Lists every ident_id value of the given data_dt that is not present in
   the code-value mapping table, together with its number of occurrences.
   Writes into the static partition
   (data_dt = '2021-11-11', type_name = '枚舉值'). */
insert into 庫名.結果表名
(
    table_name     -- name of the table being checked
    ,column_name   -- name of the column being checked
    ,column_val    -- offending value of the column
    ,column_count  -- number of occurrences of that value
)
partition
(
    data_dt = cast('2021-11-11' as varchar(10))
    ,type_name = cast('枚舉值' as varchar(50))
)
select
    cast('base_info' as varchar(50)) as table_name
    ,cast('ident_id' as varchar(50)) as column_name
    ,cast(ident_id as varchar(50)) as column_val
    ,cast(count(*) as bigint) as column_count
from 庫名.表名
where data_dt = '2021-11-11'  -- same date as the partition being written
    and ident_id not in
    (
        select val                        -- code value
        from 庫名.映射表名                 -- code-value mapping table
        where table_id = 'base_info'      -- table name
            and clmn_id = 'ident_id'      -- column name
            and val is not null           -- a NULL in the list would make NOT IN match nothing
    )
-- group by the three non-aggregated select columns
group by 1, 2, 3
可以根據需要決定是否做類型轉換,例如 CAST('BASE_INFO' AS VARCHAR(50)) AS TABLE_NAME;如果查詢結果的字段類型與結果表的字段類型一致,就不需要做類型轉換。
腳本框架
# 框架
# 注釋
import sys
from job.base.JobBase import ExitCode
import job.base.ClientUtil as util
def checkArgs(length):
    """Log that argument checking is running, then delegate to util.checkArgsEx.

    `length` is passed through unchanged to util.checkArgsEx
    (presumably the expected number of arguments -- confirm against ClientUtil).
    """
    util.debug('參數檢查')
    util.checkArgsEx(length)


try:
    checkArgs(1)
    # Placeholder: define the script parameters here.
    # Placeholder: build and run the main SQL here.
    util.exit(ExitCode.EXIT_SUCCESS)
except Exception as e:
    # Record the failure before exiting so the cause is not lost.
    util.debug('腳本執行異常: %r' % (e,))
    util.exit(ExitCode.EXIT_ERROR)
finally:
    # NOTE(review): "destory" is the spelling used by this file; confirm it
    # matches the ClientUtil API before renaming.
    util.destory()
建表語句
-- Recreate the result table from scratch.
-- "if exists" keeps the drop from failing when the table is not there yet.
drop table if exists 庫名.結果表名;
create table if not exists 庫名.結果表名
(
    table_name varchar(50) comment '表名'
    ,column_name varchar(50) comment '字段名'
    ,column_val varchar(50) comment '字段對應的值'
    ,column_count bigint comment '字段的值對應出現的次數'
)
partitioned by (
    data_dt varchar(10) comment '數據日期'
    ,type_name varchar(50) comment '類型'
)
comment '結果表名'
stored as parquet
這裡有個小技巧:如果測試建表沒有特殊要求,可以選擇靈活建表,即根據select查詢出的字段類型來建表,省去用 cast(字段 as 字段類型) 轉換類型的麻煩。
跑後查數
跑完腳本之后,多張表查數,可用union all來看
-- Post-run check: row count of each table for data_dt = '2021-11-11',
-- returned as one result row per table.
select '表名1' as table_name, count(*)
from 庫名.表名1
where data_dt = '2021-11-11'
union all
select '表名2' as table_name, count(*)
from 庫名.表名2
where data_dt = '2021-11-11'
union all
select '表名3' as table_name, count(*)
from 庫名.表名3
where data_dt = '2021-11-11'
union all
select '表名4' as table_name, count(*)
from 庫名.表名4
where data_dt = '2021-11-11'
union all
select '表名5' as table_name, count(*)
from 庫名.表名5
where data_dt = '2021-11-11'
