如何管理和記錄 SSIS 各個 Task 的開始執行時間和結束時間以及 Task 中添加|刪除|修改的記錄數


開篇語

在這篇日志中 如何在 ETL 項目中統一管理上百個 SSIS 包的日志和包配置框架 我介紹到了包級別的日志管理框架,那么這個主要是針對包這一個層級的 Log 信息,包括包開始執行和結束時間,以及各個包的執行成功或者失敗狀態。

但是我們可以更加深一層次的將日志記錄 Logging 以及數據信息 Auditing 信息延伸到包中的重要 Task 中。

通常情況下,SSIS 包從各個數據源加載數據到 Staging 表中,數據源可以是文件,也可以是其它數據庫。然后經過數據倉庫 SCD 以及 Lookup 等操作,將 Staging 中的數據清理並整理加載到各個維度以及事實表中。

假設我們需要知道在當前操作中,各個 Staging 表加載了多少數據,使用了多長時間。各個處理維度和事實的 Task 使用了多少時間,新增了多少數據,修改了多少數據。這些我們也是有能力做到的,如果再配合 如何在 ETL 項目中統一管理上百個 SSIS 包的日志和包配置框架 這篇文章中提到的包級別日志記錄,那么我們將非常清晰的知道我們的 SSIS 包無論是在包級別,還是在各個重要 Task 級別的各種日志,數據信息。這些信息對於我們的包維護,性能分析,錯誤糾正,錯誤修復都是非常有價值的。

比如,我可以很輕松的通過自定義的報表瀏覽哪些 Task 在同等記錄情況下最耗時間,各個 Task 在整個包的執行過程中所用的時間比。

不同的 ETL 項目在 Auditing 上會采取不同的策略,比如以文件加載為主的 ETL 是允許有部分錯誤數據加載失敗的,但是以數據倉庫為主的 ETL 則不希望出現錯誤數據加載的。因此在設計 Auditing 的時候要考慮到這些情況,比如設計的時候多出一個 失敗數據總數的記錄用於跟蹤文件數據等。

在這里只講如何實現 Auditing,簡單的介紹一下核心操作,大家可以在這個基礎之上去擴充。

關鍵點

實現 Auditing 的關鍵點就是要借用控制流 Task 中的 Event Handler 下的 OnPostExecute 和 OnPreExecute 功能。

  1. OnError 功能已經在這篇 如何在 ETL 項目中統一管理上百個 SSIS 包的日志和包配置框架 文章中詳細的介紹到了。
  2. OnPreExecute - 在 Task 執行之前觸發的事件。
  3. OnPostExecute - 在 Task 執行完成之后觸發的事件。

通過這樣兩個事件我們很容易實現對 Task 執行前后表數據變化的操作記錄。

數據源,目標表及其它數據庫對象

測試數據源是 AdventureWorksLT2012

BIWORK_SSIS 數據庫中的目標表 

IF OBJECT_ID('dbo.SalesOrderDetail') IS NOT NULL
DROP TABLE dbo.SalesOrderDetail GO
 
CREATE TABLE [dbo].[SalesOrderDetail]( [SalesOrderID] [int] NOT NULL, [SalesOrderDetailID] [int] NOT NULL, [OrderQty] [smallint] NOT NULL, [ProductID] [int] NOT NULL, [UnitPrice] [money] NOT NULL, [UnitPriceDiscount] [money] NOT NULL, [LineTotal] [numeric](38, 6) NOT NULL, [rowguid] [uniqueidentifier] NOT NULL, [ModifiedDate] [datetime] NOT NULL ) ON [PRIMARY]
GO

Task 執行狀態表

EXECUTION_ID 應該使用 如何在 ETL 項目中統一管理上百個 SSIS 包的日志和包配置框架 中的 PROCESS_LOG_ID, 這樣就將 SSIS 包日志和 TASK 關聯起來了。

USE BIWORK_SSIS GO

IF OBJECT_ID('TASK_EXECUTION_STATUS') IS NOT NULL
DROP TABLE TASK_EXECUTION_STATUS GO

CREATE TABLE TASK_EXECUTION_STATUS ( EXECUTION_ID NVARCHAR(255), PACKAGE_NAME NVARCHAR(100), TASK_ID NVARCHAR(250), TASK_NAME NVARCHAR(250), TABLE_NAME NVARCHAR(250), ExistingRowsBefore BIGINT, StartTime DATETIME, DeletedRows BIGINT, UpdatedRows BIGINT, InsertedRows BIGINT, ExistingRowsAfter BIGINT, EndTime DATETIME, ExecutionStatus INT )

獲取表的條數

IF OBJECT_ID('dbo.GET_TABLE_COUNT') IS NOT NULL
DROP PROCEDURE dbo.GET_TABLE_COUNT GO

CREATE PROCEDURE dbo.GET_TABLE_COUNT @TABLE_NAME NVARCHAR(50), @ROW_COUNT BIGINT OUTPUT AS
BEGIN 

    SELECT @ROW_COUNT = SUM(PART.rows) FROM sys.tables TBL INNER JOIN sys.partitions PART ON TBL.object_id = PART.object_id
    INNER JOIN sys.indexes IDX ON PART.object_id = IDX.object_id
    AND PART.index_id = IDX.index_id WHERE TBL.name = @TABLE_NAME
    AND IDX.index_id < 2
    GROUP BY TBL.object_id, TBL.name RETURN @ROW_COUNT
END
GO

記錄時間

這個存儲過程用來每次在執行 Task 之前獲取目標表中的條數,並且插入 Task 啟動時間 -

IF OBJECT_ID('dbo.USP_INSERT_TASK_EXECUTION','P') IS NOT NULL
DROP PROCEDURE dbo.USP_INSERT_TASK_EXECUTION GO

CREATE PROCEDURE USP_INSERT_TASK_EXECUTION @TARGET_TABLE_NAME NVARCHAR(50), @EXECUTION_ID NVARCHAR(255) , @PACKAGE_NAME NVARCHAR(100), @TASK_ID NVARCHAR(255), @TASK_NAME NVARCHAR(250) AS
BEGIN
 
    DECLARE @ExistingRowsBefore BIGINT

    EXECUTE dbo.GET_TABLE_COUNT @TABLE_NAME = @TARGET_TABLE_NAME, @ROW_COUNT = @ExistingRowsBefore OUTPUT INSERT INTO TASK_EXECUTION_STATUS ( EXECUTION_ID, PACKAGE_NAME, TASK_ID, TASK_NAME, TABLE_NAME, ExistingRowsBefore, StartTime, DeletedRows, UpdatedRows, InsertedRows, ExistingRowsAfter, EndTime, ExecutionStatus ) VALUES ( @EXECUTION_ID, @PACKAGE_NAME, @TASK_ID, @TASK_NAME, @TARGET_TABLE_NAME, @ExistingRowsBefore, GETDATE(), NULL, --@DeletedRows,
        NULL, --@UpdatedRows,
        NULL, --@InsertedRows,
        NULL, --@ExistingRowsAfter
        NULL, --@EndTime
        0 -- In process
) END

更新 Task 狀態表 

當 @DeletedRows = -1 的時候,表明操作是 Truncate 操作。

IF OBJECT_ID('dbo.USP_UPDATE_TASK_EXECUTION' ) IS NOT NULL
DROP PROCEDURE dbo.USP_UPDATE_TASK_EXECUTION GO

CREATE PROCEDURE dbo.USP_UPDATE_TASK_EXECUTION @ExecutionID NVARCHAR(250), @TaskID NVARCHAR(250), @DeletedRows BIGINT, @UpdatedRows BIGINT, @InsertedRows BIGINT 
AS
BEGIN

    UPDATE dbo.TASK_EXECUTION_STATUS SET DeletedRows = (CASE WHEN @DeletedRows = -1 THEN ExistingRowsBefore ELSE @DeletedRows END), UpdatedRows = (CASE WHEN @DeletedRows = -1 THEN 0 ELSE @UpdatedRows END), InsertedRows = (CASE WHEN @DeletedRows = -1 THEN 0 ELSE @InsertedRows END), ExistingRowsAfter = (CASE WHEN @DeletedRows = -1 THEN 0 ELSE (ExistingRowsBefore + @InsertedRows - @DeletedRows) END), EndTime = GETDATE(), ExecutionStatus = 1
    WHERE EXECUTION_ID = @ExecutionID
        AND TASK_ID = @TaskID
END 
GO

SSIS 包中的流程實現

SSIS 包 - 第二個和第三個 Task 的功能完全一樣,只為了演示的目的。

EST_TRUNCATE_SALES_ORDER_DETAIL Task - 在加載數據之前刪除表數據。

它的 OnPreExecute 事件中添加了一個 Execute SQL Task 組件用來向 Task Execution 表插入操作前的記錄。

調用 USP_INSERT_TASK_EXECUTION 存儲過程根據表名查詢記錄數。

參數 Mapping 關系,注意這里要用 Source ID , Source Name 而不是 Task ID, Task Name。因為 Task 是指當前執行這些 SQL 的 Task 自身,而我們要監控的不是這個事件下的 Task ,而是控制流中的 Task 組件。

EST_TRUNCATE_SALES_ORDER_DETAIL 中 OnPostExecute 的配置 -

這里就是 Update 操作了,因為 EST_TRUNCATE_SALES_ORDER_DETAIL 是 Truncate 表操作,所以這里給了 DeletedRows = -1。

更新的時候直接根據 Execution ID 和 Task ID 就可以了。

第二個 Task 也要配置 OnPreExecute 和 OnPostExecute 事件,也就是說每一個你需要監控的 Task 都要配置。感覺比較復雜,但是一次配置完成以后,受用可是長期的。

要注意的是第二個 Task 是從數據源加載數據,這樣需要在加載的過程中獲取記錄數,通過 ROW COUNT 可以實現將數據流的條數賦值給變量保存。

另外要注意的是 - 這個變量的 SCOPE 是控制流組件自身,即作用域。因為可能要有很多 Task 需要用到記錄條數的變量,全部放到包級別中這個變量會非常多,並且容易出錯。可以理解為 InsertedRows 是局部變量,它的生命周期就是 Task 本身。

如果創建的變量位於包級別 SCOPE,可以點擊下方的小方框 Move 到當前 Task 的 SCOPE 中。

變量的賦值。

OnPreExecute 的配置和上面的 Task 一樣,復制一份即可,這里是 OnPostExecute 的配置。

需要什么變量就記錄什么變量,就配置什么變量。

后面的兩個 Task 一模一樣,只是為了測試使用。

運行兩次的結果,數據條數的記錄是非常連貫的。

記錄 SCD 的修改和新增條數只需要在相應的地方添加 ROW COUNT 組件來捕獲即可。

當然除了使用 ROW COUNT 組件,在某些特定的情況下也可以使用 @@ROWCOUNT 來獲取新增,刪除或者修改所影響到的條數。

DECLARE @UpdateRowCnt INT
DECLARE @InsertRowCnt INT

--Inserting records from Source to Destination which does not exists
insert into dbo.Client(ClientName,Country,Town) Select clientName, Country, Town from dbo.ClientSource S WHERE NOT EXISTS ( Select 1 from dbo.Client CL WHERE CL.ClientName=S.ClientName) SELECT @InsertRowCnt=@@ROWCOUNT

--Update Already existing records from Source
Update CL set CL.ClientType=S.CLientType from dbo.Client CL INNER JOIN dbo.ClientSource S ON Cl.ClientName=S.ClientName SELECT @UpdateRowCnt=@@ROWCOUNT

最后一個問題

如果每次都在各個 Task 中的 OnPreExecute 和 OnPostExecute 中配置非常麻煩,有沒有改進的方法。

答案是有的。

我提供一個思路,有興趣的話可以自動動手嘗試 -

Task 級別的 OnPreExecute 和 OnPostExecute 事件是當 Task 被執行前后被觸發的,要注意的是包級別的 OnPreExecute 和 OnPostExecute 也是可以捕獲 Task 級別的 OnPreExecute 和 OnPostExecute 事件。

可以定義一張表,表中記錄需要被處理的 Task 名稱,然后在包級別的 OnPreExecute 和 OnPostExecute 中處理 各個 Task 的 Auditing 信息。不在列表上的,就可以不用處理。

同時還要注意 Task 同步的問題,若是很多 Task 同時執行,並行執行的話,就需要在各自 Task 中定義好變量來記錄然后再賦值給 Package 級別的變量可以避免這一問題。

與本文相關的文章

如何在 ETL 項目中統一管理上百個 SSIS 包的日志和包配置框架

更多 BI 文章請參看 BI 系列隨筆列表 (SSIS, SSRS, SSAS, MDX, SQL Server)      如果覺得這篇文章看了對您有幫助,請幫助推薦,以方便他人在 BIWORK 博客推薦欄中快速看到這些文章。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM