Azure Data Factory 系列博客:
- ADF 第一篇:Azure Data Factory介紹
- ADF 第二篇:使用UI創建數據工廠
- ADF 第三篇:Integration runtime和 Linked Service
- ADF 第四篇:管道的執行和觸發器
- ADF 第五篇:轉換數據
- ADF 第六篇:Copy Data Activity詳解
- ADF 第七篇:控制流概述
- ADF 第八篇:傳遞參數(Pipeline的Parameter和Variable,Activity的output)和應用表達式
在Azure 數據工程中,可以使用Copy Data 活動把數據從on-premises 或雲中復制到其他存儲中。Copy Data 活動必須在一個IR(Integration Runtime)上運行,對於把存儲在on-premises中的數據復制到其他存儲時,必須創建一個self-hosted Integration Runtime。
一,認識Copy Data Activity
創建一個Pipeline,從Activities列表中找到“Copy data”,拖放到Pipeline畫布中,如下圖所示:
在General選項卡中,設置Activity的常規屬性
- Name:為Activity命名
- Timeout:設置Activity的超時時間
- Retry:重試次數
- Retry interval:重試一次間隔的時間,單位是second
- Secure output:安全輸出,如果勾選,那么該Activity的輸出不會記錄
- Secure input:安全入,如果勾選,那么該Activity的輸入不會記錄
二,設置源屬性
Source選項卡用於設置Copy data Activity的源屬性,
1,Source 的常規設置
Source dataset:設置源的dataset
use query:Table選項表示整個表作為一個數據源,Query或 Store procedure選項表示使用查詢語句或存儲過程來獲取數據源。
Query timeout(minutes):表示查詢超時的時間
Isolation level:設置查詢隔離級別,作用於數據源。
2,Partition option
指定從SQL Server加載數據的分區選項,當啟用分區選項時(不是None),從SQL Server 同時加載數據的並發度由Copy data Activity的Degree of copy parallelism屬性設置。Physical Partitions Of Table表示數據工廠根據原始表的分區定義來確定分區列和分區機制;當選擇Dynamic range選項時,用戶還需要設置Partition column name、Partition upper bound 和Partition lower bound三項,手動設置分區列和分區機制。
3,Additional columns
添加額外的列,Value由三種類型:Add dynamic content、$$COLUMN和Custom。
$$COLUMN:表示把源的指定列復制為另一列
Custom:表示添加一列,列指是常量
Add dynamic content,表示添加動態上下文(Dynamic Content),動態上下文是指數據工廠的上下文,這些動態上下文由系統變量(System variables)來提供:
三,設置Sink
Sink是Copy Data Activity復制數據的目標數據集,Data Factory 使用 Sink dataset來設置目標。
1,Store procedure name
從Sink dataset中選擇存儲過程,該存儲過程定義了如何把元數據應用於目標表。該存儲過程每個batch調用一次,對於僅運行一次且與源數據無關的操作,請使用 Pre-copy script 屬性。
如果使用Pre-copy script 屬性,通常意味着數據是全量更新,重寫整個表,比如以下腳本:
truncate table staging_table
Copy data activity的執行過程是:每次執行Copy data activity,數據工廠首先執行Pre-copy script,然后使用最新的數據插入數據到target table。
如果使用存儲過程,通常是對數據進行增量更新,要實現增量更新,實際上是把數據集作為參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型,存儲過程的代碼實現如下腳本所示,
CREATE PROCEDURE spOverwriteMarketing
@Marketing [dbo].[MarketingType] READONLY
, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END
2,Table option
如果設置為Auto create table,那么當目標表不存在時,數據工廠根據Source 的元數據自動創建目標表。
3,常規設置
Write batch timeout:每個batch數據寫入的超時時間
Write batch size:每個batch的數據行數量
Max concurrent connections:訪問數據存儲的最大的並發連接數量
四,設置Mapping
在Mapping選項卡中,主要設置Source 和 Sink之間的列映射
1,Type conversion settings用於設置類型轉換
- Allow data truncation: 在把source數據轉換到sink時,如果字段的類型不同,允許數據截斷。
- Treat boolean as number:把bool值作為數值來看待,把true看作1,把false看作1
- DateTime format:DateTime類型的格式
- DateTimeOffset format:數據間隔的格式
- TimeSpan format:TimeSpan的格式
- Culture:locale
2,列映射
設置列與列之間的映射關系,用戶需要點擊“Import schemas”來導入架構元數據。
五,設置Settings
配置Copy data Activity的設置
1,常規的設置
- Data integration unit:數據集成的單元
- Degree of copy parallelism:指定數據加載時並發度
- Data consistency verification:當勾選時,Copy data Activity會在數據移動之后,對數據進行一致性檢查
- Enable logging:啟用日志,記錄復制的文件,跳過的數據行和文件
- Enable staging:指定是否要通過臨時存儲來復制數據
2,設置Fault tolerance
當設置Fault tolerance (錯誤容忍)之后,用戶可以忽略在復制數據過程中出現的一些錯誤,可以忽略的錯誤類型主要有三個:
- Skip incompatible rows:跳過不兼容的行
- Skip missing files:跳過缺失的文件
- Skip forbidden files:跳過禁止的文件
六,數據更新的全量更新和增量更新
數據更新的方式主要有:全量更新、追加數據、增量更新。
1,數據的全量更新和追加更新
如果使用Pre-copy script 屬性,通常意味着數據是全量更新和追加更新。
在插入數據之前,如果先清空目標表,再向目標表插入數據,這種方式是全量更新;如果不清空目標表,只是向目標表插入新的數據,那么就是追加更新,前提是保證數據是無重復的新數據。
2,通過存儲過程來實現Copy data Activity的增量更新
如果Sink屬性使用存儲過程,那么是對數據進行增量更新。實現數據的增量更新,實際上是把數據集作為參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型。
由於存儲過程在連接表變量時,性能較差,建議對分batch插入,每個batch進行一次插入操作。
創建一個表類型,作為存儲過程的參數,表的架構和輸入數據的架構相同:
CREATE TYPE [dbo].[MarketingType] AS TABLE
( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL, [Category] [varchar](256) NOT NULL )
創建存儲過程,第一個變量是表變量,該存儲過程的作用是把表變量的數據更新到Sink指定的target table中。
CREATE PROCEDURE spOverwriteMarketing
@Marketing [dbo].[MarketingType] READONLY
, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END
3,使用臨時表來實現增量更新
先把數據加載到臨時表,通過merge語句把臨時數據歸並到product table。
參考文檔:
Copy data to and from SQL Server by using Azure Data Factory