Kettle 4.2源碼分析第三講--Kettle 轉換機制transformation介紹


轉換機制

  每個轉換步驟都是ETL數據流里面的一個任務。轉換步驟包括輸入、處理和輸出。輸入步驟從外部數據源獲取數據,例如文件或者數據庫;處理步驟處理數據流,字段計算,流處理等,例如整合或者過濾。輸出步驟將數據寫會到存儲系統里面,例如文件或者數據庫。

 

圖 1 轉換步驟示例

1. Step類圖簡介

  Kettle為擴展插件提供了4個擴展點,這4個擴展點也是每個步驟的組成。每個類都有其特定的目的及扮演的角色。以TableInput為例,下圖說明了這4個類的繼承體系。

 

圖 2 StepInterface繼承體系

  實現StepInterface接口的類,在轉換運行時,將是數據實際處理的位置。每個執行線程都表示一個實現StepInterface的實例。

  BaseStep實現了StepInterface是各step具體實現類的基類。完成了公用的處理函數,如putRow(),但是對於更具體的processRow()在StepBase的子類中。StepBase的主要成員有

  public ArrayList<RowSet>  inputRowSets,outputRowSets;

  StepBase的子類每次從inputRowSets中取出一行數據,向outputRowSets中寫入一行數據。

 

圖 3 StepDataInterface繼承體系

  實現StepDataInterface接口的類為數據類,當插件執行時,對於每個執行執行的線程都是唯一的。保存於step相關的數據信息,比如行的元數據信息。

 

圖 4 StepMetaInterface繼承體系

  實現了StepMetaInterface接口的類為元數據類。它的職責是保存和序列化特定步驟的實例配置,例如保存步驟的名稱、字段名稱等,如何生成加載xml或者讀寫數據庫。

圖 5 StepDialogInterface繼承體系

  實現了StepDialogInterface接口的類為對話框類,該類實現了該步驟與用戶交互的界面,它顯示一對話框,通過對話框用戶可以根據自己的要求進行步驟的設定。該對話框類與元數據類關系非常密切,對話框里面的配置數據均會保存在元數據類里面。

2. 步驟間交互通信類

2.1.    RowSet

圖 6 步驟之間通信機制

  RowSet的實現類,負責步驟之間的相互通信,rowset對象即是前一個step的成員也是后一個step的成員,訪問是線程安全的。

 

圖 7 RowSet實現類內存快照

  RowSet類中包含源step,目標step和由源向目標發送的一個rowMeta和一組data。其中data數據是以行為單位的隊列(queArray)。一個RowSet作為此源step的outputrowsets的一部分。同時作為目標step的inputRowsets一部分。源Step每次向隊列中寫一行數據,目標step每次從隊列中讀取一行數據。

圖 8 RowSet實現類

2.2.    行元數據

  所有的data均擦除為object對象。步驟與步驟之間以行為單位進行處理,自然需要知道每行的結構,即行元數據。行元數據至少需要包括類型、名稱,當然還可能包括字段長度、精度等常見內容。

  行元數據不僅在執行的時候需要,而且在轉換設置的時候同樣需要。每個步驟的行元數據都會保存在.ktr文件或者數據庫里面,所以可以根據步驟名稱從TransMeta對象中獲取行元數據。

  行元數據的UML類圖結構如下所示,主要有單元格元數據組成行元數據。在現有的版本中,支持的數據類型有String、Date、BigNumber、Boolean、SerializableType、Binary、Integer、Numberic。

 

圖 9 行元數據UML類圖

3. Trans配置及開啟

 

圖 10 Trans執行時序圖

  在真正運行trans之前,還需要對運行模式進行一個設置。設置結果,會傳給TransGraph.start(executionConfiguration)。配置界面如下所示:

 

圖 11 執行轉換模式設置

實例化Trans的基本流程如下,Trans類時最后真正執行轉換的類。實例化之前需要配置啟動項,保持.ktr文件同步,然后實例化Trans類。最后,開啟后台程序,這樣不會影響UI的操作,真正的轉換在后台執行。

 

圖 12 實例化Trans流程圖

4. Trans執行

  trans類的執行有execute()負責,主要包含兩個步驟:轉換執行前的准備工作和所有線程的開啟。Trans每一個步驟都會對應一個獨立的線程,線程之間公國RowSet進行通信交互。

代碼 Trans執行代碼

1   public void execute(String[] arguments) throws KettleException {
2 
3        prepareExecution(arguments);
4 
5        startThreads();
6 
7 }

4.1.    執行准備(prepareExecution)

該步驟,主要完成對通信類的初始化,對步驟的包裝初始化。最后啟動各個步驟初始化線程,即調用各個步驟的init()方法。准備結束之后,步驟之間的通信機制完成了,各個步驟的初始化工作也完成了。具體的流程如下所示:

 

圖 13 准備執行流程圖

1.4.2.    轉換處理執行

Trans轉換執行引擎類,通過startThreads()啟動步驟線程。為所有步驟添加監聽器,在開啟監聽進程對所有線程進行監聽。具體的步驟如下所示

 

圖 14 啟動所有步驟線程

1.4.3.    步驟執行過程

  實現StepInterface的不同的step各個功能個不一樣,但是它們之間也有一定的規律性。下圖只列舉了兩個step,(TextInput)文本輸入和Uniquerow(去重)。BaseStep封裝了getRow()和putRow()方法,從上一個步驟獲取數據和將數據輸入到下一個步驟。

  基類BaseStep采取了統一的處理方式,調用子類processRow以行為單位處理,核心代碼如下。

  while (stepInterface.processRow(meta, data)&& !stepInterface.isStopped());

  processRow( )通用過程是:調用基類BaseStep 的getRow( )得到數據,對一行數據處理,處理之后調用基類putRow( )方法數據保存至outputRowSets(即next step的inputRowSets)

 

 

圖 15 TextInput與Uniquerow

1.4.4.    元數據與數據關系。

  Trans中的ETL過程(每個step)以行為單位處理,其中行的元數據信息RowMeta和數據信息統一保存在RowSet對象中。

  在RowSet中RowMeta的成員的調試結果如下。可見rowMeta儲存了每列數據的名稱和類型。第一列列名flag,數據是長度為1的String;第二列列名id…

 

RowSet的數據信息在queArray隊列中,調試結果如下:可以看出第一個數據元素是一個Object包含了3列,數據內容為(N,1,a…)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM