ADF 第六篇:Copy Data Activity詳解


Azure Data Factory 系列博客:

 

在Azure 數據工程中,可以使用Copy Data 活動把數據從on-premises 或雲中復制到其他存儲中。Copy Data 活動必須在一個IR(Integration Runtime)上運行,對於把存儲在on-premises中的數據復制到其他存儲時,必須創建一個self-hosted Integration Runtime。

一,認識Copy Data Activity

創建一個Pipeline,從Activities列表中找到“Copy data”,拖放到Pipeline畫布中,如下圖所示:

在General選項卡中,設置Activity的常規屬性

  • Name:為Activity命名
  • Timeout:設置Activity的超時時間
  • Retry:重試次數
  • Retry interval:重試一次間隔的時間,單位是second
  • Secure output:安全輸出,如果勾選,那么該Activity的輸出不會記錄
  • Secure input:安全入,如果勾選,那么該Activity的輸入不會記錄

二,設置源屬性

Source選項卡用於設置Copy data Activity的源屬性,

1,Source 的常規設置

Source dataset:設置源的dataset

use query:Table選項表示整個表作為一個數據源,Query或 Store procedure選項表示使用查詢語句或存儲過程來獲取數據源。

Query timeout(minutes):表示查詢超時的時間

Isolation level:設置查詢隔離級別,作用於數據源。

2,Partition option

指定從SQL Server加載數據的分區選項,當啟用分區選項時(不是None),從SQL Server 同時加載數據的並發度由Copy data Activity的Degree of copy parallelism屬性設置。Physical Partitions Of Table表示數據工廠根據原始表的分區定義來確定分區列和分區機制;當選擇Dynamic range選項時,用戶還需要設置Partition column name、Partition upper bound 和Partition lower bound三項,手動設置分區列和分區機制。

3,Additional columns

添加額外的列,Value由三種類型:Add dynamic content、$$COLUMN和Custom。

$$COLUMN:表示把源的指定列復制為另一列

Custom:表示添加一列,列指是常量

Add dynamic content,表示添加動態上下文(Dynamic Content),動態上下文是指數據工廠的上下文,這些動態上下文由系統變量(System variables)來提供:

三,設置Sink

Sink是Copy Data Activity復制數據的目標數據集,Data Factory 使用 Sink dataset來設置目標。

1,Store procedure name

從Sink dataset中選擇存儲過程,該存儲過程定義了如何把元數據應用於目標表。該存儲過程每個batch調用一次,對於僅運行一次且與源數據無關的操作,請使用 Pre-copy script 屬性。

如果使用Pre-copy script 屬性,通常意味着數據是全量更新,重寫整個表,比如以下腳本:

truncate table staging_table

Copy data activity的執行過程是:每次執行Copy data activity,數據工廠首先執行Pre-copy script,然后使用最新的數據插入數據到target table。

如果使用存儲過程,通常是對數據進行增量更新,要實現增量更新,實際上是把數據集作為參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型,存儲過程的代碼實現如下腳本所示,

CREATE PROCEDURE spOverwriteMarketing 
  @Marketing [dbo].[MarketingType] READONLY
  , @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source   ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END

2,Table option

如果設置為Auto create table,那么當目標表不存在時,數據工廠根據Source 的元數據自動創建目標表。

3,常規設置

Write batch timeout:每個batch數據寫入的超時時間

Write batch size:每個batch的數據行數量

Max concurrent connections:訪問數據存儲的最大的並發連接數量

四,設置Mapping

在Mapping選項卡中,主要設置Source 和 Sink之間的列映射

1,Type conversion settings用於設置類型轉換

  • Allow data truncation: 在把source數據轉換到sink時,如果字段的類型不同,允許數據截斷。
  • Treat boolean as number:把bool值作為數值來看待,把true看作1,把false看作1
  • DateTime format:DateTime類型的格式
  • DateTimeOffset format:數據間隔的格式
  • TimeSpan format:TimeSpan的格式
  • Culture:locale

2,列映射

設置列與列之間的映射關系,用戶需要點擊“Import schemas”來導入架構元數據。

五,設置Settings

配置Copy data Activity的設置

1,常規的設置

  • Data integration unit:數據集成的單元
  • Degree of copy parallelism:指定數據加載時並發度
  • Data consistency verification:當勾選時,Copy data Activity會在數據移動之后,對數據進行一致性檢查
  • Enable logging:啟用日志,記錄復制的文件,跳過的數據行和文件
  • Enable staging:指定是否要通過臨時存儲來復制數據

2,設置Fault tolerance

當設置Fault tolerance (錯誤容忍)之后,用戶可以忽略在復制數據過程中出現的一些錯誤,可以忽略的錯誤類型主要有三個:

  • Skip incompatible rows:跳過不兼容的行
  • Skip missing files:跳過缺失的文件
  • Skip forbidden files:跳過禁止的文件

六,數據更新的全量更新和增量更新

數據更新的方式主要有:全量更新、追加數據、增量更新。

1,數據的全量更新和追加更新

如果使用Pre-copy script 屬性,通常意味着數據是全量更新和追加更新。

在插入數據之前,如果先清空目標表,再向目標表插入數據,這種方式是全量更新;如果不清空目標表,只是向目標表插入新的數據,那么就是追加更新,前提是保證數據是無重復的新數據。

2,通過存儲過程來實現Copy data Activity的增量更新

如果Sink屬性使用存儲過程,那么是對數據進行增量更新。實現數據的增量更新,實際上是把數據集作為參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型。

由於存儲過程在連接表變量時,性能較差,建議對分batch插入,每個batch進行一次插入操作。

創建一個表類型,作為存儲過程的參數,表的架構和輸入數據的架構相同:

CREATE TYPE [dbo].[MarketingType] AS TABLE
( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL, [Category] [varchar](256) NOT NULL )

創建存儲過程,第一個變量是表變量,該存儲過程的作用是把表變量的數據更新到Sink指定的target table中。

CREATE PROCEDURE spOverwriteMarketing 
  @Marketing [dbo].[MarketingType] READONLY
  , @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END

3,使用臨時表來實現增量更新

先把數據加載到臨時表,通過merge語句把臨時數據歸並到product table。

 

參考文檔:

Copy data to and from SQL Server by using Azure Data Factory

Copy activity in Azure Data Factory


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM