1. Job機制
一個job項代表ETL控制流中的一項邏輯任務。Job項將會順序執行,每個job項會產生一個結果,能作為別的分支上job項的條件。

圖 1 job項示例
1.1. Job類圖簡介
圖 2 Job entry類圖結構
1.1.1. JobEntryInteface接口
JobEntryInterface是Job Entry插件的主要實現接口。主要包含以下功能:
1 保存Job Entry設置
實現類使用私有變量保存設置的參數,通過get、set方法獲取和設置。Dialog實現類會通過這些方法,保存或設置設置界面上的參數。同時,需要提供一個深度拷貝的方法,因為在一些保存參數且可能修改的地方會調用。
圖 3 JobEntryTrans配置界面
2 序列化插件
插件要實現對本插件的序列化,實現兩種方式xml與數據庫。
圖 4 轉換插件xml序列化結果
3 輸出信息提供
一個job entry支持三種類型的輸出:true、flase和無條件。這三種情況不是所有的job entry插件都會同時支持的,例如dummy job entry僅支持true和false。所以,插件必須顯現兩類函數,來查看支持哪種結果。
public boolean evaluates()//是否支持true、false
public boolean isUnconditional()//是否支持無條件執行
4 執行任務
負責工作的執行。
public Result execute()//執行具體的邏輯,需要結果和開始到該項的距離
prev_result.setNrErrors()//設置執行過程中的異常數
prev_result.setResult()//設置結果,如果不知道true/false,結果不設置
最后返回prev_result。
1.1.2. JobEntryDialogInteface接口
負責構建和打開參數設置對話框。Spoon通過調用open函數打開該對話框,spoon是使用swt框架的,所以對話框也應使用swt來實現。
1.2. Job entry交互通信類
1.2.1. Result
每一個jobEntryInterface的實現類在完成相應功能時,返回結果的類型。
主要成員變量:
1 private boolean result;執行是否出現異常 2 3 private int exitStatus; 執行結果狀態 4 5 private List<RowMetaAndData> rows;一個jobEntry完成處理后的數據(若存在) 6 7 private Map<String, ResultFile> resultFiles;
1.3. Job配置及開啟

圖 5 Job開啟時序圖
Job的開啟與Trans相類似,配置執行的參數,檢查.kjb文件是否發生變化,實例化一個Job對象,開啟該線程。
1.4. Job執行
1.4.1. 初始執行excute1()
主要工作是從JobMeta的JobHopMeta找到job入口jobentry信息,根據開始條件調用真正執行jobentry的execute方法2,代碼如下所示:
代碼 4 Job.excute()關鍵代碼
1 startpoint=jobMeta.findJobEntry(JobMeta.STRING_SPECIAL_START, 0, false);// 找到Job開始組件 2 JobEntrySpecial jes = (JobEntrySpecial) startpoint.getEntry(); 3 // JobEntrySpecial是啟動job的job項目 4 Result res = null; 5 while ( (jes.isRepeat() || isFirst) && !isStopped()){ 6 //符合開始條件時,調用execute方法2 7 isFirst = false; 8 res = execute(0, null, startpoint, null, 9 Messages.getString("Job.Reason.Started")); 10 }
1.4.2. 實際執行execute2()
execute()方法包含,的參數有執行次數(START不算,從0開始,順序執行)、接一個Entry執行結果、當前Entry的拷貝、前一個Entry拷貝和原因。
主要功能是根據參數startpoint,提取對應的jobentry,執行對應的jobentry操作,再根據JobMeta的hop信息依次得到下一個jobentry,遞歸調用。具體的執行步驟如下所示:
圖 6 Job執行步驟
1.5. JobEntry執行
1.5.1. JobEntry類
具體每個組件的執行體對應org.pentaho.di.job.entries包內每個entry的具體實現。
execute()方法2中調用jobEntry的execute()完成jobEntry的具體功能。
1.5.2. 不同jobEntry的實現
final Result result = cloneJei.execute(prevResult, nr, rep, this);
不同的Job項目(JobEntry)實現差別很大。
JobEntrySpecial
功能是開啟一個job,只是簡單地對傳遞來的preResult設置它的的result屬性值為true,(Job項目據此判斷前一結果執行完畢)。返回該對象即可。
JobEntryTableExit
功能是判斷一個table是否存在數據庫中。JobEntryTableExit Job項目有屬性tablename和DatabaseMeta(對數據庫的元數據信息描述)根據DatabaseMeta得到一個Dabase對象db,建立連接db.connect(); 調用db.checkTableExists(tablename)根據此返回值設置preResult的result屬性為否為true。返回preResult對象。
JobEntryTrans
JobEntryJob和JobEntryTrans是嵌套job或trans的Job項目(JobEntry)。它們是比較復雜的job項目。
作用是執行一個trans。首先實例化一個TransMeta,之后實例化Trans。調用trans.start(),當執行完畢后調用函數trans.getResult(),並把結果加到preResult中,返回該對象即可。
補充說明
Result中也可以有處理數據,這些處理數據可以作為下一個Job項目(JobEntry)的輸入。但是容量受內存容量限制。
2. 數據庫插件
PDI使用數據庫插件來進行數據庫的正確連接、執行SQL,同時也考慮現有數據的各種特殊功能和不同限制。
在PDI里面,已經集成了非常多的數據庫插件,大部分的插件都會繼承自BaseDatabaseMeta。下面所示的方法通常都需要被重寫,基類里面並沒有相關的實現。要實現的方法主要分成3大主題:連接信息、SQL方言和功能標記。
1 連接詳情
當PDI建立數據庫連接時將會調用這些函數,或者數據庫設置對話框里顯示與方言有關的內容時也會調用。
- public String getDriverClass()
- public int getDefaultDatabasePort()
- public int[] getAccessTypeList()
- public boolean supportsOptionsInURL()
- public String getURL()
2 SQL Generation
構建有效的SQL數據庫方言時會調用這些方法。
- public String getFieldDefinition()
- public String getAddColumnStatement()
- public String getSQLColumnExists()
- public String getSQLQueryFields()
3 功能標記
查詢使用的數據庫是否支持該功能。
- public boolean supportsTransactions()
- public boolean releaseSavepoint()
- public boolean supportsPreparedStatementMetadataRetrieval()
- public boolean supportsResultSetMetadataRetrievalOnly()
PS:Kettle源碼分析算是全部講完了,最后奉送自己做的PPT http://pan.baidu.com/share/link?shareid=3803402535&uk=3792525916。Kettle應該算是比較小眾的軟件,但是在業界還是非常有名氣的。我看到過好幾個中國的公司,包括上市公司,說自己最新的ETL工具或者數據共享交換工具都是Kettle的改版。
