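Background
- The slowly changing dimension is implemented in Hive without the UPDATE and DELETE operations of transactional tables. Recent Hive versions support them, but they require corresponding server/client configuration, and Hive's transaction support still has some limitations.
- Hive SQL running on the MapReduce engine is slow, so Spark SQL is used here instead.
- An automated SCD engine is left for a later post.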
- References:
https://cwiki.apache.org/confluence/display/Hive/
https://juejin.im/post/5bdf0b84e51d450eca7c6c8b
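1. Prepare the base dimension table base_dim
- Business fields: id, name, city, st.
- Default dimension fields: sk (surrogate key, globally unique), scd_update_date, scd_version, scd_valid_flag, scd_start_date, scd_end_date.
- SCD1 fields: city, st; SCD2 field: name.
- A change to an SCD1 field does not increment scd_version, but the time of the change is recorded in scd_update_date.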
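// Run in spark-shell; `spark` is assumed to be a SparkSession with Hive support (needed for "stored as orc" tables)
import spark.sql
import spark.implicits._
// SCD1 fields: city, st; SCD2 field: name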
sql("drop table if exists base_dim")
sql(
s"""
| create table base_dim
| (sk int, id int, name string, city string, st string, scd_update_date string,scd_version int,scd_valid_flag string,scd_start_date string, scd_end_date string)
| stored as orc
""".stripMargin)
sql(
s"""
| insert into table base_dim
| values
| (1,1,"zhangsan","us","ca","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
| (2,2,"lisi","us","cb","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
| (3,3,"wangwu","ca","bb","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
| (4,4,"zhaoliu","ca","bc","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
| (5,5,"mazi","aa","aa","2019-01-01",1,"Y","2019-01-01","9999-12-31")
""".stripMargin)
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| 3| 3| wangwu| ca| bb| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
| 4| 4| zhaoliu| ca| bc| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
| 5| 5| mazi| aa| aa| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
| 1| 1|zhangsan| us| ca| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
| 2| 2| lisi| us| cb| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
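The SCD1/SCD2 split above boils down to a small classification rule. The sketch below models a base_dim row and an incoming record as plain Scala case classes instead of Spark tables (DimRow, StageRow, classify and the Change cases are hypothetical names, not part of the article's code) and decides which kind of change an incoming record represents for the same id:

```scala
// In-memory stand-in for a base_dim row (field names mirror the table's columns)
case class DimRow(sk: Int, id: Int, name: String, city: String, st: String,
                  scdUpdateDate: String, scdVersion: Int, scdValidFlag: String,
                  scdStartDate: String, scdEndDate: String)
// An incoming record: business fields plus the date it was captured
case class StageRow(id: Int, name: String, city: String, st: String, updateDate: String)

sealed trait Change
case object Unchanged extends Change   // keep the row as it is
case object Scd1 extends Change        // only city/st differ: overwrite in place, keep scd_version
case object Scd2 extends Change        // name differs: expire the old row, add a new version
case object Scd2AndScd1 extends Change // both differ: the new version also carries the new city/st

// Compare the current dimension row with an incoming record for the same business key (id)
def classify(cur: DimRow, incoming: StageRow): Change = {
  require(cur.id == incoming.id, "records must share the business key")
  val scd2Changed = cur.name != incoming.name                                // SCD2 field
  val scd1Changed = cur.city != incoming.city || cur.st != incoming.st       // SCD1 fields
  (scd2Changed, scd1Changed) match {
    case (false, false) => Unchanged
    case (false, true)  => Scd1
    case (true, false)  => Scd2
    case (true, true)   => Scd2AndScd1
  }
}

val zhangsan = DimRow(1, 1, "zhangsan", "us", "ca", "2019-01-01", 1, "Y", "2019-01-01", "9999-12-31")
val c1 = classify(zhangsan, StageRow(1, "zhang", "u", "c", "2019-08-06")) // Scd2AndScd1
```

Only the SCD2 field decides whether a new version is created; the SCD1 fields only decide whether values are overwritten.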
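2. Stage table stage
- Change carried by each id: 1: SCD2 + SCD1; 2: expired (snapshot mode) or unchanged (incremental mode), since id 2 is absent from the stage; 3: unchanged; 4: SCD1; 5: SCD2; 6: new record.
- Snapshot mode: records absent from this table are treated as expired.
- Incremental mode: records absent from this table remain valid.
- Besides the business fields listed in 1, each row carries a time field, update_date.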
sql("drop table if exists stage")
sql("create table stage (id int, name string, city string, st string, update_date string) stored as orc")
// 1: scd2 + scd1; 2: expired (snapshot) | unchanged (incremental mode); 3: unchanged; 4: scd1; 5: scd2; 6: new record
// getDate() is a helper not shown in this article; it is assumed to return the load date as "yyyy-MM-dd" (2019-08-06 below)
val dt = getDate()
sql(
s"""
| insert into table stage
| values
| (1,"zhang","u","c",'$dt'),
| (3,"wangwu","ca","bb",'$dt'),
| (4,"zhaoliu","ac","cb",'$dt'),
| (5,"ma","aa","aa",'$dt'),
| (6,"laoyang","dd","dd",'$dt')
""".stripMargin)
+---+-------+----+---+-----------+
| id| name|city| st|update_date|
+---+-------+----+---+-----------+
| 1| zhang| u| c| 2019-08-06|
| 3| wangwu| ca| bb| 2019-08-06|
| 4|zhaoliu| ac| cb| 2019-08-06|
| 5| ma| aa| aa| 2019-08-06|
| 6|laoyang| dd| dd| 2019-08-06|
+---+-------+----+---+-----------+
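The loading code calls getDate() here, and the expiry step later calls getPreDate(); neither helper is shown. Judging from the outputs (update_date 2019-08-06, scd_end_date 2019-08-05), they appear to return the load date and the day before it as yyyy-MM-dd strings. A minimal sketch under that assumption, using java.time; the optional `today` parameter is an addition that makes a rerun for a past date reproducible:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical implementations of the helpers the article calls but does not show.
// Assumed contract: dates are "yyyy-MM-dd" strings, like update_date and scd_end_date.

// Load date, stamped on the stage rows as update_date
def getDate(today: LocalDate = LocalDate.now()): String =
  today.format(DateTimeFormatter.ISO_LOCAL_DATE)

// Day before the load date, used as scd_end_date of expired rows,
// so an old version ends the day before the new one is loaded
def getPreDate(today: LocalDate = LocalDate.now()): String =
  today.minusDays(1).format(DateTimeFormatter.ISO_LOCAL_DATE)

val loadDay = LocalDate.of(2019, 8, 6)
val dt = getDate(loadDay)         // "2019-08-06"
val preDate = getPreDate(loadDay) // "2019-08-05"
```

Because ISO dates compare correctly as strings, keeping them as yyyy-MM-dd also makes the '9999-12-31' sentinel and string comparisons in the SQL behave as date comparisons.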
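3. Get the unchanged records
1) Snapshot mode: records that are in the snapshot and whose specified fields (business key + SCD2 + SCD1) are all unchanged
// Snapshot mode: update_date from the stage replaces scd_update_date
val unchangedDF = sql(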
s"""
|SELECT
| bd.sk,
| s.*,
| bd.scd_version,
| bd.scd_valid_flag,
| bd.scd_start_date,
| bd.scd_end_date
|FROM
| base_dim bd
| JOIN stage s
| ON bd.id=s.id AND bd.name=s.name AND bd.city=s.city AND bd.st=s.st
""".stripMargin)
+---+---+------+----+---+-----------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+------+----+---+-----------+-----------+--------------+--------------+------------+
| 3| 3|wangwu| ca| bb| 2019-08-06| 1| Y| 2019-01-01| 9999-12-31|
+---+---+------+----+---+-----------+-----------+--------------+--------------+------------+
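2) Incremental mode: (1) records in the stage table whose specified fields (business key + SCD2 + SCD1) are all unchanged; (2) records not in the stage table
// Incremental mode: rows absent from the stage, or present with identical fields, are kept as-is
val unchangedDF = sql(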
s"""
|SELECT
| bd.*
|FROM
| base_dim bd
| LEFT JOIN stage s
| ON bd.id=s.id
| WHERE s.id IS NULL OR (bd.name=s.name AND bd.city=s.city AND bd.st=s.st)
""".stripMargin)
+---+---+------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+------+----+---+---------------+-----------+--------------+--------------+------------+
| 3| 3|wangwu| ca| bb| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
| 2| 2| lisi| us| cb| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
+---+---+------+----+---+---------------+-----------+--------------+--------------+------------+
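To make the two join conditions easier to compare, here is an in-memory sketch of the same selection, assuming the tables are modelled as Scala collections rather than Spark DataFrames (DimRow, StageRow and the function names are hypothetical). The snapshot version behaves like an inner join on every field; the incremental version like a left join on id that keeps rows with no match:

```scala
case class DimRow(sk: Int, id: Int, name: String, city: String, st: String,
                  scdUpdateDate: String, scdVersion: Int, scdValidFlag: String,
                  scdStartDate: String, scdEndDate: String)
case class StageRow(id: Int, name: String, city: String, st: String, updateDate: String)

// Business key + SCD2 field + SCD1 fields are all equal
def sameFields(d: DimRow, s: StageRow): Boolean =
  d.id == s.id && d.name == s.name && d.city == s.city && d.st == s.st

// Snapshot mode: inner join on all fields; as in the SQL, the stage's update_date
// replaces scd_update_date
def unchangedSnapshot(dim: Seq[DimRow], stage: Seq[StageRow]): Seq[DimRow] =
  for {
    d <- dim
    s <- stage
    if sameFields(d, s)
  } yield d.copy(scdUpdateDate = s.updateDate)

// Incremental mode: keep rows absent from the stage, or present with equal fields.
// Assumes at most one stage row per id.
def unchangedIncremental(dim: Seq[DimRow], stage: Seq[StageRow]): Seq[DimRow] = {
  val stageById = stage.map(s => s.id -> s).toMap
  dim.filter(d => stageById.get(d.id).forall(s => sameFields(d, s)))
}

// Sample data: base_dim and stage as loaded above (sk equals id in the initial load)
def initial(id: Int, name: String, city: String, st: String): DimRow =
  DimRow(id, id, name, city, st, "2019-01-01", 1, "Y", "2019-01-01", "9999-12-31")
val baseDim = Seq(initial(1, "zhangsan", "us", "ca"), initial(2, "lisi", "us", "cb"),
  initial(3, "wangwu", "ca", "bb"), initial(4, "zhaoliu", "ca", "bc"), initial(5, "mazi", "aa", "aa"))
val stage = Seq(StageRow(1, "zhang", "u", "c", "2019-08-06"), StageRow(3, "wangwu", "ca", "bb", "2019-08-06"),
  StageRow(4, "zhaoliu", "ac", "cb", "2019-08-06"), StageRow(5, "ma", "aa", "aa", "2019-08-06"),
  StageRow(6, "laoyang", "dd", "dd", "2019-08-06"))

val snapshotKept = unchangedSnapshot(baseDim, stage)       // id 3 only
val incrementalKept = unchangedIncremental(baseDim, stage) // ids 2 and 3
```

`Option.forall` returns true for `None`, which is what lets id 2 (absent from the stage) survive in incremental mode, mirroring `s.id IS NULL OR ...` in the SQL.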
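4. Mark the expired records
1) Snapshot mode: (1) records not in the snapshot; (2) old versions of records with an SCD2 change
// getPreDate() is a helper not shown in this article; it is assumed to return the day before the load date as "yyyy-MM-dd" (2019-08-05 here)
val pre_date = getPreDate()
val max_date = "9999-12-31"
// Snapshot mode: current rows that are missing from the stage or whose name (SCD2 field) changed
val expireDF = sql(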
s"""
|SELECT
| bd.sk sk,
| bd.id id,
| bd.name name,
| bd.city city,
| bd.st st,
| bd.scd_update_date scd_update_date,
| bd.scd_version scd_version,
| 'N' scd_valid_flag,
| bd.scd_start_date scd_start_date,
| '$pre_date' scd_end_date
|FROM
| (
| SELECT *
| FROM base_dim
| WHERE scd_end_date='$max_date'
| ) bd
| LEFT JOIN stage s
| ON bd.id=s.id
| WHERE s.id IS NULL OR bd.name <> s.name
""".stripMargin)
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| 5| 5| mazi| aa| aa| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 1| 1|zhangsan| us| ca| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 2| 2| lisi| us| cb| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
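2) Incremental mode: only old versions of records with an SCD2 change
// Incremental mode: only current rows whose name (SCD2 field) changed; the WHERE clause drops unmatched rows
val expireDF = sql(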
s"""
|SELECT
| bd.sk sk,
| bd.id id,
| bd.name name,
| bd.city city,
| bd.st st,
| bd.scd_update_date scd_update_date,
| bd.scd_version scd_version,
| 'N' scd_valid_flag,
| bd.scd_start_date scd_start_date,
| '$pre_date' scd_end_date
|FROM
| (
| SELECT *
| FROM base_dim
| WHERE scd_end_date='$max_date'
| ) bd
| LEFT JOIN stage s
| ON bd.id=s.id
| WHERE bd.name <> s.name
""".stripMargin)
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| 5| 5| mazi| aa| aa| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 1| 1|zhangsan| us| ca| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
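The two expiry queries differ only in what happens to a current row with no stage match. A sketch of both rules over in-memory collections, under the same assumptions as before (hypothetical names; the `snapshot` flag stands in for choosing between the two queries):

```scala
case class DimRow(sk: Int, id: Int, name: String, city: String, st: String,
                  scdUpdateDate: String, scdVersion: Int, scdValidFlag: String,
                  scdStartDate: String, scdEndDate: String)
case class StageRow(id: Int, name: String, city: String, st: String, updateDate: String)

val MaxDate = "9999-12-31"

// Close a row: mark it 'N' and end it on preDate; all other columns are kept
def expire(d: DimRow, preDate: String): DimRow =
  d.copy(scdValidFlag = "N", scdEndDate = preDate)

// Only current rows (scd_end_date = 9999-12-31) are candidates, as in the SQL.
// snapshot = true : missing from the stage, or SCD2 field (name) changed
// snapshot = false: SCD2 field changed only; missing rows stay valid
def expiredRows(dim: Seq[DimRow], stage: Seq[StageRow], preDate: String, snapshot: Boolean): Seq[DimRow] = {
  val stageById = stage.map(s => s.id -> s).toMap // assumes at most one stage row per id
  dim.filter(_.scdEndDate == MaxDate).flatMap { d =>
    stageById.get(d.id) match {
      case None                        => if (snapshot) Some(expire(d, preDate)) else None
      case Some(s) if s.name != d.name => Some(expire(d, preDate))
      case Some(_)                     => None
    }
  }
}

def initial(id: Int, name: String, city: String, st: String): DimRow =
  DimRow(id, id, name, city, st, "2019-01-01", 1, "Y", "2019-01-01", MaxDate)
val baseDim = Seq(initial(1, "zhangsan", "us", "ca"), initial(2, "lisi", "us", "cb"),
  initial(3, "wangwu", "ca", "bb"), initial(4, "zhaoliu", "ca", "bc"), initial(5, "mazi", "aa", "aa"))
val stage = Seq(StageRow(1, "zhang", "u", "c", "2019-08-06"), StageRow(3, "wangwu", "ca", "bb", "2019-08-06"),
  StageRow(4, "zhaoliu", "ac", "cb", "2019-08-06"), StageRow(5, "ma", "aa", "aa", "2019-08-06"),
  StageRow(6, "laoyang", "dd", "dd", "2019-08-06"))

val expiredSnapshot = expiredRows(baseDim, stage, "2019-08-05", snapshot = true)     // ids 1, 2, 5
val expiredIncremental = expiredRows(baseDim, stage, "2019-08-05", snapshot = false) // ids 1, 5
```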
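5. Get the new SCD2 records
- These rows may also carry SCD1 changes made at the same time (id 1 changes both name and city/st).
// New versions for ids whose name changed; sk continues from COALESCE(MAX(sk), 0) of base_dim, numbered in id order
val scd2NewLineDF = sql(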
s"""
| SELECT
| ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max sk,
| t1.id,
| t1.name,
| t1.city,
| t1.st,
| t1.scd_update_date,
| t1.scd_version,
| t1.scd_valid_flag,
| t1.scd_start_date,
| t1.scd_end_date
| FROM
| (
| SELECT
| t2.id id,
| t2.name name,
| t2.city city,
| t2.st st,
| t2.update_date scd_update_date,
| t1.scd_version + 1 scd_version,
| "Y" scd_valid_flag,
| t1.scd_start_date scd_start_date,
| '$max_date' scd_end_date
| FROM
| base_dim t1
| JOIN
| stage t2
| ON
| t1.id=t2.id AND t1.name<>t2.name
| ) t1
| CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM base_dim) t2
|
""".stripMargin)
+---+---+-----+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+-----+----+---+---------------+-----------+--------------+--------------+------------+
| 6| 1|zhang| u| c| 2019-08-06| 2| Y| 2019-01-01| 9999-12-31|
| 7| 5| ma| aa| aa| 2019-08-06| 2| Y| 2019-01-01| 9999-12-31|
+---+---+-----+----+---+---------------+-----------+--------------+--------------+------------+
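The surrogate-key arithmetic (ROW_NUMBER() plus COALESCE(MAX(sk), 0)) and the version bump can be sketched on in-memory rows as follows; the names are hypothetical, and the sketch assumes integer surrogate keys assigned sequentially by a single loader:

```scala
case class DimRow(sk: Int, id: Int, name: String, city: String, st: String,
                  scdUpdateDate: String, scdVersion: Int, scdValidFlag: String,
                  scdStartDate: String, scdEndDate: String)
case class StageRow(id: Int, name: String, city: String, st: String, updateDate: String)

val MaxDate = "9999-12-31"

// New versions for ids whose SCD2 field (name) changed.
// sk continues after COALESCE(MAX(sk), 0), numbered in id order like ROW_NUMBER() OVER (ORDER BY id).
def scd2NewLines(dim: Seq[DimRow], stage: Seq[StageRow]): Seq[DimRow] = {
  val skMax = if (dim.isEmpty) 0 else dim.map(_.sk).max
  val changed = for {
    d <- dim if d.scdEndDate == MaxDate // current rows only (an addition, see note)
    s <- stage if s.id == d.id && s.name != d.name
  } yield (d, s)
  changed.sortBy(_._2.id).zipWithIndex.map { case ((d, s), i) =>
    DimRow(
      sk = skMax + i + 1,
      id = s.id, name = s.name, city = s.city, st = s.st, // SCD1 fields come from the stage too
      scdUpdateDate = s.updateDate,
      scdVersion = d.scdVersion + 1,
      scdValidFlag = "Y",
      scdStartDate = s.updateDate, // differs from the SQL above, see note
      scdEndDate = MaxDate)
  }
}

def initial(id: Int, name: String, city: String, st: String): DimRow =
  DimRow(id, id, name, city, st, "2019-01-01", 1, "Y", "2019-01-01", MaxDate)
val baseDim = Seq(initial(1, "zhangsan", "us", "ca"), initial(2, "lisi", "us", "cb"),
  initial(3, "wangwu", "ca", "bb"), initial(4, "zhaoliu", "ca", "bc"), initial(5, "mazi", "aa", "aa"))
val stage = Seq(StageRow(1, "zhang", "u", "c", "2019-08-06"), StageRow(3, "wangwu", "ca", "bb", "2019-08-06"),
  StageRow(4, "zhaoliu", "ac", "cb", "2019-08-06"), StageRow(5, "ma", "aa", "aa", "2019-08-06"),
  StageRow(6, "laoyang", "dd", "dd", "2019-08-06"))

val newLines = scd2NewLines(baseDim, stage) // sk 6 for id 1, sk 7 for id 5
```

Two choices here deliberately depart from the SQL. The SQL copies the old row's scd_start_date (hence 2019-01-01 in its output), while this sketch starts the new version on the stage row's update_date, so it begins the day after the expired row's scd_end_date (2019-08-05) and the two validity ranges do not overlap. The filter on scd_end_date restricts the join to current rows; with the single initial load shown here it changes nothing, but it matters once base_dim holds history.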
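6. Get the records with SCD1 changes
// Same id and name, but city or st changed: overwrite city/st in place, keeping sk and scd_version
val scd1UpdateDF = sql(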
s"""
| SELECT
| bd.sk,
| bd.id,
| bd.name,
| s.city,
| s.st,
| s.update_date,
| bd.scd_version,
| bd.scd_valid_flag,
| bd.scd_start_date,
| bd.scd_end_date
| FROM
| base_dim bd
| JOIN
| stage s
| ON
| bd.id=s.id AND bd.name=s.name
| WHERE
| bd.city<>s.city OR bd.st<>s.st
""".stripMargin)
+---+---+-------+----+---+-----------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+-------+----+---+-----------+-----------+--------------+--------------+------------+
| 4| 4|zhaoliu| ac| cb| 2019-08-06| 1| Y| 2019-01-01| 9999-12-31|
+---+---+-------+----+---+-----------+-----------+--------------+--------------+------------+
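An in-memory sketch of the SCD1 overwrite, under the same assumptions as before (hypothetical case classes standing in for the two tables). sk, scd_version and the validity range are copied from the existing row; only city, st and scd_update_date change:

```scala
case class DimRow(sk: Int, id: Int, name: String, city: String, st: String,
                  scdUpdateDate: String, scdVersion: Int, scdValidFlag: String,
                  scdStartDate: String, scdEndDate: String)
case class StageRow(id: Int, name: String, city: String, st: String, updateDate: String)

// SCD1: same id and same SCD2 field (name), but city or st changed -> overwrite in place
def scd1Updates(dim: Seq[DimRow], stage: Seq[StageRow]): Seq[DimRow] =
  for {
    d <- dim
    s <- stage
    if d.id == s.id && d.name == s.name && (d.city != s.city || d.st != s.st)
  } yield d.copy(city = s.city, st = s.st, scdUpdateDate = s.updateDate)

def initial(id: Int, name: String, city: String, st: String): DimRow =
  DimRow(id, id, name, city, st, "2019-01-01", 1, "Y", "2019-01-01", "9999-12-31")
val baseDim = Seq(initial(1, "zhangsan", "us", "ca"), initial(2, "lisi", "us", "cb"),
  initial(3, "wangwu", "ca", "bb"), initial(4, "zhaoliu", "ca", "bc"), initial(5, "mazi", "aa", "aa"))
val stage = Seq(StageRow(1, "zhang", "u", "c", "2019-08-06"), StageRow(3, "wangwu", "ca", "bb", "2019-08-06"),
  StageRow(4, "zhaoliu", "ac", "cb", "2019-08-06"), StageRow(5, "ma", "aa", "aa", "2019-08-06"),
  StageRow(6, "laoyang", "dd", "dd", "2019-08-06"))

val updated = scd1Updates(baseDim, stage) // only id 4
```

Id 1 also changes city/st, but because its name changed as well it is excluded here; its new city/st values travel with the new SCD2 version instead.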
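7. Get the brand-new records and combine everything into the final result
// union() resolves columns by position, so all four DataFrames must share the same column order
val scdDF = unchangedDF.union(expireDF).union(scd2NewLineDF).union(scd1UpdateDF)
scdDF.createOrReplaceTempView("new_base_dim")
// Brand-new records: stage ids with no row in base_dim. sk continues from MAX(sk) of new_base_dim,
// which already holds the new SCD2 rows, so the keys do not collide; the trailing unnamed columns
// fill scd_version, scd_valid_flag, scd_start_date and scd_end_date by position
val brandNewDF = sql(
s"""
| SELECT
| ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max sk,
| t1.*,
| 1,
| "Y",
| t1.update_date,
| '$max_date'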
| FROM
| (
| stage s
| LEFT ANTI JOIN
| base_dim bd
| ON
| s.id=bd.id
| ) t1
| CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM new_base_dim) t2
""".stripMargin)
brandNewDF.show()
val finalDF = scdDF.union(brandNewDF)
finalDF.show()
+---+---+-------+----+---+-----------+---+---+-----------+----------+
| sk| id| name|city| st|update_date| 1| Y|update_date|9999-12-31|
+---+---+-------+----+---+-----------+---+---+-----------+----------+
| 8| 6|laoyang| dd| dd| 2019-08-06| 1| Y| 2019-08-06|9999-12-31|
+---+---+-------+----+---+-----------+---+---+-----------+----------+
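The anti join and the key offset can be sketched as follows. The important detail is where skMax comes from: the SQL reads it from new_base_dim, which already contains the SCD2 rows (sk 6 and 7), so the brand-new row gets sk 8 instead of a key that collides with them. Names are hypothetical, and skMax is passed in explicitly rather than computed:

```scala
case class DimRow(sk: Int, id: Int, name: String, city: String, st: String,
                  scdUpdateDate: String, scdVersion: Int, scdValidFlag: String,
                  scdStartDate: String, scdEndDate: String)
case class StageRow(id: Int, name: String, city: String, st: String, updateDate: String)

val MaxDate = "9999-12-31"

// Brand-new ids: stage rows with no base_dim row for the same id (LEFT ANTI JOIN on id).
// skMax must be the maximum sk after the SCD2 rows were added.
def brandNew(dim: Seq[DimRow], stage: Seq[StageRow], skMax: Int): Seq[DimRow] = {
  val knownIds = dim.map(_.id).toSet
  stage.filterNot(s => knownIds.contains(s.id)).sortBy(_.id).zipWithIndex.map { case (s, i) =>
    // version 1, current, valid from its own update_date until the open-ended sentinel
    DimRow(skMax + i + 1, s.id, s.name, s.city, s.st, s.updateDate, 1, "Y", s.updateDate, MaxDate)
  }
}

def initial(id: Int, name: String, city: String, st: String): DimRow =
  DimRow(id, id, name, city, st, "2019-01-01", 1, "Y", "2019-01-01", MaxDate)
val baseDim = Seq(initial(1, "zhangsan", "us", "ca"), initial(2, "lisi", "us", "cb"),
  initial(3, "wangwu", "ca", "bb"), initial(4, "zhaoliu", "ca", "bc"), initial(5, "mazi", "aa", "aa"))
val stage = Seq(StageRow(1, "zhang", "u", "c", "2019-08-06"), StageRow(3, "wangwu", "ca", "bb", "2019-08-06"),
  StageRow(4, "zhaoliu", "ac", "cb", "2019-08-06"), StageRow(5, "ma", "aa", "aa", "2019-08-06"),
  StageRow(6, "laoyang", "dd", "dd", "2019-08-06"))

// Max sk of the combined unchanged + expired + SCD2 + SCD1 rows in this run (the SCD2 rows took 6 and 7)
val fresh = brandNew(baseDim, stage, skMax = 7)
```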
8. Final result
1) Snapshot mode:
+---+---+--------+----+---+-----------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+-----------+-----------+--------------+--------------+------------+
| 3| 3| wangwu| ca| bb| 2019-08-06| 1| Y| 2019-01-01| 9999-12-31|
| 5| 5| mazi| aa| aa| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 1| 1|zhangsan| us| ca| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 2| 2| lisi| us| cb| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 6| 1| zhang| u| c| 2019-08-06| 2| Y| 2019-01-01| 9999-12-31|
| 7| 5| ma| aa| aa| 2019-08-06| 2| Y| 2019-01-01| 9999-12-31|
| 4| 4| zhaoliu| ac| cb| 2019-08-06| 1| Y| 2019-01-01| 9999-12-31|
| 8| 6| laoyang| dd| dd| 2019-08-06| 1| Y| 2019-08-06| 9999-12-31|
+---+---+--------+----+---+-----------+-----------+--------------+--------------+------------+
2) Incremental mode:
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| 3| 3| wangwu| ca| bb| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
| 2| 2| lisi| us| cb| 2019-01-01| 1| Y| 2019-01-01| 9999-12-31|
| 5| 5| mazi| aa| aa| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 1| 1|zhangsan| us| ca| 2019-01-01| 1| N| 2019-01-01| 2019-08-05|
| 6| 1| zhang| u| c| 2019-08-06| 2| Y| 2019-01-01| 9999-12-31|
| 7| 5| ma| aa| aa| 2019-08-06| 2| Y| 2019-01-01| 9999-12-31|
| 4| 4| zhaoliu| ac| cb| 2019-08-06| 1| Y| 2019-01-01| 9999-12-31|
| 8| 6| laoyang| dd| dd| 2019-08-06| 1| Y| 2019-08-06| 9999-12-31|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
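A few invariants that an SCD2 dimension built this way should satisfy can be checked mechanically after each load: surrogate keys are unique, no id has more than one current row, and a row is flagged 'Y' exactly when its scd_end_date is 9999-12-31. A hedged sketch of such a check in plain Scala, applied to the incremental-mode result above (checkDimension and DimRow are hypothetical names):

```scala
case class DimRow(sk: Int, id: Int, name: String, city: String, st: String,
                  scdUpdateDate: String, scdVersion: Int, scdValidFlag: String,
                  scdStartDate: String, scdEndDate: String)

val MaxDate = "9999-12-31"

// Returns human-readable violations; an empty result means all checks passed
def checkDimension(rows: Seq[DimRow]): Seq[String] = {
  val duplicateSk = rows.groupBy(_.sk).collect {
    case (sk, rs) if rs.size > 1 => s"sk $sk used ${rs.size} times"
  }
  // At most one current row per id (snapshot mode can leave an id with none, e.g. id 2)
  val tooManyCurrent = rows.groupBy(_.id).collect {
    case (id, rs) if rs.count(_.scdValidFlag == "Y") > 1 => s"id $id has several current rows"
  }
  // 'Y' rows must be open-ended and 'N' rows must be closed
  val flagVsEnd = rows.collect {
    case r if (r.scdValidFlag == "Y") != (r.scdEndDate == MaxDate) => s"sk ${r.sk}: flag and end date disagree"
  }
  (duplicateSk ++ tooManyCurrent ++ flagVsEnd).toSeq
}

// Incremental-mode result from the table above
val finalIncremental = Seq(
  DimRow(3, 3, "wangwu", "ca", "bb", "2019-01-01", 1, "Y", "2019-01-01", "9999-12-31"),
  DimRow(2, 2, "lisi", "us", "cb", "2019-01-01", 1, "Y", "2019-01-01", "9999-12-31"),
  DimRow(5, 5, "mazi", "aa", "aa", "2019-01-01", 1, "N", "2019-01-01", "2019-08-05"),
  DimRow(1, 1, "zhangsan", "us", "ca", "2019-01-01", 1, "N", "2019-01-01", "2019-08-05"),
  DimRow(6, 1, "zhang", "u", "c", "2019-08-06", 2, "Y", "2019-01-01", "9999-12-31"),
  DimRow(7, 5, "ma", "aa", "aa", "2019-08-06", 2, "Y", "2019-01-01", "9999-12-31"),
  DimRow(4, 4, "zhaoliu", "ac", "cb", "2019-08-06", 1, "Y", "2019-01-01", "9999-12-31"),
  DimRow(8, 6, "laoyang", "dd", "dd", "2019-08-06", 1, "Y", "2019-08-06", "9999-12-31"))

val problems = checkDimension(finalIncremental) // empty for this result
```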
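This article implements the intermediate SCD2 + SCD1 steps by hand to help readers understand the concept of slowly changing dimensions and how they are implemented. Note also that all the transformations here are Spark in-memory operations, which may hurt performance on large data volumes; consider materializing the intermediate results in temporary tables. A follow-up post will wrap the intermediate details of loading snapshot or incremental stage tables into an automated SCD engine, so stay tuned.
Reposted from the blog of the same name: https://blog.csdn.net/fzlulee/article/details/98489677
<End of article>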