1. Download pdi-ce-9.1.0.0-324.zip from the Pentaho project page below and extract it to D:\data-integration
https://sourceforge.net/projects/pentaho/files/
2. Run D:\data-integration\Spoon.bat to open the Spoon configuration UI
3. Locate and edit the file C:\Users\{username}\.kettle\kettle.properties, add the KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL line shown below, then restart Spoon.bat
On Windows Server the file is at: C:\Windows\system32\config\systemprofile\.kettle\kettle.properties
On Linux it is at: /root/.kettle/kettle.properties
# This file was generated by Pentaho Data Integration version 9.1.0.0-324.
#
# Here are a few examples of variables to set:
#
# PRODUCTION_SERVER = hercules
# TEST_SERVER = zeus
# DEVELOPMENT_SERVER = thor
#
# Note: lines like these with a # in front of it are comments
# Stops Kettle from treating empty strings as NULL
KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL=Y
4. In Spoon, create a new transformation containing the following four steps (the hop layout is sketched after the list):
4.1 Source data (Table input):
4.2 Target data (Table input)
4.3 Merge rows (diff) (the flag field bz is a newly defined dynamic field; it must not appear among the keys to match or the values to compare)
4.4 Synchronize after merge
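Wired together, the hops form roughly the following layout (a sketch; both Table input steps feed Merge rows (diff), whose output feeds Synchronize after merge):

Source data (Table input) ──┐
                            ├──> Merge rows (diff) ──> Synchronize after merge
Target data (Table input) ──┘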
After the merge completes, the flag field carries one of four values:
"identical": the key fields exist in both the old and new data sources and the compared values are the same
"changed": the key fields exist in both data sources but the compared values differ
"new": the key fields were not found in the old data source
"deleted": the key fields were not found in the new data source
The Synchronize after merge step then needs to be configured with the following points in mind:
(1) Remove the flag field from both the lookup keys and the update fields (again: remove the flag field!); set the remaining fields according to business needs;
(2) Define the rules on the Advanced tab properly, otherwise the step fails with the error "It was not possible to find operation field [null] in the input stream!". A reference filling of that tab is sketched below.
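For reference, with the flag field from section 4.3 the Advanced tab is typically filled in as follows (a sketch based on this example's field names; the operation values must match the flag values listed above):

Operation fieldname:     bz
Insert when value equal: new
Update when value equal: changed
Delete when value equal: deleted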
5. Calling the .ktr transformation file from Java
5.1 Copy the required jars from D:\data-integration\lib into the project's lib directory
These include kettle-dbdialog-9.1.0.0-324.jar, kettle-engine-9.1.0.0-324.jar, kettle-core-9.1.0.0-324.jar, commons-vfs2-2.3.jar, pentaho-encryption-support-9.1.0.0-324.jar, metastore-9.1.0.0-324.jar and guava-17.0.jar.
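If the project is built with Maven rather than a manual lib directory, roughly equivalent dependencies can be declared instead. A minimal sketch, assuming the groupId pentaho-kettle and Pentaho's public repository (https://nexus.pentaho.org/content/groups/omni/); verify both against your environment before relying on them:

<dependency>
  <groupId>pentaho-kettle</groupId>
  <artifactId>kettle-core</artifactId>
  <version>9.1.0.0-324</version>
</dependency>
<dependency>
  <groupId>pentaho-kettle</groupId>
  <artifactId>kettle-engine</artifactId>
  <version>9.1.0.0-324</version>
</dependency>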
5.2 Create a kettle-password-encoder-plugins.xml file under the project's src directory (so it ends up at the root of the classpath)
Its content is as follows:
<password-encoder-plugins>
  <password-encoder-plugin id="Kettle">
    <description>Kettle Password Encoder</description>
    <classname>org.pentaho.di.core.encryption.KettleTwoWayPasswordEncoder</classname>
  </password-encoder-plugin>
</password-encoder-plugins>
Without this file, the following error appears:
Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath
5.3 Java example code
package com.xrh.extend.quartz.jobs;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.logging.Logger;

import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;

import com.xrh.base.job.BN_Job;
import com.xrh.core.util.ObjectUtil;
import com.xrh.extend.quartz.QuartzJob;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

/**
 * Kettle job example
 * @author 李小家
 */
@DisallowConcurrentExecution
public class KettleJob implements QuartzJob {

    private static Logger logger = Logger.getLogger(KettleJob.class.getName());

    public String run(JobExecutionContext context) throws Exception {
        StringBuffer runInfo = new StringBuffer();
        BN_Job job = (BN_Job) context.getJobDetail().getJobDataMap().get("job");
        logger.info(job.getOpName() + "[" + job.getId() + "] run======");
        String jobParam = job.getJobParam();
        if (ObjectUtil.isNull(jobParam)) {
            logger.warning("The job's extra parameter (JSON) must not be empty!");
            runInfo.append("The job's extra parameter (JSON) must not be empty!");
            return runInfo.toString();
        }
        JSONObject paramJson = JSONObject.fromObject(jobParam);
        String ktrFilePath = paramJson.optString("ktrFilePath"); // full path of the transformation file
        JSONArray argumentsJSONArray = paramJson.optJSONArray("arguments");
        String[] arguments = null;
        if (ObjectUtil.isNull(ktrFilePath)) {
            logger.warning("The job's extra parameter (JSON) must contain the transformation file path parameter 'ktrFilePath'!");
            runInfo.append("The job's extra parameter (JSON) must contain the transformation file path parameter 'ktrFilePath'!");
            return runInfo.toString();
        }
        if (!new File(ktrFilePath).exists()) {
            logger.warning("Transformation file [" + ktrFilePath + "] not found!");
            runInfo.append("Transformation file [" + ktrFilePath + "] not found!");
            return runInfo.toString();
        }
        if (argumentsJSONArray != null) {
            Object[] objArr = argumentsJSONArray.toArray(new Object[] {});
            if (objArr.length > 0) {
                arguments = new String[objArr.length];
                for (int i = 0; i < objArr.length; i++) {
                    arguments[i] = objArr[i].toString();
                }
            }
        }
        Trans trans = null;
        try {
            initKettleProperties();
            KettleEnvironment.init(); // initialize the Kettle environment
            //EnvUtil.environmentInit();
            TransMeta transMeta = new TransMeta(ktrFilePath);
            // create the transformation
            trans = new Trans(transMeta);
            // execute the transformation
            trans.execute(arguments);
            // wait until the transformation finishes
            trans.waitUntilFinished();
            // throw an exception if any step reported errors
            if (trans.getErrors() > 0) {
                runInfo.append("There are errors during the transformation!");
                throw new Exception("There are errors during the transformation!");
            }
        } catch (Exception e) {
            e.printStackTrace();
            runInfo.append(e.getMessage());
            return runInfo.toString();
        }
        runInfo.append("Execution finished, no errors found!");
        return runInfo.toString();
    }

    /**
     * Works around Kettle being unable to write empty strings (treating them as NULL).
     * On Windows the setting must be present in C:\Users\{username}\.kettle\kettle.properties;
     * on Linux in /root/.kettle/kettle.properties.
     * For convenience, this method uses Kettle's own helper to generate that file.
     */
    public static void initKettleProperties() {
        String directory = Const.getKettleDirectory();
        String kpFile = directory + Const.FILE_SEPARATOR + "kettle.properties";
        logger.info("kpFile===" + kpFile);
        if (!new File(kpFile).exists()) {
            File dir = new File(directory);
            dir.mkdirs();
            KettleClientEnvironment.createKettleHome();
            File file = new File(kpFile);
            FileWriter fw = null;
            BufferedWriter bw = null;
            try {
                fw = new FileWriter(file);
                bw = new BufferedWriter(fw);
                bw.write("KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL=Y");
                bw.flush();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (bw != null) {
                    try { bw.close(); } catch (IOException e) { /* ignore */ }
                }
                if (fw != null) {
                    try { fw.close(); } catch (IOException e) { /* ignore */ }
                }
            }
        }
    }
}
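For reference, a job parameter (jobParam) JSON that this class accepts could look like the following; the path and the argument value are illustrative only:

{"ktrFilePath": "D:/etl/zl_products_sync.ktr", "arguments": ["2021-05-31"]}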
6. Common problems
6.1 When the transformation reads MySQL tinyint(1) columns, the driver treats tinyint(1) as a Boolean type
Workaround: coerce the value through string concatenation in the source query, e.g. select columnName + "" as columnName
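Alternatively, assuming the source connection uses MySQL Connector/J, the driver property tinyInt1isBit=false stops tinyint(1) from being mapped to Boolean; append it to the JDBC URL of the database connection (host and schema here are placeholders):

jdbc:mysql://localhost:3306/mydb?tinyInt1isBit=false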
6.2 The following error occurs while running the transformation:
2021/05/31 14:24:24 - Merge rows.0 - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Unexpected error
2021/05/31 14:24:24 - Merge rows.0 - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : java.lang.NullPointerException
2021/05/31 14:24:24 - Merge rows.0 - Finished processing (I=0, O=0, R=0, W=0, U=0, E=1)
2021/05/31 14:24:24 - zl_products - Transformation detected one or more steps with errors.
2021/05/31 14:24:24 - zl_products - Transformation is killing the other steps!
2021/05/31 14:24:24 - Source data.0 - Finished reading query, closing connection.
2021/05/31 14:24:24 - Source data.0 - Finished processing (I=2, O=0, R=0, W=0, U=0, E=0)
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Errors detected!
2021/05/31 14:24:24 - Spoon - The transformation has finished!!
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Errors detected!
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Errors detected!
Solution: make sure every hop between the steps is enabled (a gray hop means it is disabled).
6.3 Deploying the .ktr transformation file to a production environment:
Edit the connection (data source) settings in the file. In <password>Encrypted 2be98afc86aa7f2e4cb79ff228dc6fa8c</password>, the value after "Encrypted " is the encrypted database password; it can be generated by running a "JavaScript code" step, as follows:
Encryption script:
//Script here
var setValue;
setValue = Packages.org.pentaho.di.core.encryption.Encr.encryptPassword('123456');
Decryption script:
// decrypt
var setValue1;
setValue1 = org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted('Encrypted 2be98afc86aa7f2e4cb79ff228dc6fa8c');
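Because the project already has the Kettle jars on the classpath (section 5.1), the same values can also be produced from plain Java instead of a JavaScript step. A minimal sketch (the class name and sample password are illustrative; KettleClientEnvironment must be initialized first so the password encoder plugin from section 5.2 is registered):

import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.encryption.Encr;

public class EncryptPasswordDemo {
    public static void main(String[] args) throws Exception {
        // register the Kettle password encoder before using Encr
        KettleClientEnvironment.init();
        // prints the value with the "Encrypted " prefix, ready to paste into <password>
        System.out.println(Encr.encryptPasswordIfNotUsingVariables("123456"));
        // decrypts a value that may or may not carry the "Encrypted " prefix
        System.out.println(Encr.decryptPasswordOptionallyEncrypted(
                "Encrypted 2be98afc86aa7f2e4cb79ff228dc6fa8c"));
    }
}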