最新項目稍有空隙,開始研究SQL Server 2012和2014的一些BI特性,參照(
)的一個示例,我們開始體驗SSIS中的CDC(Change Data Capture,變更數據捕獲)。注:如果需要了解關於SQL Server 2008中的CDC,請看這里http://www.cnblogs.com/downmoon/archive/2012/04/10/2439462.html),本文假定讀者對CDC的工作方式已有所了解。^_^。
我們分三步完成實例:
1、准備基礎數據;
2、設計一個初始包;
3、在2的基礎上設計一個增量包。
首先請完成以下准備安裝:
(1)Visual studio 2012或Visual Studio 2012 Shell (Isolated) Redistributable Package
http://www.microsoft.com/en-us/download/details.aspx?id=30678
http://www.microsoft.com/en-us/download/details.aspx?id=30670
(2)SQL Server Data Tools - Business Intelligence for Visual Studio 2012
http://www.microsoft.com/zh-cn/download/details.aspx?id=36843
(2)SQL Server 2012企業版或開發版
http://www.microsoft.com/en-us/download/details.aspx?id=29066
(3)示例數據庫AdventureWorksDW2012(本文必須,如果自建表則不必)
http://msftdbprodsamples.codeplex.com/releases/view/55330
好了,開始第一步:
/* -- ============================================= -- 創建測試數據庫及數據表,借助AdventureWorksDW2012示例數據庫 ---Generate By downmoon(邀月),3w@live.cn -- ============================================= */ --Create database CDCTest --GO --USE [CDCTest] --GO --SELECT * INTO DimCustomer_CDC --FROM [AdventureWorksDW2012].[dbo].[DimCustomer] --WHERE CustomerKey < 11500; --select * from DimCustomer_CDC;
/* -- ============================================= -- 啟用數據庫級別CDC,只對企業版和開發版有效 ---Generate By downmoon(邀月),3w@live.cn -- ============================================= */ USE [CDCTest] GO EXEC sys.sp_cdc_enable_db GO -- add a primary key to the DimCustomer_CDC table so we can enable support for net changes IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = OBJECT_ID(N'[dbo].[DimCustomer_CDC]') AND name = N'PK_DimCustomer_CDC') ALTER TABLE [dbo].[DimCustomer_CDC] ADD CONSTRAINT [PK_DimCustomer_CDC] PRIMARY KEY CLUSTERED ( [CustomerKey] ASC ) GO /* -- ============================================= -- 啟用表級別CDC ---Generate By downmoon(邀月),3w@live.cn -- ============================================= */ EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'DimCustomer_CDC', @role_name = N'cdc_admin', @supports_net_changes = 1 GO
/* -- ============================================= -- 創建一個目標表,與源表(Source)有相同的表結構 --注意,在生產環境中,完全可以是不同的實例或服務器,本例為了方便,在同一個數據庫實例的同一個數據庫中演示 ---Generate By downmoon(邀月),3w@live.cn -- ============================================= */ SELECT TOP 0 * INTO DimCustomer_Destination FROM DimCustomer_CDC --select @@version; select * from DimCustomer_Destination;
第二步:創建初始包
-- ============================================= -- 我們使用兩個包來完成示例,一個初始包完成數據的初始加載,一個增量包完成數據的變更捕獲 ---Generate By downmoon(邀月),3w@live.cn -- =============================================
初始包包含如下邏輯:
(1)使用CDC Control Task標記初始加載開始LSN(Use the CDC Control Task to mark the initial load start LSN)
(2)轉換所有源表數據到目標表(Transfer all of the data from the source table into our destination table)
(3)使用CDC Control Task標記初始加載結束LSN(Use the CDC Control Task to mark the initial load end LSN)
示例:http://code.msdn.microsoft.com/My-First-Integration-fa41c0b1
新建一個SSIS項目,創建一個包“Initial Load”,如下圖:
新建兩個CDC Control Task,分別命名為“CDC Control Task Start”和“CDC Control Task End”,分別對應屬性為“Mark initial load start”和""Mark initial load end"
連接管理器均為ADO.NET方式,其他屬性如下圖:
中間加入一個“Data Flow Task”,屬性默認。
此時,運行包,可見CDC_States有初始標記。
第三步:創建增量包
增量包包含如下邏輯:
(1)創建一個源數據庫的連接管理器(Create a connection manager for the Source database)
(2)設置CDC運算符以獲取處理邊界(Set the CDC Control Operation to Get processing range)
(3)創建一個新的CDC狀態變量(CDC_state)(Create a new CDC state variable (CDC_state))
(4)創建一個目標數據庫的連接管理器(Create a connection manager for the Destination database)
(5)選擇前面初始加載包創建的狀態表(Select the state table (this was created by the Initial Load package) – [dbo].[cdc_states])
(6)設置狀態名稱(必須匹配初始加載包使用過的狀態名稱,this must match what was used in the Initial Load package (CDC_State))
在項目中創建一個新包,命名為“Incremental Load”
在包的"Control Flow"視圖中,自上而下分別手動6個Task,順序如下圖,除去上面用到的三個Task,其余均為Execute SQL Task
注意:CDC Control Task End的CDC運算符為MARK Process Range,CDC Control Task Start的CDC運算符為Get Process Range
其余4個Execute SQL Task的SQL語句如下:
--Create stage Tables IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[stg_DimCustomer_UPDATES]') AND type in (N'U')) BEGIN SELECT TOP 0 * INTO stg_DimCustomer_UPDATES FROM DimCustomer_Destination END IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[stg_DimCustomer_DELETES]') AND type in (N'U')) BEGIN SELECT TOP 0 * INTO stg_DimCustomer_DELETES FROM DimCustomer_Destination END
-- batch update UPDATE dest SET dest.FirstName = stg.FirstName, dest.MiddleName = stg.MiddleName, dest.LastName = stg.LastName, dest.YearlyIncome = stg.YearlyIncome FROM [DimCustomer_Destination] dest, [stg_DimCustomer_UPDATES] stg WHERE stg.[CustomerKey] = dest.[CustomerKey]
-- batch delete DELETE FROM [DimCustomer_Destination] WHERE[CustomerKey] IN ( SELECT [CustomerKey] FROM [dbo].[stg_DimCustomer_DELETES] )
-- truncate table truncate table [dbo].[stg_DimCustomer_DELETES] truncate table [dbo].[stg_DimCustomer_UPDATES]
最關鍵的一步,選中CDC Control Task Start,並切換到Data Flow,自上而下分別拖動CDC Source,CDC Splitter Transformer,三個ADO.NET Destination,如下圖:
其中三個的目標表分別為:[DimCustomer_Destination],stg_DimCustomer_DELETES,stg_DimCustomer_UPDATES。
而CDC Source的連接管理器屬性如下圖:
此時,可運行增量包,但我們不會看到任何運行結果,因為此時我們還沒有進行數據的Insert或Update操作。
下來我們提供一個腳本,測試下效果:
-- ============================================= -- 更新一些數據,以顯示SSIS 2012中CDC的效果 ---Generate By downmoon(邀月),3w@live.cn -- ============================================= USE [CDCTest] GO -- Transfer the remaining customer rows SET IDENTITY_INSERT DimCustomer_CDC ON INSERT INTO DimCustomer_CDC ( CustomerKey, GeographyKey, CustomerAlternateKey, Title, FirstName, MiddleName, LastName, NameStyle, BirthDate, MaritalStatus, Suffix, Gender, EmailAddress, YearlyIncome, TotalChildren, NumberChildrenAtHome, EnglishEducation, SpanishEducation, FrenchEducation, EnglishOccupation, SpanishOccupation, FrenchOccupation, HouseOwnerFlag, NumberCarsOwned, AddressLine1, AddressLine2, Phone, DateFirstPurchase, CommuteDistance ) SELECT CustomerKey, GeographyKey, CustomerAlternateKey, Title, FirstName, MiddleName, LastName, NameStyle, BirthDate, MaritalStatus, Suffix, Gender, EmailAddress, YearlyIncome, TotalChildren, NumberChildrenAtHome, EnglishEducation, SpanishEducation, FrenchEducation, EnglishOccupation, SpanishOccupation, FrenchOccupation, HouseOwnerFlag, NumberCarsOwned, AddressLine1, AddressLine2, Phone, DateFirstPurchase, CommuteDistance FROM [AdventureWorksDW2012].[dbo].[DimCustomer] WHERE CustomerKey =11502 SET IDENTITY_INSERT DimCustomer_CDC OFF GO -- give 10 people a raise UPDATE DimCustomer_CDC SET YearlyIncome = YearlyIncome + 10 WHERE CustomerKey >= 11000 AND CustomerKey <= 11010 GO
此時,我們可以看到變更捕獲的結果:
如果您覺得還不夠直觀,請"Enable Data Viewer",
至此,一個SSIS 2012中CDC的實例演示結束,如果還有進一步的研究,請移駕MSDN,下面有鏈接。本文也提供示例項目包,以作研究之用。
本文參考:
http://msdn.microsoft.com/en-us/library/bb895315.aspx