Synchronizing table data between different databases with Kettle (Pentaho Data Integration)


1. Download pdi-ce-9.1.0.0-324.zip from the Pentaho project and extract it to D:\data-integration

https://sourceforge.net/projects/pentaho/files/

2. Run D:\data-integration\Spoon.bat to open the configuration UI

3. Edit the file C:\Users\{username}\.kettle\kettle.properties, add the KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL line shown below, then restart Spoon.bat

On Windows Server the file path is: C:\Windows\system32\config\systemprofile\.kettle\kettle.properties

On Linux the file path is: /root/.kettle/kettle.properties

# This file was generated by Pentaho Data Integration version 9.1.0.0-324.
# 
# Here are a few examples of variables to set: 
#
# PRODUCTION_SERVER = hercules
# TEST_SERVER = zeus
# DEVELOPMENT_SERVER = thor
#
# Note: lines like these with a # in front of it are comments

# Keep empty strings distinct from NULL (otherwise Kettle treats empty strings as null)
KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL=Y

4. In Spoon, create a new transformation consisting of the following steps:

4.1 Source data:

4.2 Target data:

4.3 Merge rows (the flag field bz is a newly defined field; it must not appear among the key fields or the fields to compare)

4.4 Synchronize after merge

After the merge, the flag field takes one of four values:

"identical": the key exists in both the old and the new data source, and the compared field values are the same

"changed": the key exists in both the old and the new data source, but the compared field values differ

"new": the key was not found in the old data source

"deleted": the key was not found in the new data source

When configuring the Synchronize after merge step, note the following:

(1) Remove the flag field from both the lookup keys and the update fields (again: remove the flag field!); configure the other fields according to your business needs.

(2) Define the rules on the Advanced tab correctly, otherwise the step fails with "It was not possible to find operation field [null] in the input stream!".

5. Invoking the .ktr transformation file from Java

5.1 Copy the required jars from D:\data-integration\lib into the project's lib directory

    These include kettle-dbdialog-9.1.0.0-324.jar, kettle-engine-9.1.0.0-324.jar, kettle-core-9.1.0.0-324.jar, commons-vfs2-2.3.jar, pentaho-encryption-support-9.1.0.0-324.jar, metastore-9.1.0.0-324.jar, and guava-17.0.jar.
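If the project is built with Maven instead of copying jars by hand, roughly equivalent dependencies can be declared. This is only a sketch: the groupId/artifactId coordinates and the repository URL below are assumptions that should be verified against your PDI 9.1 distribution.

```xml
<!-- Sketch only: coordinates and repository URL are assumptions; verify against your PDI version -->
<repositories>
  <repository>
    <id>pentaho-public</id>
    <url>https://nexus.pentaho.org/content/groups/omni</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>9.1.0.0-324</version>
  </dependency>
  <dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>9.1.0.0-324</version>
  </dependency>
</dependencies>
```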

5.2 Create a kettle-password-encoder-plugins.xml file under the project's src directory

    with the following content:

<password-encoder-plugins>
  <password-encoder-plugin id="kettle">
     <description>kettle Password Encoder</description>
     <classname>org.pentaho.di.core.encryption.KettleTwoWayPasswordEncoder</classname>
  </password-encoder-plugin>
</password-encoder-plugins>

Without this file, the following error occurs:

Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath

5.3 Example Java code

package com.xrh.extend.quartz.jobs;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.logging.Logger;

import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;

import com.xrh.base.job.BN_Job;
import com.xrh.core.util.ObjectUtil;
import com.xrh.extend.quartz.QuartzJob;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

/**
 * Example Kettle job
 * @author 李小家
 *
 */
@DisallowConcurrentExecution
public class KettleJob implements QuartzJob {

    private static Logger logger = Logger.getLogger(KettleJob.class.getName());

    public String run (JobExecutionContext context) throws Exception {
        StringBuffer runInfo = new StringBuffer();
        BN_Job job = (BN_Job) context.getJobDetail().getJobDataMap().get("job");
        logger.info(job.getOpName() + "[" + job.getId() + "] run======");
        
        String jobParam = job.getJobParam();
        if (ObjectUtil.isNull(jobParam)){
            logger.warning("The job parameter (JSON) must not be empty!");
            runInfo.append("The job parameter (JSON) must not be empty!");
            return runInfo.toString();
        }
        
        JSONObject paramJson = JSONObject.fromObject(jobParam);
        String ktrFilePath = paramJson.optString("ktrFilePath"); // full path of the .ktr transformation file
        JSONArray argumentsJSONArray = paramJson.optJSONArray("arguments");
        String[] arguments = null;
        if (ObjectUtil.isNull(ktrFilePath)) {
            logger.warning("The job parameter (JSON) must contain the transformation file path 'ktrFilePath'!");
            runInfo.append("The job parameter (JSON) must contain the transformation file path 'ktrFilePath'!");
            return runInfo.toString();
        }
        if (!new File(ktrFilePath).exists()) {
            logger.warning("Transformation file [" + ktrFilePath + "] not found!");
            runInfo.append("Transformation file [" + ktrFilePath + "] not found!");
            return runInfo.toString();
        }
        if (argumentsJSONArray != null) {
            Object[] objArr = argumentsJSONArray.toArray(new Object[] {});
            if (objArr.length > 0) {
                arguments = new String[objArr.length];
                for (int i = 0 ; i < objArr.length; i ++) {
                    arguments[i] = objArr[i].toString();
                }
            }
        }
        
        Trans trans = null;  
        try {  
            initKettleProperties();
            KettleEnvironment.init(); // initialize the Kettle environment
            //EnvUtil.environmentInit();
            TransMeta transMeta = new TransMeta(ktrFilePath);
            // create the transformation
            trans = new Trans(transMeta);
            // execute the transformation
            trans.execute(arguments);
            // wait for the transformation to finish
            trans.waitUntilFinished();
            // throw if any step reported errors
            if (trans.getErrors() > 0) {
                runInfo.append("There are errors during transformation execution!");
                throw new Exception(
                        "There are errors during transformation execution!");
            }
        } catch (Exception e) {
            e.printStackTrace();
            runInfo.append(e.getMessage());
            return runInfo.toString();
        }
        runInfo.append("Finished without errors!");
        
        return runInfo.toString();
    }
    
    /**
     * Works around Kettle being unable to write empty strings.
     * On Windows, the setting must be present in C:\Users\{username}\.kettle\kettle.properties;
     * on Linux, in /root/.kettle/kettle.properties.
     * For convenience, this method uses Kettle's own helpers to generate the file.
     */
    public static void initKettleProperties() {
        String directory = Const.getKettleDirectory();
        String kpFile = directory + Const.FILE_SEPARATOR + "kettle.properties";
        logger.info("kpFile===" + kpFile);
        if (!new File(kpFile).exists()) {
            File dir = new File(directory);
            dir.mkdirs();
            KettleClientEnvironment.createKettleHome();
            
            File file = new File(kpFile);
            // try-with-resources closes the writers reliably
            try (BufferedWriter bw = new BufferedWriter(new FileWriter(file))) {
                bw.write("KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL=Y");
            } catch (IOException e) {
                e.printStackTrace();
            }
        } 
    }
}

6. Common issues

6.1 When transferring MySQL tinyint(1) columns, Kettle treats tinyint(1) as a Boolean type

     Workaround: force a non-boolean type in the source query by concatenating an empty string, e.g. select columnName + "" as columnName (or CONCAT(columnName, '')).
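Alternatively, MySQL Connector/J itself can be told not to map tinyint(1) to a boolean by adding the driver option tinyInt1isBit=false to the connection URL. This is a standard Connector/J property, not a Kettle-specific setting; the host, port, and database name below are placeholders.

```
jdbc:mysql://localhost:3306/mydb?tinyInt1isBit=false
```

In Spoon this can be set under the database connection's Options tab (parameter name tinyInt1isBit, value false).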

6.2 The following error occurs when running the transformation:

2021/05/31 14:24:24 - 合並記錄.0 - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Unexpected error
2021/05/31 14:24:24 - 合並記錄.0 - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : java.lang.NullPointerException
2021/05/31 14:24:24 - 合並記錄.0 - Finished processing (I=0, O=0, R=0, W=0, U=0, E=1)
2021/05/31 14:24:24 - zl_products - Transformation detected one or more steps with errors.
2021/05/31 14:24:24 - zl_products - Transformation is killing the other steps!
2021/05/31 14:24:24 - 源數據.0 - Finished reading query, closing connection.
2021/05/31 14:24:24 - 源數據.0 - Finished processing (I=2, O=0, R=0, W=0, U=0, E=0)
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Errors detected!
2021/05/31 14:24:24 - Spoon - The transformation has finished!!
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Errors detected!
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Errors detected!

Fix: make sure the database connections are active (a greyed-out connection means it is not in effect).

6.3 Deploying the .ktr transformation file to a production environment

     Edit the connection settings in the file. In <password>Encrypted 2be98afc86aa7f2e4cb79ff228dc6fa8c</password>, the part after "Encrypted " is the encrypted form of the database password. The encrypted value can be generated by running the "JavaScript" step code below (PDI also ships an Encr.bat / encr.sh command-line tool for encrypting passwords):

Encryption script:

//Script here
var setValue;
setValue = Packages.org.pentaho.di.core.encryption.Encr.encryptPassword('123456');

Decryption script:

//decrypt
var setValue1;
setValue1 = org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted('Encrypted 2be98afc86aa7f2e4cb79ff228dc6fa8c');

 

