KETTLE實現數據的刪除和更新


一、實現目標

  源數據庫的數據更新或者刪除之后,目標數據庫的數據跟着更新或刪除,整體流程截圖如下:

  

一、准備工作

源數據庫ORACLE  目標數據庫MongoDB,在源數據庫添加刪除、更新觸發器

二、操作步驟

  1. 添加表輸入組件,連接ORACLE觸發器記錄表
  2. 添加JAVA代碼組件,進行步驟跳轉,根據輸入的數據判斷是刪除或者更新,如果是刪除,則跳轉至MongoDB Delete步驟中,如果是更新的話,跳轉至字段選擇步驟中。JAVA代碼中的詳細信息如下:
  3. import java.util.List;
    
    import org.pentaho.di.core.exception.KettleException;
    import org.pentaho.di.core.row.RowDataUtil;
    import org.pentaho.di.core.row.RowMeta;
    import org.pentaho.di.core.row.RowMetaInterface;
    import org.pentaho.di.core.row.ValueMeta;
    import org.pentaho.di.trans.Trans;
    import org.pentaho.di.trans.TransMeta;
    
    private Object[] previousRow;//上一行
    private RowSet t1 = null;//業務表步驟
    private RowSet t2 = null;//刪除步驟
    
    
    
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
    {
        Object[] r = getRow(); //獲取輸入行
    
        if ( first ) {
          if ( getInputRowMeta() == null ) {
            setOutputDone();//設置輸出完成
            return false;
          }
        }
    
        if ( r == null ) { // 如果當前行為null
          if ( previousRow != null ) {//如果上一行不為null
            //是最后一行
    
            boolean valid=true;
            previousRow = createOutputRow(previousRow, data.outputRowMeta.size());
            Trans trans=getTrans();//獲取轉換實例
            if (trans != null){
                String sync_val = get(Fields.In, "ID").getString(previousRow);//獲取ID
                trans.setVariable("LAST_SYNC_VAL", sync_val);//設置變量的值
            }
            String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//獲取操作類型是刪除還是更新
            String keyid= get(Fields.In, "DATAID").getString(previousRow);//獲取操作類型是刪除還是更新
            
            //Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
            //get(Fields.Out, "KEYID").setValue(rowData,keyid);
            //putRowTo(data.outputRowMeta, previousRow,t2);
    
            if(OpType.equals("UPDATE")){//驗證通過            
                putRowTo(data.outputRowMeta, previousRow,t1);
            }
            else
            {
                putRowTo(data.outputRowMeta, previousRow,t2);
            }
                
          }
          setOutputDone();//設置輸出完成
          return false;//返回false表示不用再繼續處理processRow
        }
    
        if ( !first ) {//不是第一次執行,因為第一次執行時previousRow一定是Null
            //不是最后一行
            boolean valid=true;    
            String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//獲取操作類型是刪除還是更新
            String keyid= get(Fields.In, "DATAID").getString(previousRow);//獲取操作類型是刪除還是更新
    
            //Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
            //get(Fields.Out, "KEYID").setValue(rowData,keyid);
            //putRowTo(data.outputRowMeta, previousRow,t2);
            if(OpType.equals("UPDATE")){
                putRowTo(data.outputRowMeta, previousRow,t1);
            }
            else
            {
                putRowTo(data.outputRowMeta, previousRow,t2);
            }
        }
        previousRow = r;//把當前行設為下一次執行的上一行
        if ( first ) {//如果是首次執行
              first = false;
            t1 = findTargetRowSet("dataupdate");//業務表步驟
            t2 = findTargetRowSet("datadelete");//數據刪除步驟
        }
    
        return true;//返回true表示還要繼續處理processRow
    }

    3.如果跳轉至了MongoDB Delete,則根據ID對目標庫進行刪除。Mongodb delete組件配置如下:

  JSON query中的{ID:"?{DATAID}"}表示刪除ID等於傳進來的參數DATAID的所有數據,Execute for each row要選擇上,表示執行每一行數據。          

  4.如果通過JAVA代碼2判斷為更新的話,則流程將跳轉至字段選擇組件,只獲取主鍵ID,此步驟非常重要,因為要根據ID去源表中獲取等更新的那條數據。

5.選擇表輸入組件,該步驟是根據上一步傳入的ID獲取待更新的那一條數據

PS:獲取SQL查詢語句:此處寫入SQL語句,里邊的?是變量替換,下邊要勾選上"替換SQL語句里的變量",從步驟插入數據要選擇上一步,勾選上執行每一行。

6.下邊的步驟:流查詢、JAVA代碼是對數據進行清洗,字典替換,此處不再解釋

7.最后一步:Mongodb output輸出需要詳細設置

output options選項卡勾選update  modifier update

Mongo文檔字段配置:ID為主鍵匹配字段,匹配字段更新為Y 修改器設置為N/A表示不對主鍵更新

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM