kettle插入更新流程


kettle轉換步驟工作組件

       這里有四個類構成了這個kettle 步驟/節點,每一個類都有其特定的目的及所扮演的角色。

TemplateStep:  步驟類實現了StepInteface接口,在轉換運行時,它的實例將是數據實際處理的位置。每一個執行線程都表示一個此類的實例。

 

TemplateStepData:  數據類用來存儲數據,當插件執行時,對於每個執行的線程都是唯一的。執行時里面存儲的東西主要包括數據庫連接、文件句柄、緩存等等其他東西。

 

TemplateStepMeta: 元數據類實現了StepMetaInterface接口。它的職責是保存和序列化特定步驟實例的配置,在我們這個例子中,它負責保存用戶設置的步驟名稱和輸出字段的名稱。

 

TemplateStepDialog:對話框類實現了該步驟與用戶交互的界面,它顯示一對話框,通過對話框用戶可以自己的喜好設定步驟的操作。對話框類與元數據類關系非常緊密,元數據類可以追蹤用戶的設置。

 

除了上面的代碼,還有一個plugin.xml,它設置好了插件的元數據,定義了步驟在kettle圖形工作台中的顯示效果。為了更好的讓大家理解,我將利用這個步驟設計一個轉換流程並執行它。對於插件的開發,我們將從plugin.xml配置文件開始講起,然后講講元數據和對話框類,最后再講講步驟類和數據類。

 

書寫你自己的plugin.xml:

       下面plugin.xml是我們這個插件里面的內容,它的功能是告訴kettle插件的元數據類,插件的名稱及描敘,還有需要加載的jar包。想要了解細節,可以查看文章:plug-in loading

<?xml version="1.0" encoding="UTF-8"?>

<plugin

   id="TemplatePlugin"

   iconfile="icon.png"

   description="Template Plugin"

   tooltip="Only there for demonstration purposes"

   category="Demonstration"

   classname="plugin.template.TemplateStepMeta">

   <libraries>

      <library name="templatestep.jar"/>

   </libraries>

</plugin>

 

ID:在kettle插件中必須全局唯一,因為被kettle序列化了,所以不要隨便改變

Iconfile: kettle中插件顯示的圖片,必須是png圖片

Description:插件描敘,顯示在樹形菜單里面。

Tooltip:樹形菜單中,鼠標滑過的時候顯示的提示信息

Category:插件顯示的父目錄

Classname:元數據類

Library:指明了插件需要加載所依賴的jar包

 

 

插件主要包含類介紹

一、元數據類:

         下面顯示了元數據的幾個關鍵的方法,注意元數據類里面用私有成員變量outputField 存儲了下一個步驟的輸出字段。

 // keep track of the step settings
public String getOutputField()
public void setOutputField(…)
public void setDefault()

// serialize the step settings to and from xml
public String getXML()
public void loadXML(…)

// serialize the step settings to and from a kettle repository
public void readRep(…)
public void saveRep(…)

// provide information about how the step affects the field structure of processed rows
public void getFields(…)

// perform extended validation checks for the step
public void check(…)

// provide instances of the step, data and dialog classes to Kettle
public StepInterface getStep(…)
public StepDataInterface getStepData()
public StepDialogInterface getDialog(…) 

TemplateStepMeta元數據類其實還有很多方面,不過大多被他的父類BaseStepMeta給默認實現了,這些默認的實現足以使我們的元數據類工作良好。想要了解更多,大家可以查查關於StepMetaInteface和BaseStepMeta的kettle官方文檔。

 

二、對話框類:

         TemeplateStepDialog為步驟實現了對話框的設置,kettle的用戶界面部件是使用的eclipse的swt框架,如果要開發比較復雜的對話框,你還必須熟悉大部分swt代碼。 Swt文檔大家可以從eclipse上的幫助菜單點擊在線獲取。在開發過程中,一個對話框對象擁有一個元數據對象,它記錄了應該從哪里讀取配置?應該把設置好的配置保存在哪里?它僅僅設置了輸出字段的名稱在我們這個模板步驟里面。一個繼承自BaseStepDialog特定的對話框類必須提供open(…)方法,這個方法必須返回這個步驟的名稱(發生改變時)或NULL(對話框被取消時)

 

 

三、步驟類:

         步驟類是實際的處理和轉換工作的地方。因為大部分樣本代碼已經由父類BaseStep提供了,大多數插件僅僅關注下面幾個特定的方法就行。

 // initialization and teardown
public boolean init(…)
public void dispose(..)

// processing rows
public void run()
public boolean processRow(..)

Init()方法在轉換執行前被kettle調用,轉換必須在所有步驟初始化成功時才真正執行。我們這個模板步驟沒有做任何事情,這里僅僅是拿出來讓大家了解了解。

dispose()方法是在步驟執行完之后執行(非轉換執行完哈),它完成資源的關閉,像文件句柄、緩存等等。

run()方法在實際處理記錄集的時候調用。里面其實是個調用processRow()方法處理記錄的小循環,當此步驟再沒有數據處理或轉換被停止時退出循環。

processRow()方法在處理單條記錄的時候被調用。這個方法通常通過調用getRow()來獲取需要處理的單條記錄。 這個方法如果有需要將會被阻塞,例如當此步驟希望放慢腳步處理數據時。processRow()隨后的流程將執行轉換工作並調用putRow()方法將處理過的記錄放到它的下游步驟。

注意:你的步驟可能會變記錄的結構,為了安全起見,一定要多熟悉包org.pentaho.di.core.row,特別是類RowMetaInterface和RowDataUtil。

  基類BaseStep對處理的記錄提供了第一次訪問的標識,在某些代碼只執行一次的時候可能非常有用,例如某個費時的查找,其實這就是緩存。

 

四、數據類:

         大多數步驟都需要臨時的緩沖或者臨時的數據。數據類就是這些數據合適的存放位置。每一個執行線程將得到其擁有的數據類實例,所以它能在獨立的空間里面運行。TemplateStepData繼承自BaseStepData,作為一個經驗法則,不要將non-constant字段放置BaseStepData類里面,如果你必須,請將它最好放置TemplateStepData數據類里面.

 

我們的步驟僅僅使用了一個數據對象來存儲記錄集輸出的結構,沒有用到其他的存儲介質,例如文件等等。

開發插件實例

1.       在kettle-steps.xml下添加如下節點

 

 

<step id="MyTest">

       <description>MyTest</description>

       <classname>mytest.MyTestMeta</classname>

       <category>插件測試</category>

       <tooltip>測試</tooltip>

       <iconfile>ui/images/TIP.png</iconfile>

    </step>

 

 

 

 

 

 

 

 

 

 

2.       創建插件類

 

MyTest類代碼

 

package mytest;

 

import org.pentaho.di.trans.Trans;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStep;

import org.pentaho.di.trans.step.StepDataInterface;

import org.pentaho.di.trans.step.StepInterface;

import org.pentaho.di.trans.step.StepMeta;

 

 

publicclass MyTest extends BaseStep implements StepInterface {

 

public MyTest(StepMeta stepMeta, StepDataInterface stepDataInterface,

        int copyNr, TransMeta transMeta, Trans trans) {

    super(stepMeta, stepDataInterface, copyNr, transMeta, trans);

    // TODO Auto-generated constructor stub

}

 

}

 

 

MyTestData類代碼

 

 

package mytest;

 

import org.pentaho.di.trans.step.BaseStepData;

import org.pentaho.di.trans.step.StepDataInterface;

 

publicclass MyTestData extends BaseStepData implements StepDataInterface {

 

}

 

MyTestMeta類代碼

 

package mytest;

 

import java.util.List;

import java.util.Map;

 

import org.pentaho.di.core.CheckResultInterface;

import org.pentaho.di.core.Counter;

import org.pentaho.di.core.database.DatabaseMeta;

import org.pentaho.di.core.exception.KettleException;

import org.pentaho.di.core.exception.KettleXMLException;

import org.pentaho.di.core.row.RowMetaInterface;

import org.pentaho.di.repository.ObjectId;

