1 背景:
隨着公司業務的成長,數據量也隨之的不斷增長。隨之而來的問題是在做ETL的時候,時間花費也越來越長。為了節省時間開銷,我們只想要更新最新的數據,不想要把公司歷年所有的數據都進行處理。這種情況就被稱為變更數據捕獲(Change Data Capture,又名CDC)。在SQLServer2008之前,對數據變更的捕獲通常使用觸發器把DML操作中的INSERT/UPDATE/DELETE數據記錄下來,但是觸發器的維護比較困難;時間戳嚴重依賴於數據庫的設計,而且必須在業務代碼中對時間字段進行維護。
1.1 適用環境:
僅在SQLServer2008(含)以后的企業版、開發版和評估版中可用,標准版中不可用
1.2 詳解:
變更數據捕獲的更改數據源為 SQL Server 事務日志。 在將插入、更新和刪除應用於跟蹤的源表時,將會在日志中添加說明這些更改的項。 日志用作捕獲進程的輸入來源。 它會讀取日志,並在跟蹤的表的關聯更改表中添加有關更改的信息。 系統將提供一些函數,以枚舉在更改表中指定范圍內發生的更改,並以篩選的結果集的形式返回該值。 通常,應用程序進程使用篩選的結果集在某種外部環境中更新源表示形式。
2 基本操作
相關的存儲過程:
Sys.sp_cdc_add_job --添加job
Sys.sp_cdc_generate_wrapper_function
Sys.sp_cdc_change_job --修改job配置
Sys.sp_cdc_get_captured_columns --獲取捕獲列信息
Sys.sp_cdc_cleanup_change_table
Sys.sp_cdc_get_ddl_history
Sys.sp_cdc_disable_db --禁用當前數據的CDC,建議先禁用表,再禁用庫
Sys.sp_cdc_help_change_data_capture
Sys.sp_cdc_disable_table --關閉表捕獲
Sys.sp_cdc_help_jobs
Sys.sp_cdc_drop_job --刪除Job
Sys.sp_cdc_scan
Sys.sp_cdc_enable_db --啟用當前數據庫CDC功能
Sys.sp_cdc_start_job --啟動Job
Sys.sp_cdc_enable_table --開啟表捕獲
Sys.sp_cdc_stop_job --停止Job
相關函數:
Cdc.fn_cdc_get_all_changes_<capture_instance> Sys.fn_cdc_has_column_changed Cdc.fn_cdc_get_net_changes_<capture_instance> Sys.fn_cdc_increment_lsn Sys.fn_cdc_decrement_lsn Sys.fn_cdc_is_bit_set Sys.fn_cdc_get_column_ordinal Sys.fn_cdc_map_lsn_to_time Sys.fn_cdc_get_max_lsn Sys.fn_cdc_map_time_to_lsn Sys.fn_cdc_get_min_lsn
3. 操作案例
本節以microsoft發布的adventureworks數據庫為例來演示如何配置和使用Sql Server的CDC功能,本文中使用的數據庫為SQL Server 2008 R2, AdventureWorks數據庫可在這里下載,然后恢復到sql server中來。在操作之前確保SQL Agent服務是啟動狀態,最好設置為自動啟動。
3.1 對AdventureWorks啟用CDC
USE AdventureWorks GO EXECUTE sys.sp_cdc_enable_db; GO
說明:
sys.sp_cdc_enable_db
作用域: 整個目標庫,包含元數據、DDL觸發器、cdc架構和cdc用戶。無法對系統數據庫和分發數據庫啟用該功能。且執行者需要用sysadmin角色權限。
返回值: 0,成功; 1,失敗。
如果執行過程中出現 錯誤: 返回的錯誤為15517: '無法作為數據庫主體執行,因為主體"dbo" 不存在、無法模擬這種類型的主體,或您沒有所需的權限。 原因為某些存儲過程使用了具有WITH EXECUTE AS 的選項。使其在當前庫具有了某個架構,但是當在別的地方執行時,但是該架構不存在。解決辦法為(如果成功,請忽略):
USE AdventureWorks GO ALTER AUTHORIZATION ON DATABASE::[AdventureWorks] TO [sa] GO EXECUTE sys.sp_cdc_enable_db; GO
經過檢查,uspUpdateEmployeeHireInfo這個存儲過程的確有:WITH EXECUTE AS CALLER,使用sa的原因是即使sa被禁用,sa還是存在的。所以不會報錯。
3.2 檢查AdventureWorks CDC是否開啟
SELECT is_cdc_enabled,CASE WHEN is_cdc_enabled=0 THEN 'CDC OFF 'ELSE 'CDC ON'END as 描述 FROM sys.databases WHERE NAME = 'AdventureWorks'
數據庫CDC開啟后,將在當前數據庫下創建cdc用戶,在架構下面也會增加cdc這個架構,因為CDC要求獨占方式使用這兩個架構,所以要單獨創建。如果存在了非CDC功能創建的cdc用戶、架構的話,則需要先刪除該cdc命名的架構,才能開啟。如下圖所示:
當前數據庫的系統表下,會創建一些CDC輔助的表,用於在CDC過程中,記錄一些系統信息
3.3 對數據庫AdventureWorks中的表開啟CDC
使用db_owner角色的成員執行sys.sp_cdc_enable_table為每個需要跟蹤的表創建捕獲實例,sys.sp_cdc_enable_table的參數列表為:
sys.sp_cdc_enable_table [ @source_schema = ] 'source_schema', [ @source_name = ] 'source_name' , [ @role_name = ] 'role_name' [,[ @capture_instance = ] 'capture_instance' ] [,[ @supports_net_changes = ] supports_net_changes ] [,[ @index_name = ] 'index_name' ] [,[ @captured_column_list = ] 'captured_column_list' ] [,[ @filegroup_name = ] 'filegroup_name' ] [,[ @partition_switch = ] 'partition_switch' ]
說明:
a) 執行sys.sp_cdc_enable_table后,可以通過sys.tables目錄視圖中的is_tracked_by_cdc列來判斷是否創建成功。
b) 默認情況下會對表的全部列做捕獲,如果只需要對某些列做捕獲,可以在@captured_column_list參數中指定這些列;
c) 如果要把更改表放到文件組里的話,最好創建單獨的文件組(最起碼與源表獨立)。
d) 如果不想控制訪問角色,則@role_name必須顯式設置為null。如果指定了不存在的角色,執行該存儲過程后,會自動創建指定的角色。
本節我們選擇HumanResources.Department、Person.ADDRESS、Person.Person這三張表開啟CDC功能。
USE AdventureWorks; GO --表HumanResources.Department EXECUTE sys.sp_cdc_enable_table @source_schema = N'HumanResources' , @source_name = N'Department' , @role_name = N'cdc_Admin'--可以自動創建 , @capture_instance=DEFAULT GO --表Person.ADDRESS EXECUTE sys.sp_cdc_enable_table @source_schema = N'Person' , @source_name = N'ADDRESS' , @role_name = N'cdc_Admin'--可以自動創建 , @capture_instance=DEFAULT GO --表Person.Person EXECUTE sys.sp_cdc_enable_table @source_schema = N'Person' , @source_name = N'Person' , @role_name = N'cdc_Admin'--可以自動創建 , @capture_instance=DEFAULT GO
執行后,
1. 系統表中多出了三張cdc.{Schema}_{TableName}_CT的表,其中SchemaName為被捕獲表的架構名稱,TableName為被捕獲的表名
2. 在SQL Agent下創建了兩個Job, cdc.AdventureWorks_cleanup和cdc.AdventureWorks_capture
3. cdc_Admin角色被創建
3.4 驗證三張表的CDC開啟成功
SELECT name , is_tracked_by_cdc , CASE WHEN is_tracked_by_cdc = 0 THEN 'CDC ON' ELSE 'CDC OFF' END 描述 FROM sys.tables WHERE OBJECT_ID IN( OBJECT_ID('HumanResources.Department'), OBJECT_ID('Person.ADDRESS'), OBJECT_ID('Person.Person') )
結果如下
3.5 CDC功能驗證
3.5.1 查看HumanResources.Department變更之前的數據
3.5.2 新增數據
INSERT INTO HumanResources.Department ( Name , GroupName , ModifiedDate ) SELECT Name + '1' , GroupName + '1' , GETDATE() ModifiedDate FROM HumanResources.Department
操作之后的數據增加了16條,
查詢CDC變更表cdc.HumanResources_Department_CT:
SELECT * FROM cdc.HumanResources_Department_CT
結果如下:
可以看到的確多了16條記錄,注意__$operation的值
3.5.3 刪除HumanResources.Department在3.5.3步中新增的數據
DELETE FROM HumanResources.Department WHERE DepartmentID>16
查詢CDC數據
3.5.4 修改數據
UPDATE HumanResources.Department SET ModifiedDate=GETDATE()
結果如下
我們發現這次CDC表增加的數據增加了2倍,因為cdc.<capture_instance>_CT這樣命名的表,是用於記錄源表更改的表,對於insert/delete操作,會有對應的一行記錄,而對於update,會有兩行記錄。
__$operation列:1 = 刪除、2= 插入、3= 更新(舊值)、4= 更新(新值)
__$start_lsn列:由於更改是來源與數據庫的事務日志,保存事務日志的開始序列號(LSN)
微軟不建議直接查詢這類表,建議使用cdc.fn_cdc_get_all_changes_<捕獲實例> 和cdc.fn_cdc_get_net_changes_<capture_instance> 來查詢。
使用示例如下:
USE AdventureWorks; GO SELECT DepartmentID,Name,GroupName,__$operation as operation_type,ModifiedDate FROM cdc.fn_cdc_get_net_changes_HumanResources_Department(sys.fn_cdc_get_min_lsn('HumanResources_Department'), sys.fn_cdc_get_max_lsn(), N'all with mask') --最后一個參數說明如下: --all --返回的行和元數據列 $start_lsn 中應用該行所需的操作的最終更改的 LSN 和_ _$operation。 該列_ _$update_mask 始終為 NULL。 --all with mask --返回的行和元數據列 $start_lsn 中應用該行所需的操作的最終更改的 LSN 和_ _$operation。 此外,當更新操作返回 (__$operation = 4) 的更新中修改過的捕獲的列中返回的值中進行標記_ _$update_mask。 --all with merge --返回對元數據列 $start_lsn 中的行所做的最終更改的 LSN。 該列_ _$operation 將是兩個值之一: 1 表示刪除,5 表示應用更改所需的操作是插入或更新。 該列_ _$update_mask 始終為 NULL。
結果如下:
4. 常用操作舉例
4.1 查詢已經開啟的捕獲實例
由於可能不記得或者不知道開啟了什么表的捕獲,所以可以使用以下語句來查找
USE AdventureWorks; GO --返回所有表的變更捕獲配置信息 EXECUTE sys.sp_cdc_help_change_data_capture; GO
4.2 查看對某個實例(即表)的哪些列做了捕獲監控
USE AdventureWorks; GO --查看對某個實例(即表)的哪些列做了捕獲監控 EXEC sys.sp_cdc_get_captured_columns @capture_instance = 'HumanResources_Department' -- sysname
4.3 查看當前庫所有的cdc Job
USE AdventureWorks; GO --查看當前庫所有的cdc Job SELECT * FROM msdb.dbo.cdc_jobs
4.4 查看當前配置使用sp_cdc_help_jobs
啟用cdc之后會自動創建了兩個作業,可以先使用sp_cdc_help_jobs來查看
EXECUTE sp_cdc_help_jobs
對於一個大型的OLTP系統,由於數據更改會非常頻繁,變更表中的數據會非常多,如果存放過久(最久可以存放100年),那對數據庫空間是非常大的挑戰。此時可以調整上圖中cdc.AdventureWorks_cleanup 中retention(單位:分鍾),默認情況下,保留時間為3天.
4.5 修改作業配置
USE AdventureWorks; GO --顯示原有配置: EXEC sp_cdc_help_jobs GO --更改數據保留時間為分鍾 EXECUTE sys.sp_cdc_change_job @job_type = N'cleanup', @retention=100 GO --停用作業 EXEC sys.sp_cdc_stop_job N'cleanup' --job name 為NVARCHAR類型 GO --啟用作業 EXEC sys.sp_cdc_start_job N'cleanup' --job name 為NVARCHAR類型 GO --再次查看 EXEC sp_cdc_help_jobs GO
執行后,cleanup作業的運行時間間隔已經被修改成100分鍾了.
注意:修改后要先停用(如果已經啟用),再啟用,才能生效
4.6 作業的其他操作
USE AdventureWorks; GO --停用作業 EXEC sys.sp_cdc_stop_jobN'cleanup' GO --啟用作業 EXEC sys.sp_cdc_start_jobN'cleanup' GO --刪除作業 EXEC sys.sp_cdc_drop_job@job_type = N'cleanup' -- nvarchar(20) GO --創建作業 EXEC sys.sp_cdc_add_job @job_type = N'cleanup', @start_job = 0, @retention = 5760 GO --查看作業 EXEC sys.sp_cdc_help_jobs GO
4.7 DDL變更捕獲
CDC除了捕獲數據變更之外,還能捕獲DDL操作的變化。
4.7.1 現在先來對HumanResources.Department 表修改一下,把name的長度加長
USE AdventureWorks; GO ALTER TABLE HumanResources.Department ALTER COLUMN Name NVARCHAR(120) ; GO
4.7.2 查詢ddl記錄表
USE AdventureWorks; GO SELECT * FROM cdc.ddl_history
我們發現有一條schema的表更記錄
4.8 使用CDC的函數來獲取更改
4.8.1 使用cdc.fn_cdc_get_all_changes_HumanResources_Department 函數報告捕獲實例HumanResources_Department 的當前所有可用更改
USE AdventureWorks; GO DECLARE @from_lsn binary(10) --開始事務序列號 DECLARE @to_lsn binary(10) --終止事務序列號 SET @from_lsn = sys.fn_cdc_get_min_lsn('HumanResources_Department') SET @to_lsn = sys.fn_cdc_get_max_lsn() SELECT * FROM cdc.fn_cdc_get_all_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all update old'); GO ----------------------------------------------------------------------------------------------------------------------------------------------------- USE AdventureWorks; GO DECLARE @datetimeString as VARCHAR(100) --當前日期的前一天0點時間 SELECT @datetimeString=DATENAME(year,GETDATE())+'-'+DATENAME(month,GETDATE())+'-'+DATENAME(DAY,DATEADD(DAY,-1,GETDATE()))+' 00:00:00' DECLARE @begin_time as DATETIME set @begin_time=CONVERT(DATETIME,@datetimeString) DECLARE @end_time as DATETIME --當前時間 SET @end_time = GETDATE(); DECLARE @from_lsn BINARY(10), @to_lsn BINARY(10); -- Map the time interval to a change data capture query range. SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time); SELECT [__$operation] as operation_type ,[DepartmentID] ,[Name] ,[GroupName] ,[ModifiedDate] FROM cdc.fn_cdc_get_net_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all with merge');
4.8.2 獲取某個時間段的更改信息
先根據日志序列號(logsequence number ,LSN)來獲取跟蹤變更數據:Sys.fn_cdc_map_time_to_lsn獲取變更范圍內的最大、最小LSN值。可以使用:
a) Smallest greater than;
b) smallest greater than orequal;
c) largest less than;
d) largest less than or equal.
如查詢某個時間段插入的數據:
USE AdventureWorks; GO --插入數據 INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate) VALUES('test','abc',GETDATE()) INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate) VALUES('test1','abc1',GETDATE()) GO --檢查數據 DECLARE @bglsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('smallest greater than or equal','2018-12-29 14:32:00.997') DECLARE @edlsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('largest less than or equal',GETDATE()) SELECT DepartmentID,GroupName,Name FROM cdc.HumanResources_Department_CT WHERE [__$operation]=2 AND [__$start_lsn] BETWEEN @bglsn AND @edlsn
結果如下:
4.8.3 sys.fn_cdc_map_lsn_to_time 查詢變更時間
USE AdventureWorks; GO SELECT [__$operation] , CASE [__$operation] WHEN 1 THEN 'DELETE' WHEN 2 THEN 'INSERT' WHEN 3 THEN 'BEFORE UPDATE' WHEN 4 THEN 'AFTER UPDATE' END [類型], sys.fn_cdc_map_lsn_to_time([__$start_lsn]) [更改時間] , name , DepartmentID , GroupName , ModifiedDate FROM cdc.HumanResources_Department_CT
4.8.4 獲取LSN邊界
USE AdventureWorks; GO SELECT sys.fn_cdc_get_max_lsn() AS [數據庫級別的最大LSN], sys.fn_cdc_get_min_lsn('cdc.HumanResources_Department_CT') AS [捕獲實例的lsn]
這兩個值可以用於上面提到的函數里面用於篩選數據時使用。
4.8.5 更新DepartmentID=33的記錄
update HumanResources.Department set name ='changed for test record 33' where DepartmentID=33;
可以利用這個特性很方便地處理ETL過程中的緩慢變化維.
4.8.5 返回某個表的變更捕獲配置信息
EXEC sys.sp_cdc_help_change_data_capture '{schema}', '{table_name}'
例如查看HumanResources.Department的配置信息:
USE AdventureWorks;
GO
EXEC sys.sp_cdc_help_change_data_capture 'HumanResources', 'Department'
執行結果
4.8.6 獲取當前庫中所有變更捕獲配置信息
USE AdventureWorks; GO --返回所有表的變更捕獲配置信息 EXECUTE sys.sp_cdc_help_change_data_capture;
4.8.7 查看對某個表的哪些列做了捕獲監控,使用上面返回的capture_instance列值
USE AdventureWorks; GO --查看對某個表的哪些列做了捕獲監控,使用上面返回的capture_instance列值 EXEC sys.sp_cdc_get_captured_columns @capture_instance = 'HumanResources_Department'
由於在開啟該表時sys.sp_cdc_enable_table 的參數:@captured_column_list參數沒有指定,所以dbo.Department表的所有字段都進行監控了,如果你只關心某些字段,強烈建議在創建捕獲的時候設置這個屬性
4.8.8
declare @datetimeString as varchar(100) Select @datetimeString=Datename(year,GetDate())+'-'+Datename(month,GetDate())+'-'+Datename(day,dateadd(day,-1,GetDate()))+' 00:00:00' declare @begin_time as datetime set @begin_time=convert(datetime,@datetimeString) --select @begin_time declare @end_time as datetime SET @end_time = getdate(); declare @from_lsn binary(10), @to_lsn binary(10); -- Map the time interval to a change data capture query range. SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time); --select @from_lsn,@to_lsn SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@from_lsn, @to_lsn, N'all with merge'); --declare @min_lsn as binary(10) --declare @max_lsn as binary(10) --set @min_lsn =sys.fn_cdc_get_min_lsn('dbo_s_Fee') --set @max_lsn=sys.fn_cdc_get_max_lsn() --SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
--OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp --FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@min_lsn, @max_lsn, N'all with merge');
5. 參考文獻
b) https://www.cnblogs.com/chenmh/p/4408825.html