kettle轉換步驟工作組件
這里有四個類構成了這個kettle 步驟/節點,每一個類都有其特定的目的及所扮演的角色。
TemplateStep: 步驟類實現了StepInteface接口,在轉換運行時,它的實例將是數據實際處理的位置。每一個執行線程都表示一個此類的實例。
TemplateStepData: 數據類用來存儲數據,當插件執行時,對於每個執行的線程都是唯一的。執行時里面存儲的東西主要包括數據庫連接、文件句柄、緩存等等其他東西。
TemplateStepMeta: 元數據類實現了StepMetaInterface接口。它的職責是保存和序列化特定步驟實例的配置,在我們這個例子中,它負責保存用戶設置的步驟名稱和輸出字段的名稱。
TemplateStepDialog:對話框類實現了該步驟與用戶交互的界面,它顯示一對話框,通過對話框用戶可以自己的喜好設定步驟的操作。對話框類與元數據類關系非常緊密,元數據類可以追蹤用戶的設置。
除了上面的代碼,還有一個plugin.xml,它設置好了插件的元數據,定義了步驟在kettle圖形工作台中的顯示效果。為了更好的讓大家理解,我將利用這個步驟設計一個轉換流程並執行它。對於插件的開發,我們將從plugin.xml配置文件開始講起,然后講講元數據和對話框類,最后再講講步驟類和數據類。
書寫你自己的plugin.xml:
下面plugin.xml是我們這個插件里面的內容,它的功能是告訴kettle插件的元數據類,插件的名稱及描敘,還有需要加載的jar包。想要了解細節,可以查看文章:plug-in loading
<?xml version="1.0" encoding="UTF-8"?>
<plugin
id="TemplatePlugin"
iconfile="icon.png"
description="Template Plugin"
tooltip="Only there for demonstration purposes"
category="Demonstration"
classname="plugin.template.TemplateStepMeta">
<libraries>
<library name="templatestep.jar"/>
</libraries>
</plugin>
ID:在kettle插件中必須全局唯一,因為被kettle序列化了,所以不要隨便改變
Iconfile: kettle中插件顯示的圖片,必須是png圖片
Description:插件描敘,顯示在樹形菜單里面。
Tooltip:樹形菜單中,鼠標滑過的時候顯示的提示信息
Category:插件顯示的父目錄
Classname:元數據類
Library:指明了插件需要加載所依賴的jar包
插件主要包含類介紹
一、元數據類:
下面顯示了元數據的幾個關鍵的方法,注意元數據類里面用私有成員變量outputField 存儲了下一個步驟的輸出字段。
// keep track of the step settings
public String getOutputField()
public void setOutputField(…)
public void setDefault()
// serialize the step settings to and from xml
public String getXML()
public void loadXML(…)
// serialize the step settings to and from a kettle repository
public void readRep(…)
public void saveRep(…)
// provide information about how the step affects the field structure of processed rows
public void getFields(…)
// perform extended validation checks for the step
public void check(…)
// provide instances of the step, data and dialog classes to Kettle
public StepInterface getStep(…)
public StepDataInterface getStepData()
public StepDialogInterface getDialog(…)
TemplateStepMeta元數據類其實還有很多方面,不過大多被他的父類BaseStepMeta給默認實現了,這些默認的實現足以使我們的元數據類工作良好。想要了解更多,大家可以查查關於StepMetaInteface和BaseStepMeta的kettle官方文檔。
二、對話框類:
TemeplateStepDialog為步驟實現了對話框的設置,kettle的用戶界面部件是使用的eclipse的swt框架,如果要開發比較復雜的對話框,你還必須熟悉大部分swt代碼。 Swt文檔大家可以從eclipse上的幫助菜單點擊在線獲取。在開發過程中,一個對話框對象擁有一個元數據對象,它記錄了應該從哪里讀取配置?應該把設置好的配置保存在哪里?它僅僅設置了輸出字段的名稱在我們這個模板步驟里面。一個繼承自BaseStepDialog特定的對話框類必須提供open(…)方法,這個方法必須返回這個步驟的名稱(發生改變時)或NULL(對話框被取消時)
三、步驟類:
步驟類是實際的處理和轉換工作的地方。因為大部分樣本代碼已經由父類BaseStep提供了,大多數插件僅僅關注下面幾個特定的方法就行。
// initialization and teardown
public boolean init(…)
public void dispose(..)
// processing rows
public void run()
public boolean processRow(..)
Init()方法在轉換執行前被kettle調用,轉換必須在所有步驟初始化成功時才真正執行。我們這個模板步驟沒有做任何事情,這里僅僅是拿出來讓大家了解了解。
dispose()方法是在步驟執行完之后執行(非轉換執行完哈),它完成資源的關閉,像文件句柄、緩存等等。
run()方法在實際處理記錄集的時候調用。里面其實是個調用processRow()方法處理記錄的小循環,當此步驟再沒有數據處理或轉換被停止時退出循環。
processRow()方法在處理單條記錄的時候被調用。這個方法通常通過調用getRow()來獲取需要處理的單條記錄。 這個方法如果有需要將會被阻塞,例如當此步驟希望放慢腳步處理數據時。processRow()隨后的流程將執行轉換工作並調用putRow()方法將處理過的記錄放到它的下游步驟。
注意:你的步驟可能會變記錄的結構,為了安全起見,一定要多熟悉包org.pentaho.di.core.row,特別是類RowMetaInterface和RowDataUtil。
基類BaseStep對處理的記錄提供了第一次訪問的標識,在某些代碼只執行一次的時候可能非常有用,例如某個費時的查找,其實這就是緩存。
四、數據類:
大多數步驟都需要臨時的緩沖或者臨時的數據。數據類就是這些數據合適的存放位置。每一個執行線程將得到其擁有的數據類實例,所以它能在獨立的空間里面運行。TemplateStepData繼承自BaseStepData,作為一個經驗法則,不要將non-constant字段放置BaseStepData類里面,如果你必須,請將它最好放置TemplateStepData數據類里面.
我們的步驟僅僅使用了一個數據對象來存儲記錄集輸出的結構,沒有用到其他的存儲介質,例如文件等等。
開發插件實例
1. 在kettle-steps.xml下添加如下節點
<step id="MyTest"> <description>MyTest</description> <classname>mytest.MyTestMeta</classname> <category>插件測試</category> <tooltip>測試</tooltip> <iconfile>ui/images/TIP.png</iconfile> </step> |
2. 創建插件類
MyTest類代碼
package mytest;
import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.BaseStep; import org.pentaho.di.trans.step.StepDataInterface; import org.pentaho.di.trans.step.StepInterface; import org.pentaho.di.trans.step.StepMeta;
publicclass MyTest extends BaseStep implements StepInterface {
public MyTest(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans) { super(stepMeta, stepDataInterface, copyNr, transMeta, trans); // TODO Auto-generated constructor stub }
}
|
MyTestData類代碼
package mytest;
import org.pentaho.di.trans.step.BaseStepData; import org.pentaho.di.trans.step.StepDataInterface;
publicclass MyTestData extends BaseStepData implements StepDataInterface {
}
|
MyTestMeta類代碼
package mytest;
import java.util.List; import java.util.Map;
import org.pentaho.di.core.CheckResultInterface; import org.pentaho.di.core.Counter; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.repository.ObjectId; import org.pentaho.di.repository.Repository; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.BaseStepMeta; import org.pentaho.di.trans.step.StepDataInterface; import org.pentaho.di.trans.step.StepInterface; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.step.StepMetaInterface; import org.w3c.dom.Node;
publicclass MyTestMeta extends BaseStepMeta implements StepMetaInterface {
public MyTestMeta() { super(); } @Override publicvoid setDefault() { // TODO Auto-generated method stub
}
@Override publicvoid loadXML(Node stepnode, List<DatabaseMeta> databases, Map<String, Counter> counters) throws KettleXMLException { // TODO Auto-generated method stub
}
@Override publicvoid saveRep(Repository rep, ObjectId id_transformation, ObjectId id_step) throws KettleException { // TODO Auto-generated method stub
}
@Override publicvoid readRep(Repository rep, ObjectId id_step, List<DatabaseMeta> databases, Map<String, Counter> counters) throws KettleException { // TODO Auto-generated method stub
}
@Override publicvoid check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev, String[] input, String[] output, RowMetaInterface info) { // TODO Auto-generated method stub
}
@Override public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans) { // TODO Auto-generated method stub returnnull; }
@Override public StepDataInterface getStepData() { // TODO Auto-generated method stub returnnull; }
}
|
MyTestDialog類代碼
package mytest;
import org.eclipse.swt.SWT; import org.eclipse.swt.widgets.Display; import org.eclipse.swt.widgets.Shell; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.BaseStepMeta; import org.pentaho.di.trans.step.StepDialogInterface; import org.pentaho.di.ui.trans.step.BaseStepDialog;
publicclass MyTestDialog extends BaseStepDialog implements StepDialogInterface {
public MyTestDialog(Shell parent, Object in, TransMeta transMeta, String stepname) { super(parent, (BaseStepMeta)in, transMeta, stepname); // TODO Auto-generated constructor stub }
@Override public String open() { Shell parent = getParent(); Display display = parent.getDisplay();
shell = new Shell(parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN); props.setLook(shell); shell.open(); shell.setSize(200, 200); shell.setText("hello"); returnnull; }
}
|
除TestDialog類外的3個類的代碼都是在加入繼承基類和接口后更加eclipse插件提示自動生成的,也是我們自己需要實現的方法,TestDialog類中的MyTestDialog方法參數是固定的,根據生成的需要修改下,open函數是要我們自己實現的,運行效果如下
雙擊MyTest后彈出一個空白的窗口
調用use define java class 插件
以kettle中自帶的samples\transformations\User Defined Java Class - Calculate the date of Easter.ktr為例
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r=getRow();//從阻塞隊列中獲取一個數據對象(一行數據記錄) if (r==null)//如果沒有可獲取的數據,代表以處理完成 { setOutputDone();//設置處理完成標志 return false;//退出循環 }
if (first) {//第一次進入循環 //初始化動作 first=false;//設置為非第一次循環 } /* *處理函數 */ logBasic(r[0].toString());//打印 putRow(data.outputRowMeta, r);//輸出到阻塞隊列 return true; } |
use define java class 插件其實就是一個空插件,然后我們自己來實現processRow函數體,kettle通過while循環來調用processRow函數一行一行的處理數據,流程如下所示