import org.pentaho.di.repository.Repository;

import org.pentaho.di.trans.Trans;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStepMeta;

import org.pentaho.di.trans.step.StepDataInterface;

import org.pentaho.di.trans.step.StepInterface;

import org.pentaho.di.trans.step.StepMeta;

import org.pentaho.di.trans.step.StepMetaInterface;

import org.w3c.dom.Node;

 

publicclass MyTestMeta extends BaseStepMeta implements StepMetaInterface {

 

    public MyTestMeta()

    {

       super();

    }

    @Override

    publicvoid setDefault() {

       // TODO Auto-generated method stub

 

    }

 

    @Override

    publicvoid loadXML(Node stepnode, List<DatabaseMeta> databases,

           Map<String, Counter> counters) throws KettleXMLException {

       // TODO Auto-generated method stub

 

    }

 

    @Override

    publicvoid saveRep(Repository rep, ObjectId id_transformation,

           ObjectId id_step) throws KettleException {

       // TODO Auto-generated method stub

 

    }

 

    @Override

    publicvoid readRep(Repository rep, ObjectId id_step,

           List<DatabaseMeta> databases, Map<String, Counter> counters)

           throws KettleException {

       // TODO Auto-generated method stub

 

    }

 

    @Override

    publicvoid check(List<CheckResultInterface> remarks, TransMeta transMeta,

           StepMeta stepMeta, RowMetaInterface prev, String[] input,

           String[] output, RowMetaInterface info) {

       // TODO Auto-generated method stub

 

    }

 

    @Override

    public StepInterface getStep(StepMeta stepMeta,

           StepDataInterface stepDataInterface, int copyNr,

           TransMeta transMeta, Trans trans) {

       // TODO Auto-generated method stub

       returnnull;

    }

 

    @Override

    public StepDataInterface getStepData() {

       // TODO Auto-generated method stub

       returnnull;

    }

 

}

 

 

 

MyTestDialog類代碼

 

package mytest;

 

import org.eclipse.swt.SWT;

import org.eclipse.swt.widgets.Display;

import org.eclipse.swt.widgets.Shell;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStepMeta;

import org.pentaho.di.trans.step.StepDialogInterface;

import org.pentaho.di.ui.trans.step.BaseStepDialog;

 

publicclass MyTestDialog extends BaseStepDialog implements StepDialogInterface {

   

    public MyTestDialog(Shell parent, Object in,

           TransMeta transMeta, String stepname) {

       super(parent, (BaseStepMeta)in, transMeta, stepname);

       // TODO Auto-generated constructor stub

    }

 

    @Override

    public String open() {

       Shell parent = getParent();

       Display display = parent.getDisplay();

 

       shell = new Shell(parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN);

       props.setLook(shell);

       shell.open();

       shell.setSize(200, 200);

       shell.setText("hello");

       returnnull;

    }

 

}

 

 

除TestDialog類外的3個類的代碼都是在加入繼承基類和接口后更加eclipse插件提示自動生成的,也是我們自己需要實現的方法,TestDialog類中的MyTestDialog方法參數是固定的,根據生成的需要修改下,open函數是要我們自己實現的,運行效果如下

 

雙擊MyTest后彈出一個空白的窗口

 

調用use define java class 插件

以kettle中自帶的samples\transformations\User Defined Java Class - Calculate the date of Easter.ktr為例

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException

{

  Object[] r=getRow();//從阻塞隊列中獲取一個數據對象(一行數據記錄)

  if (r==null)//如果沒有可獲取的數據,代表以處理完成

  {

    setOutputDone();//設置處理完成標志

    return false;//退出循環

  }

 

  if (first) {//第一次進入循環

    //初始化動作

     first=false;//設置為非第一次循環

  }

 /*

*處理函數

*/

  logBasic(r[0].toString());//打印

  putRow(data.outputRowMeta, r);//輸出到阻塞隊列

  return true;

}

use define java class 插件其實就是一個空插件,然后我們自己來實現processRow函數體,kettle通過while循環來調用processRow函數一行一行的處理數據,流程如下所示



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM