Using CDC (Change Data Capture) in SSIS 2012


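With a bit of downtime on my latest project, I have started looking into some of the BI features in SQL Server 2012 and 2014. Working through a published example, this post tries out CDC (Change Data Capture) in SSIS.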

Note: for background on CDC in SQL Server 2008, see http://www.cnblogs.com/downmoon/archive/2012/04/10/2439462.html. This article assumes the reader already has some understanding of how CDC works. ^_^

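We will complete the example in three steps:

1. Prepare the base data;

2. Design an initial load package;

3. Design an incremental load package that builds on step 2.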

First, install the following prerequisites:

(1) Visual Studio 2012 or the Visual Studio 2012 Shell (Isolated) Redistributable Package

http://www.microsoft.com/en-us/download/details.aspx?id=30678

http://www.microsoft.com/en-us/download/details.aspx?id=30670

(2) SQL Server Data Tools - Business Intelligence for Visual Studio 2012

http://www.microsoft.com/zh-cn/download/details.aspx?id=36843

(3) SQL Server 2012 Enterprise or Developer Edition

http://www.microsoft.com/en-us/download/details.aspx?id=29066

(4) The AdventureWorksDW2012 sample database (required for this article; not needed if you create your own tables)

http://msftdbprodsamples.codeplex.com/releases/view/55330

 

OK, let's start with step 1:

/*
-- =============================================
-- Create the test database and table, using the AdventureWorksDW2012 sample database
---Generated by downmoon (邀月), 3w@live.cn
-- =============================================
*/
CREATE DATABASE CDCTest
GO
USE [CDCTest]
GO

SELECT * INTO DimCustomer_CDC
FROM [AdventureWorksDW2012].[dbo].[DimCustomer]
WHERE CustomerKey < 11500;

SELECT * FROM DimCustomer_CDC;
/*
-- =============================================
-- Enable CDC at the database level (available only in Enterprise and Developer editions)
---Generated by downmoon (邀月), 3w@live.cn
-- =============================================
*/
USE [CDCTest]
GO

EXEC sys.sp_cdc_enable_db
GO

-- add a primary key to the DimCustomer_CDC table so we can enable support for net changes
IF NOT EXISTS (SELECT * FROM sys.indexes WHERE object_id = 
OBJECT_ID(N'[dbo].[DimCustomer_CDC]') AND name = N'PK_DimCustomer_CDC')
  ALTER TABLE [dbo].[DimCustomer_CDC] ADD CONSTRAINT 
[PK_DimCustomer_CDC] PRIMARY KEY CLUSTERED 
(
    [CustomerKey] ASC
)
GO

/*
-- =============================================
-- Enable CDC at the table level
---Generated by downmoon (邀月), 3w@live.cn
-- =============================================
*/
EXEC sys.sp_cdc_enable_table 
@source_schema = N'dbo',
@source_name = N'DimCustomer_CDC',
@role_name = N'cdc_admin',
@supports_net_changes = 1

GO
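/*
-- =============================================
-- Create a destination table with the same structure as the source table
-- Note: in production the destination can be on a different instance or server;
-- for convenience, this demo uses the same database on the same instance
---Generated by downmoon (邀月), 3w@live.cn
-- =============================================
*/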
SELECT TOP 0 * INTO DimCustomer_Destination
FROM DimCustomer_CDC
--select @@version;
select * from DimCustomer_Destination;
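
Before building the packages, it can help to confirm that CDC is actually enabled at both levels. The queries below are a minimal sketch and not part of the original walkthrough; they assume the database and table names used above, and that SQL Server Agent is running, since the CDC capture job depends on it.

```sql
USE [CDCTest]
GO

-- Database level: is_cdc_enabled should be 1 after sys.sp_cdc_enable_db
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = N'CDCTest';

-- Table level: is_tracked_by_cdc should be 1 after sys.sp_cdc_enable_table
SELECT name, is_tracked_by_cdc
FROM sys.tables
WHERE name = N'DimCustomer_CDC';

-- Capture instance details (instance name, captured columns, net-changes support)
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = N'dbo',
    @source_name = N'DimCustomer_CDC';
GO
```

If either flag comes back as 0, re-run the corresponding enable procedure before continuing.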


Step 2: Create the initial package

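-- =============================================
-- We use two packages for this example: an initial package that performs the first full load, and an incremental package that captures subsequent changes
---Generated by downmoon (邀月), 3w@live.cn
-- =============================================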

The initial package contains the following logic:
(1) Use the CDC Control Task to mark the initial load start LSN
(2) Transfer all of the data from the source table into the destination table
(3) Use the CDC Control Task to mark the initial load end LSN

Sample: http://code.msdn.microsoft.com/My-First-Integration-fa41c0b1

Create a new SSIS project and add a package named "Initial Load", as shown below:

[Screenshot: the Initial Load package]

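Add two CDC Control Tasks named "CDC Control Task Start" and "CDC Control Task End", and set their CDC control operation to "Mark initial load start" and "Mark initial load end" respectively.

Both use an ADO.NET connection manager; the other properties are shown in the screenshots below: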

[Screenshots: CDC Control Task properties]

Between the two, add a "Data Flow Task" with default properties.


Run the package now, and the cdc_states table will contain the initial-load marker.
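
The result can also be checked from T-SQL. The following is a sketch under some assumptions: that the state table is [dbo].[cdc_states] with the [name] and [state] columns the CDC Control Task editor generates by default, that the state name is CDC_State, and that the data flow copied every source row. Adjust the names if your package differs.

```sql
USE [CDCTest]
GO

-- The stored CDC state; after "Mark initial load end" it should begin with ILEND
SELECT [name], [state]
FROM [dbo].[cdc_states]
WHERE [name] = N'CDC_State';

-- After the initial load, source and destination row counts should match
SELECT
    (SELECT COUNT(*) FROM [dbo].[DimCustomer_CDC]) AS SourceRows,
    (SELECT COUNT(*) FROM [dbo].[DimCustomer_Destination]) AS DestinationRows;
GO
```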


 

Step 3: Create the incremental package

The incremental package contains the following logic:
(1) Create a connection manager for the source database
(2) Set the CDC control operation to "Get processing range"
(3) Create a new CDC state variable (CDC_state)
(4) Create a connection manager for the destination database
(5) Select the state table created by the Initial Load package ([dbo].[cdc_states])
(6) Set the state name; it must match the one used in the Initial Load package (CDC_State)

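Create a new package in the project and name it "Incremental Load".

In the package's "Control Flow" view, add six tasks from top to bottom in the order shown below. Apart from the three task types already used above, the rest are Execute SQL Tasks.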

[Screenshot: control flow of the Incremental Load package]

Note: the CDC control operation for CDC Control Task Start is "Get processing range", and for CDC Control Task End it is "Mark processed range".

The SQL statements for the Execute SQL Tasks (four scripts) are as follows:

--Create stage Tables
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[stg_DimCustomer_UPDATES]') AND type in (N'U'))
BEGIN
   SELECT TOP 0 * INTO stg_DimCustomer_UPDATES
   FROM DimCustomer_Destination
END

IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[stg_DimCustomer_DELETES]') AND type in (N'U'))
BEGIN
   SELECT TOP 0 * INTO stg_DimCustomer_DELETES
   FROM DimCustomer_Destination
END

-- batch update
UPDATE dest
SET
    dest.FirstName = stg.FirstName,
    dest.MiddleName = stg.MiddleName,
    dest.LastName = stg.LastName,
    dest.YearlyIncome = stg.YearlyIncome
FROM [DimCustomer_Destination] dest
INNER JOIN [stg_DimCustomer_UPDATES] stg
    ON stg.[CustomerKey] = dest.[CustomerKey]

-- batch delete
DELETE FROM [DimCustomer_Destination]
WHERE [CustomerKey] IN
(
    SELECT [CustomerKey]
    FROM [dbo].[stg_DimCustomer_DELETES]
)

-- truncate table
TRUNCATE TABLE [dbo].[stg_DimCustomer_DELETES]
TRUNCATE TABLE [dbo].[stg_DimCustomer_UPDATES]

 

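Now for the most important step: open the Data Flow Task that follows CDC Control Task Start, switch to the Data Flow tab, and add, from top to bottom, a CDC Source, a CDC Splitter transformation, and three ADO.NET Destinations, as shown below: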

[Screenshot: data flow of the Incremental Load package]

The three destinations write to [DimCustomer_Destination], stg_DimCustomer_DELETES, and stg_DimCustomer_UPDATES, for the splitter's insert, delete, and update outputs respectively.


The connection manager properties of the CDC Source are shown below:

[Screenshot: CDC Source connection manager properties]

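You can run the incremental package at this point, but it will not produce any visible results, because no rows have been inserted or updated yet.

Next, here is a script to test the effect: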

-- =============================================
-- Modify some data to show the effect of CDC in SSIS 2012
---Generated by downmoon (邀月), 3w@live.cn
-- =============================================

USE [CDCTest]
GO
 
-- Insert one additional customer row (CustomerKey = 11502)
SET IDENTITY_INSERT DimCustomer_CDC ON
 
INSERT INTO DimCustomer_CDC
(
       CustomerKey, GeographyKey, CustomerAlternateKey, Title, FirstName, 
       MiddleName, LastName, NameStyle, BirthDate, MaritalStatus, 
       Suffix, Gender, EmailAddress, YearlyIncome, TotalChildren, 
       NumberChildrenAtHome, EnglishEducation, SpanishEducation,
       FrenchEducation, EnglishOccupation, SpanishOccupation, 
       FrenchOccupation, HouseOwnerFlag, NumberCarsOwned, AddressLine1, 
       AddressLine2, Phone, DateFirstPurchase, CommuteDistance
)
SELECT CustomerKey, GeographyKey, CustomerAlternateKey, Title, FirstName, 
       MiddleName, LastName, NameStyle, BirthDate, MaritalStatus, 
       Suffix, Gender, EmailAddress, YearlyIncome, TotalChildren, 
       NumberChildrenAtHome, EnglishEducation, SpanishEducation,
       FrenchEducation, EnglishOccupation, SpanishOccupation, 
       FrenchOccupation, HouseOwnerFlag, NumberCarsOwned, AddressLine1, 
       AddressLine2, Phone, DateFirstPurchase, CommuteDistance
FROM [AdventureWorksDW2012].[dbo].[DimCustomer]
WHERE CustomerKey =11502
 
SET IDENTITY_INSERT DimCustomer_CDC OFF
GO
 
-- give the customers with CustomerKey 11000 through 11010 (inclusive) a raise
UPDATE DimCustomer_CDC 
SET 
    YearlyIncome = YearlyIncome + 10
WHERE
    CustomerKey >= 11000 AND CustomerKey <= 11010
 
GO
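
The changes that the incremental package will pick up can also be inspected directly in T-SQL. This is a minimal sketch rather than part of the original example; it assumes the default capture instance name dbo_DimCustomer_CDC (the name generated when @capture_instance is omitted from sys.sp_cdc_enable_table) and that the SQL Server Agent capture job has had a few seconds to process the log.

```sql
USE [CDCTest]
GO

-- Query the full LSN range currently available for the capture instance
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn(N'dbo_DimCustomer_CDC');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

-- Net changes per CustomerKey; __$operation: 1 = delete, 2 = insert, 4 = update
SELECT __$operation, CustomerKey, FirstName, LastName, YearlyIncome
FROM cdc.fn_cdc_get_net_changes_dbo_DimCustomer_CDC(@from_lsn, @to_lsn, N'all')
ORDER BY CustomerKey;
GO
```

Unlike the SSIS package, which reads only the range recorded in the CDC state, this query returns every net change since the capture instance was created.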

Now run the incremental package again, and you can see the captured changes:

[Screenshot: change capture results in the incremental package]

If that is not visual enough, turn on "Enable Data Viewer" for the data flow paths.


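That concludes this demonstration of CDC in SSIS 2012. For further study, head over to MSDN; the links are below. A sample project is also provided with this article for study purposes.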

Project file download 1 / Project file download 2

 

References:

http://msdn.microsoft.com/en-us/library/bb895315.aspx

http://www.mattmasson.com/index.php/2011/12/cdc-in-ssis-for-sql-server-2012-2/?utm_source=rss&utm_medium=rss&utm_campaign=cdc-in-ssis-for-sql-server-2012-2

 

