IBM InfoSphere DataStage 8.1 DataStage Job 開發具體解釋


簡單介紹

DataStage 使用了 Client-Server 架構,server端存儲全部的項目和元數據,client DataStage Designer 為整個 ETL 過程提供了一個圖形化的開發環境。用所見即所得的方式設計數據的抽取清洗轉換整合和載入的過程。Datastage 的可執行單元是 Datastage Job ,用戶在 Designer 中對 Datastage Job 的進行設計和開發。

Datastage 中的 Job 分為 Server Job, Parallel Job 和 Mainframe Job 。當中 Mainframe Job 專供大型機上用。經常使用到的 Job 為 Server Job 和 Parallel Job 。本文將介紹怎樣使用 Server Job 和 Parallel Job 進行 ETL 開發。

Server Job

一個 Job 就是一個 Datastage 的可執行單元。Server Job 是最簡單經常使用的 Job 類型,它使用拖拽的方式將主要的設計單元 -Stage 拖拽到工作區中。並通過連線的方式代表數據的流向。通過 Server Job,能夠實現下面功能。

  1. 定義數據怎樣抽取
  2. 定義數據流程
  3. 定義數據的集合
  4. 定義數據的轉換
  5. 定義數據的約束條件
  6. 定義數據的聚載
  7. 定義數據的寫入

Parallel Job

Server Job 簡單而強大,適合高速開發 ETL 流程。Parallel Job 與 Server Job 的不同點在於其提供了並行機制,在支持多節點的情況下能夠迅速提高數據處理效率。Parallel Job 中包括很多其它的 Stage 並用於不同的需求。每種 Stage 使用上的限制也往往大於 Server Job。

Sequence Job

Sequence Job 用於 Job 之間的協同控制,使用圖形化的方式來將多個 Job 匯集在一起,並指定了 Job 之間的運行順序。邏輯關系和出錯處理等。

數據源的連接

DataStage 可以直接連接許多的數據源,應用范圍很大,可連接的數據源包含:

  • 文本文件
  • XML 文件
  • 企業應用程序。比方 SAP 、PeopleSoft 、Siebel 、Oracle Application
  • 差點兒全部的數據庫系統,比方 DB2 、Oracle 、SQL Server 、Sybase ASE/IQ 、Teradata 、Informix 以及可通過 ODBC 連接的數據庫等
  • Web Services
  • SAS 、WebSphere MQ

Server Job

Server Job 中的 Stage 綜述

Stage 是構成 Datastage Job 的基本元素。在 Server Job 中。Stage 可分為下面五種:

  1. General
  2. Database
  3. File
  4. Processing
  5. Real Time

本節中將介紹怎樣使用 Datastage 開發一個 Server Job。如圖 1 所看到的:

圖 1. Server Job
圖 1. Server Job

Sequential File Stage

Sequential File Stage 可用來從一個 Sequential 文件里獲取源數據或將數據載入到一個 Sequential 文件里。在使用 Sequential File Stage 時須要指定文件的路徑和名稱,文件的格式,列的定義和文件寫入的類型(覆蓋或追加)。

圖 2. Sequential File 屬性框
圖 2. Sequential File 屬性框
圖 3. Sequential File 列定義
圖 3. Sequential File 列定義

上圖是本節樣例中使用到的 Sequence File。

在 Input 頁中。File Name 參數代表文件的實際路徑,假設文件不存在將會被自己主動建立。Update Action 中選擇 Overwrite existing file 表示此文件在載入數據之前將被清空;在 Format 頁中,定義文件的格式,比如分隔符,NULL 值,首行是否為列定義等;在 Column 頁中,須要輸入文件的列定義。

 Hash File Stage

Hash File 以主鍵將記錄分成一個或多個部分的文件,在 Datastage 中通常被用做參考查找。

在進行參考查找的時候,Hash File 文件會被載入到內存中,因此具有較高的查找效率。

和 Sequence File 類似,使用 Hash File 時須要輸入文件的實際地址。通過參數設置寫入時的選項,並提供數據的列定義。須要注意的是,Hash File 須要指定主鍵。假設未指定,第一列被默覺得主鍵。進行參數查找時,使用主鍵值在 Hash File 中搜索。假設找到則返回該數據。假設未找到則返回 NULL 值。

圖 4. Hash File 屬性框
圖 4. Hash File 屬性框

Transformer Stage

Transformer Stage 是一個重要的,功能強大的 Stage。它負責 ETL 過程中的數據轉換操作。在 Transformer Stage 中能夠指定數據的來源和目的地,匹配相應輸入字段和輸出字段,並指定轉換規則和約束條件。

圖 5. Transformer Stage 列映射
圖 5. Transformer Stage 列映射

Transformer Stage 中分為 5 個區域:

左上方區域,是用表格形式描寫敘述的輸入數據信息。

假設有多條輸入數據流。則有非常多表格。

本例中有一個輸入,一個參照查詢,因此左上方有兩個表格。

右上方區域。是用表格形式描寫敘述的輸出信息。

左下方區域為輸入的元數據列定義,包含列名。類型和長度等屬性。

右下方區域為輸出的元數據列定義,包含列名,類型和長度等屬性。

左上方和右上方的表格由帶有流向的箭頭連接,代表了字段的相應關系。

此例中,輸入的數據僅僅有一個字段 EMPLOYEE_ID。通過此字段在 Hash File 中進行參照查找。獲取 EMPLOYEE_NAME 字段。假設在 Hash File 中找到了 EMPLOYEE_NAME 則將數據發送到輸出端,這個條件是通過 Transformer Stage 提高的約束功能實現。我們在約束中的定義為 NOT(ISNULL(lkp_name.EMPLOYEE_ID))。另外不管是否在 Hash File 中查找到相應的數據,我們都將數據記錄到一個 csv 文件里,即相應的 save_all 輸出。

Parallel Job

Parallel Job 的 Stage 綜述

與 Server job 相比。Parallel Job 提供了更豐富的 stage。添加了 Development/Debug,Restructure 和 Transactional 類的 stage。同一時候。對於一些在 server job 中能夠在 transformer 中完畢的功能,Parallel job 也提供了專用的 stage 以提高執行性能和開發效率,比方 lookup。join,Compare 等。另外一個顯著的差別是在 Parallel Job 中內置地支持 job 的並行執行,並行執行也就意味着數據在 job 中的各個 stage 見處理時須要處理 partition 和 combination 的問題,所以在開發 job 時,我們須要設定 partition 和 combination 的策略。

Lookup DataSet 與 Lookup   Stage

Parallel Job 對 lookup 的實現做了一些調整,在 Server Job 中。我們通常是用 Transformer Stage 配合 lookup 數據源(通常是 hash 文件)來實現 lookup,同一個 transformer 中能夠同一時候完畢多個 lookup。類似於 sql 中的多表自然聯接,假設 lookup 數據源使用的是 database stage 而不是 hash file 並且對於一條記錄返回多條 lookup data 的話。job 會產生 warning(hash file 的鍵唯一特性使得它不會存在這個問題,后面插入的反復數據會覆蓋前面的同主鍵的數據)。

而在 Parallel Job 中,lookup 須要用一個單獨的 stage 來實現。transformer 不再兼職 lookup 的“副業”。在一個 lookup stage 中,能夠有一個主數據 link 和多個 lookup link。同一時候,Parallel 中的 lookup 還有下面的新特性

  • 支持 multi rows,在一個 lookup stage 中對於一行主輸入數據能夠有一個 lookup link 返回多於一行的 lookup 數據。

    結果也會變成多行。

  • Parallel 中不在支持 hash file,轉而使用封裝更強的 Data Set stage, Data Set 本質上也是 hash 數據結構,但對 Job 開發者隱藏了實現細節,我們不用象開發 Server Job 那樣去手動設定具體參數
  • Parallel 中除了支持等值 lookup 外,還直接支持 Range lookup 和 Caseless lookup。這樣我們在完畢類似月份轉換為季度性質的設計時就會很的方便和自然。

類似於 Server Job 中的 hash 文件,在 Parallel Job 中我們使用 Data Set 文件來緩存 lookup 數據,並載入到內存中,在 Data Set stage 中。我們僅僅須要制定記錄的主鍵和存儲的文件名稱,Parallel 引擎會為我們處理其它的操作。

但為了達到性能的最優化,我們有時須要制定 Data Set 的緩存策略和緩存大小,系統默認的緩存大小是 3M。假設我們的 lookup 數據比較大。就須要設定合適的緩存大小。否則會嚴重影響 lookup 的性能。

圖 6. DataSet 緩存設置
圖 6. DataSet 緩存設置

Sort Stage

Parallel Sort stage 的行為類似於 Sql 中的 order by,可是比 order by 提供了很多其它的選項。

在 job 中,Sort stage 接收一個輸入 link 並產生一個輸出 link。對於寫過 sql order by 或者排序程序的開發者使用 Sort Stage 的基本功能應該是非常easy的,可是要充分發揮 Parallel stage 的強大功能。我們還是須要注意以下幾點:

  • 並行還是串行運行,假設選擇串行運行。那么 Sort stage 的行為就類似於 Server Job 中的 Sort Stage,整個輸入數據都會依照設定的排序選項排序,但假設選擇分區 / 並行排序。則僅僅有每一個分區內的輸出是有序的,這在有些情況下是能夠接受的。但在另外一些情況下會導致代碼缺陷,須要依據 sort 的興許操作做出選擇。
  • 假設有可能。盡量在數據源端的數據庫中進行排序,這樣不但會提高數據排序的效率,還能大大降低 job 對內存。I/O 的壓力。Sort stage 僅僅有在接收完輸入之后才干完畢排序,進而輸出數據,使得 job 的興許 stage 都處於等待狀態。
  • 類似於 order by 后面的字段列表,我們能夠指定排序的方向,是升序還是降序,Sort Stage 也能夠指定對多個字段進行排序,排在前面的 column 稱為主排序字段,假設排序字段中有某一個或幾個字段已經是有序的,我么也能夠指定其為有序,這樣在排序的時候就能夠提高排序的效率。
  • 穩定排序(stable sort)/ 同意反復。stable sort 默認是 yes,這樣假設兩條記錄 sort key 同樣的話,排序的輸出和輸入順序將是同樣的,假設沒有選擇同意反復。兩條或者多條記錄的 sort key 同樣的話。將僅僅保留一條記錄。
  • 限制內存的使用,數據的排序操作是很耗費內存的,假設不加限制。讓全部的數據的排序都在內存中完畢的話,job 的其它操作或者其它 job 的運行的效率將受到嚴重影響,全部在 Sort Stage 中。我們能夠設定此排序能夠使用的最大內存數(M),這樣我們在能夠接受的排序效率和使用的內存數量之間找到平衡點。

Compare/Difference/Change Capture Stage

Compare, Difference 和 Change Capture Stage 是 Parallel job 中三個用於比較數據集合異同的 stage,對於這三個 stage 本身的使用沒有太多困難的地方,主要的參數和設置都非常簡明直觀,我們的介紹主要集中在這三個 stage 在使用中的同樣點和不同點上,一旦了解了這些 stage 的特點。使用的時候不但能依據需求選擇正確的 stage,也能依據 stage 特性知道須要設置哪些參數。

同樣點:

  • 都有兩個輸入,產生一個輸出,
  • 輸入的數據主鍵的字段名同樣,都須要指定須要比較的字段。

  • 產生的結果數據中都會添加一個整型的結果字段,用於表示兩行數據的比較結果

不同點:

  • Capture Change Stage 輸出的是以 after 輸入流為基礎,外加 change code 字段,適合和 change apply 配合使用,把 before 輸入流同步為和 after 一樣。
  • Difference Stage 的輸出是以 before 輸入流為基礎,外加 change code 字段
  • Compare Stage 產生的結果包含 before 和 after。以及 change code 字段

以下是一個 Capture Change Stage 的演示樣例:

圖 7. Capture Change Stage 的演示樣例
圖 7. Capture Change Stage 的演示樣例
Before source Sql: SELECT k,v 
FROM (values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10))
as temp(k,v) order by k asc 

After source Sql: SELECT k,v 
FROM (values (1,1),(2,2),(11,11),(4,5),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10))
as temp(k,v) order by k asc
圖 8. Capture Change Stage 參數設置
圖 8. Capture Change Stage 參數設置

從以上設置能夠看到。我們選擇了明白指定主鍵。剩余 column 都當作 value,對於比較的結果,假設結果同樣。則從結果中刪除,也就是我們僅僅希望看到對 Before 數據做 Delete。Edit,和 insert 后產生的差異,下圖是我們 job 執行得到的結果:

圖 9. Comparsion 結果
圖 9. Comparsion 結果

從以上的結果能夠看到。before 和 after 數據有三處差異。change_code 的值相應為 2,3。1。分別表示對 before 運行 Delete,Update。Insert 產生的差異。要同步這些差異,我們僅僅須要對 before 數據運行相應的 Delete,Update 和 Insert 就可以實現兩個數據集合的同步。

Filter Stage

Filter Stage 顧名思義是一個用於過濾的 Stage,其作用類似於我們寫 sql 中的 where 子句,並且其支持的邏輯表達式和運算符也類似於 sql 語句的 where 子句,比方,在 filter stage 中,我們能夠使用下面常見的邏輯表達式和運算符以及其組合 ,

  • true 和 false
  • 六個比較運算符 : =, <>, <, >, <=, >=
  • is null 和 is not null
  • like 和 between

從其語法作用上看。則類似於 java 或者 C 語言中的 switch case 語句。我們能夠通過設置“Output Row Only Once”選項的值來決定是否在每一個 case when 子句后面是否加入 break。通過加入或者刪除“Reject Link”來確定是否加入一個 default 子句 . 以下是一個簡單的樣例。

展示了我們怎樣通過員工編號和薪水的組合條件來過濾員工的記錄到不同的結果文件的。

圖 10. Filter Stage 的演示樣例
圖 10. Filter Stage 的演示樣例
圖 11. Filter Stage 的設置
圖 11. Filter Stage 的設置

對於每個 where 條件。我們須要設置相應的輸出鏈接。這是一個整型數字,我們能夠在“Link Ordering”頁簽上找到輸出鏈接的編號和名稱之間的相應關系。

另外須要注意的一點是,Filter Stage 不正確輸入的記錄做不論什么修改,僅僅做分發。可是你能夠手動設置輸出的 column,使得每一個輸出的 column 列表不一樣,但僅僅要是輸入 column 列表的子集就可以。可是對於 Reject Link, Column 列表是默認全然等同於輸入的,且不可更改。

用於調試的 Stages

我們知道 DataStage Server Job 中提供了 Debug 功能,我們在開發過程中遇到問題的時候能夠讓 Job 執行在 debug 模式下。細致查看每行數據在 Job 中各個 Stage 之間的流動和轉換情況,但 Parallel Job 並沒有給我們提供調試功能。但 Parallel Job 用第二種方式提供了調試的能力:Parallel Job 內置了用於調試的 Stage,使用這些 Stage,我們能夠依照我們的須要,把我們懷疑有問題的中間數據輸出,進而能夠深入查找問題的根源。在 Parallel Job 中提供了下面的 Stage 用於調試:

  • Head Stage
  • Tail Stage
  • Sample Stage
  • Peek Stage
  • Row Generator Stage
  • Column Generator Stage

我們以一個 peek 的實例展示一下 development/debug Stage 的使用。其它的 Stage 的使用方法類似,能夠參見后面的表格和文檔。例如以下圖是我們的 Job。DB2 Stage 的 Source Sql 例如以下:

 SELECT k,v FROM (values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),
 (10,10),(11,11),(12,12),(13,13),(14,14),(15,15),(16,16),(17,17),(18,18),(19,19),(20,20) )
 as temp(k,v)
圖 12. Peek Job 的演示樣例
圖 12. Peek Job 的演示樣例
圖 13. Peek Stage 的設置
圖 13. Peek Stage 的設置
圖 14. Peek 的結果
圖 14. Peek 的結果

下表簡述了這些 Stage 的特點和使用方法

表 1. Stage 的特點和使用方法
Stage 類型 用途 可設置項目
Head Stage 從頭部開始抓取輸入流的數據
一個輸入一個輸出
抓取的行數
從哪些分區抓取
每一個分區的起始位置
每次抓取的間隔
Tail Stage 抓取輸入流尾部的 N 行數據
一個輸入一個輸出
抓取的行數
從哪些分區抓取
Sample Stage 一個輸入,多個輸出依據設置的策略從輸入流的各個分區抓取數據,每一個輸出流有不同的百分比設置 百分比
隨機數的種子
每一個分區抓取的最多行數
Peek Stage 從一個數據流中有選擇地“偷窺”流經的數據。一個輸入。兩個輸出,一個輸出原樣輸出輸入數據,一個輸出生成一個文本,包括“偷窺到的數據” 每次抓取的間隔
每一個分區抓取的行數
“偷窺”哪些行
輸出“偷窺”結果到 log 還是輸出
Row Generator Stage 依據定義的數據 schema,生成模擬數據,無需輸入,一個輸出 Schema(column list 或者 schema file)
生成的行數
Column Generator Stage 在一個輸入流上加入新的 column,為這些新加入的或原有的 column 生成模擬數據 須要生成模擬數據的 column

Sequence Job

假設說每一個個 Server Job 或 Parallel Job 完畢 ETL 流程中的一個數據抽取,轉換。載入的子過程的話,那么 Sequence Job 的作用就是把這些子過程給串聯起來。形成一個完整的全局的 ETL 過程。從結構上看,一個 Sequence Job 類似於一個 C 或者 Java 語言的一個函數。但功能更為強大。

  • 能夠使用 UserVariables Activity Stage 定義局部變量。變量在定義的時候須要賦值,賦值表達式能夠是系統變量,Job 參數,Datastage 的宏。常量,Routine 的返回結果等。還能夠是這些單獨變量的條件。數學或者字符串運算后的結果。差點兒你在一個函數中能完畢的局部變量定義的功能這兒都能實現。以下的演示樣例定義了六個變量。
  • 能夠調用其它的功能模塊,通過 Job Activity Stage 能夠調用 Server Job,Parallel Job;通過 Execute Command Stage 調用 unix/windows cmd。通過 Routine Activity 支持調用 datastage routine。

  • 支持循環,Sequence Job 通過 StartLoop Activity Stage 和 EndLoop Activity Stage 提供了循環的功能。循環變量能夠是基於起始值,結束值和步長的整數循環,也能夠基於給定的列表進行循環。還能夠把這些循環中的暫時變量傳遞給每一個詳細的循環步驟。

    在 StartLoop Activity Stage 和 EndLoop Activity Stage 之間。能夠增加隨意多個的

  • 支持邏輯運算。Nested Condition Stage 支持類似 switch 的邏輯,Sequencer Stage 支持與和或的邏輯運算,通過這些 Stage 的組合,能夠支持隨意復雜的邏輯控制。

  • 支持 email 通知,使用 Notification Stage,在 job 執行成功,失敗或者滿足其它設定條件時,Sequence Job 能夠發送一封或者多封的通知郵件,使我們能夠更方便地監控 Job 的執行狀態。郵件的內容能夠包括 job 的執行狀態,當前的參數等等,凡是能夠在 User Variables Stage 中引用的變量都能夠包括在郵件中。同一時候還能夠包括我們指定的文件和 Sequence Job 的執行狀態等。
  • 支持錯誤處理和現場清理,使用 Terminator Activity Stage 和 Exception Handler Stage。我們能夠定義須要處理的錯誤,並在發生錯誤的使用依據定義的策略停止不必要的 Job 執行。
  • 通過 Wait for File Activity Stage 能夠支持等待時間。我們能夠定義僅僅有某個信號文件出現或者消失的時候才開始啟動 Wait for File Activity Stage 興許的執行。

以下的圖展示了一個簡單的 Sequence Job。在 Job 的開始,我們定義一組變量。這些變量在我們循環和發送通知郵件的時候將會被引用。然后利用 Job 參數 yearlist 開始循環,每個循環里面我們調用一次 Job extract_trans, 假設這個 job 調用執行不成功,我們就發郵件通知 Job 執行失敗,否則進入下一個循環,在循環結束后,發郵件通知 Sequence Job 執行成功

圖 15. Job 全景
圖 15. Job 全景
圖 16. 定義的變量
圖 16. 定義的變量
圖 17. StartLoop Stage 的定義
圖 17. StartLoop Stage 的定義
圖 18. Job Activity Triggers 定義
圖 18. Job Activity Triggers 定義


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM