開篇介紹
在 SSIS Data Flow 中有一個 Multicast 組件,它的作用和 Merge, Merge Join 或者 Union All 等合並數據流組件對比起來作用正好相反。非常直觀,它可以將一個數據流平行分開成為多個數據流供下游其它 Data Flow 組件使用。
使用 Multicast Task 的幾種情形
第一種,從同一個數據源中取出一部分數據直接放到 A 表中,一部分數據直接放到 B 表中。我曾經遇到一個370多列的一個文件,這370列的文件可以分出列數不等的7,8 張表。我最開始的做法是先將這個文件的數據 Load 到一個大表中,然后再從大表抽取不同的列到小表中,這樣相當於形成了二次加載,效率不高,后來使用 Multicast 就可以一次性直接將 370 列寬的文件分散到不同的表中。
第二種,從數據源 A 抽取數據到 B,B 每次都會先 Truncate 一下,但是又需要備份一下每次從 A 抽取的數據,這個時候也可以使用 Multicast。在每次從 A 抽取數據的時候,通過 Multicast 使數據在導向 B 的同時也導向到 B 的備份表。
第三種,類似於第二種,不同的是沒有備份表,但是需要保留加載的一些 Audit 信息數據。比如,從 Source 抽取數據到 Staging 的時候,同時需要記錄一下抽取的行數,以及用來標示這批 Staging 數據中最大的時間戳,表名和列的名稱。這樣的話,下次加載數據到 Staging 的時候就只選擇加載新增的 Source 數據,也就是上一批最大時間戳之后的新數據。
當然,同一種問題可能有不同的解決的方式,歡迎大家補充!
下面的示例演示一下第一種和第三種情形。
第一種 - 分散同一個 Source 表數據到多個目標表中
先從 TSQL2012 中抽取一部分測試的數據 (TSQL2012 是 Microsoft SQL Server 2012 High-Performance T-SQL Using Window Function 一書中的示例數據庫),我們的 Source 表就是 SalesOrderSource。
USE BIWORK_SSIS GO IF OBJECT_ID('dbo.SalesOrderSource','U') IS NOT NULL DROP TABLE dbo.SalesOrderSource GO SELECT * INTO dbo.SalesOrderSource FROM TSQL2012.Sales.Orders WHERE orderdate < '2006-08-01' SELECT * FROM dbo.SalesOrderSource
創建兩個目標表,一個用來簡單存儲 Order 相關信息,一個用來簡單存儲 Ship 相關信息。
IF OBJECT_ID('dbo.SalesOrder','U') IS NOT NULL DROP TABLE dbo.SalesOrder IF OBJECT_ID('dbo.OrderShip','U') IS NOT NULL DROP TABLE dbo.OrderShip CREATE TABLE dbo.SalesOrder ( OrderID INT, CustID INT, EmpID INT, OrderDate DATETIME, CreateDate DATETIME DEFAULT(GETDATE()) ) CREATE TABLE dbo.OrderShip ( OrderID INT, ShippedDate DATETIME, Shipperid INT, freight MONEY, shipname NVARCHAR(40), CreateDate DATETIME DEFAULT(GETDATE()) )
創建連接管理器的過程就不多說了,新建一個 Data Flow Task, 然后創建一個 OLE DB Source 指向 SalesOrderSource 這張數據源表。
在 Columns 中選擇只需要向下輸出的列,減少不必要的數據傳輸。
這時就可以添加 Multicast 組件了,並且同時添加另外兩個 OLE DB Destination 組件連接到 Multicast 上。
編輯 OLE_DST_Order
這里的 Avaliable Input 從 數據源經 Multicast 出來的對於 OLE_DST_Order 和 OLE_DST_Ship Task 來說都是等同的,一模一樣的。不一樣的就是 OLE_DST_Order 可能只需要其中一部分列,而 OLE_DST_Ship 只需要另外的一部分列。
編輯 OLE_DST_Ship
Column Mapping
保存並執行 Package,可以看到一個數據源通過 Multicast 就將數據流分成不同的支流走向其它的 Task,並且支流中的數據也是一模一樣的。只不過不同的 Destination Task 做了不同的操作,兩個分支數據流的執行也是並行執行的,效率上得到的極大的提升。
查詢一下最終輸出的結果,同一個數據源的數據分拆到不同的目標表中了。
第二種和第一種類似,略過。
第三種
假設 SalesOrderSource 是數據源表。現在需要從 數據源表加載數據到 Staging 表中,並且同時需要記錄一下每次加載數據的條數,以及能夠標示 SalesOrderSource 的在當此加載的最大時間戳,這樣下次加載的時候就可以判斷應該從什么時候開始只加載新增的部分數據了。
這種情況一般會配合 ProcessLogID 來使用,Package 每執行一次就是一個 Process 有 一個 ProcessLogID,並且可以記錄成功或者失敗的狀態。在這個例子中,就不提供 ProcessLog 的流程了,只簡單演示一下通過 Multicast 來實現 Audit 信息記錄的處理方式。
先看一下這個訂單表,每增加一筆訂單就會增加一條數據,OrderDate 表示了下單的日期。假設今天是 2006-12-01,每次加載的時候只加載一個月以前的歷史數據。如果第一次加載了 OrderDate < 2006-11-01 之前的數據,那么下一個月 2007-01-01 就只應該加載 2006-11-01 至 2006-12-01 之間的所謂增量數據了。這種情況下,時間點很好確定,都是上一個月1日之前的數據。那如果加載周期不固定呢?那么就有必要記錄上次加載的最大時間戳,而這里的 OrderDate 列就是記錄上次加載最大時間戳的時間依據。
運行下面的 SQL 語句創建相關測試表 -
USE BIWORK_SSIS GO IF OBJECT_ID('dbo.SalesOrderSource','U') IS NOT NULL DROP TABLE dbo.SalesOrderSource GO SELECT * INTO dbo.SalesOrderSource FROM TSQL2012.Sales.Orders WHERE orderdate < '2006-08-01' SELECT * FROM dbo.SalesOrderSource IF OBJECT_ID('dbo.OrderShipStaging','U') IS NOT NULL DROP TABLE dbo.OrderShipStaging IF OBJECT_ID('dbo.StagingAudit','U') IS NOT NULL DROP TABLE dbo.StagingAudit CREATE TABLE dbo.OrderShipStaging ( OrderID INT, OrderDate DATETIME, ShippedDate DATETIME, Shipperid INT, freight MONEY, shipname NVARCHAR(40), CreateDate DATETIME DEFAULT(GETDATE()) ) CREATE TABLE dbo.StagingAudit ( ProcessLogID INT PRIMARY KEY IDENTITY(1,1), TableName NVARCHAR(50), TrackColumnName NVARCHAR(50), TotalCount INT, LastLoadingDate DATETIME ) SELECT * FROM dbo.StagingAudit SELECT * FROM dbo.OrderShipStaging
StagingAudit 表可以用來跟蹤很多有 Fact 表特征的數據表加載信息,因為類似於 Dimension 信息一般在 Staging 階段每次都是全部重新加載,數據量本身不大。但是對於有 Fact 事實特征的歷史數據,每次都重新加載是非常浪費時間的,所以這些表都是要 Track 的。
StagingAudit 表中的 ProcessLogID 只是用來模擬一下 ProcessLog,實際開發當中還會用到其它的列信息,在這里就都省略了。包括每次加載的時候還有檢查上一次加載是否成功,上一次加載的時間戳等檢查邏輯在這里也省略掉。
把上一個例子中的 Data Flow Task 復制一份,然后刪除掉 Multicast 下 Order Destination 的 Task,新添一個 Aggregate 組,OLE_DST_Ship 中的目標表現在換成 dbo.OrderShipStaging 並重新 Mapping 一下。
這里取到數據流中的行數以及最大的 OrderDate 值。
下面添加一個 Derived Column,寫入 Table Name 和 Column Name,如果有 Process 或者其它的表的話,也可以添加其它表的信息。
添加一個 OLE DB Destination 組件,並且配置 StagingAudit 表。
Column Mapping
保存 Package 並執行這個 Data Flow Task,在這里可以看到當從 Source 源抽取數據到 Staging 表的同時,通過 Multicast 並行的將數據流分向 Aggregate 組件進行了信息統計,並且保存到 StagingAudit 表中。
再來看看 StagingAudit 表中的記錄。
至此,可以通過上面的幾個小示例認識到 Multicast 的作用和特點。
總結
對於上面幾個例子中提出的一些解決方案在性能上也有進一步的提升空間,特別是在數據量超過千萬級以上應該要好好嘗試一下不同的解決方案,哪一種方案在性能上會更好一些。
比如說,之前提到的有關表備份的問題,通過 Multicast 分支平行寫入目標表。在我本機的測試過程中(8GB+2.5GHz), 通過 Multicast 平行寫入,500W的測試數據大概在30秒 - 50秒左右完成,千萬級差不多在90秒內。當數據量超過千萬級或者億級的時候,也可以考慮使用分區表切換分區的方式來進行。
還有就是第三個示例中出現的 Aggregate 組件和 Sort 組件一樣是一個 Blocked 組件,非同步組件,它需要將上游數據流全部加載完畢處理完了之后才開發它自身的數據流到下游組件。這樣它所在的整個 Data Flow Task 將會一直被阻塞直到它全部處理完成之后才會走向下一個 Data Flow Task。 所以,也可以考慮當 Staging 數據寫完之后再來基於 Staging 數據來記錄一些統計信息。
關於在上面提到的有關解決方案,需求不同,項目背景不同可能在解決方案的選擇上會有一些差別。在性能方面出現的問題,可能與表本身結構,索引相關,與數據源網絡傳輸,與開發環境的配置高低多少,數據量大小都有關系,但是不同解決方案性能上的臨界點,高低之分是需要在這些特定環境下不斷嘗試,優化才能真正選擇一個最優的解決方案。
更多 BI 文章請參看 BI 系列隨筆列表 (SSIS, SSRS, SSAS, MDX, SQL Server)如果覺得這篇文章看了對您有幫助,請幫助推薦,以方便他人在 BIWORK 博客推薦欄中快速看到這些文章