package com.woaiyitiaocai.util; import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.util.EnvUtil; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; /** * @功能描述: 執行kettle * @項目版本: 1.0.0 * @相對路徑: com.woaiyitiaocai.util.ExecKettleUtil.java * @創建作者: woaiyitiaocai * @問題反饋: zhup@woaiyitiaocai.com * @創建日期: 2017年2月27日 下午3:36:10 */ public class ExecKettleUtil { private static Logger logger_info = Logger.getLogger("api-info"); private static Logger logger_error = Logger.getLogger("api-error"); /** * @功能描述: 執行job * @使用對象: woaiyitiaocai * @創建作者: woaiyitiaocai * @創建日期: 2017年2月27日 下午3:51:33 * @param initKettleParam job參數 * @param kjbFilePath job路徑 * @return */ public static boolean runKettleJob(Map<String,String> initKettleParam, String kjbFilePath) { String uuid = UUID.randomUUID().toString(); logger_info.info("ExecKettleUtil@runKettleJob:"+uuid+" {kjbFilePath:"+kjbFilePath+"}"); try { KettleEnvironment.init(); //初始化job路徑 JobMeta jobMeta = new JobMeta(kjbFilePath, null); Job job = new Job(null, jobMeta); //初始化job參數,腳本中獲取參數值:${variableName} for (String variableName : initKettleParam.keySet()) { job.setVariable(variableName, initKettleParam.get(variableName)); } job.start(); job.waitUntilFinished(); if (job.getErrors() > 0) { logger_info.info("ExecKettleUtil@runKettleJob:"+uuid+" 執行失敗"); }else{ logger_info.info("ExecKettleUtil@runKettleJob:"+uuid+" 執行成功"); } return true; } catch (Exception e) { logger_error.error("ExecKettleUtil@runKettleJob:"+uuid, e); return false; } } /** * @功能描述: 執行Transfer * @使用對象: woaiyitiaocai * @創建作者: woaiyitiaocai * @創建日期: 2017年2月27日 下午3:51:33 * @param initKettleParam Transfer參數 * @param ktrFilePath Transfer路徑 * @return */ public static boolean runKettleTransfer(Map<String,String> initKettleParam, String ktrFilePath) { Trans trans = null; String uuid = 
UUID.randomUUID().toString(); logger_info.info("ExecKettleUtil@runKettleTransfer:"+uuid+" {ktrFilePath:"+ktrFilePath+"}"); try { //初始化 KettleEnvironment.init(); EnvUtil.environmentInit(); TransMeta transMeta = new TransMeta(ktrFilePath); //轉換 trans = new Trans(transMeta); //初始化trans參數,腳本中獲取參數值:${variableName} for (String variableName : initKettleParam.keySet()) { trans.setVariable(variableName, initKettleParam.get(variableName)); } //執行轉換 trans.execute(null); //等待轉換執行結束 trans.waitUntilFinished(); if (trans.getErrors() > 0) { logger_info.info("ExecKettleUtil@runKettleTransfer:"+uuid+" 執行失敗"); }else{ logger_info.info("ExecKettleUtil@runKettleTransfer:"+uuid+" 執行成功"); } return true; } catch (Exception e) { logger_error.error("ExecKettleUtil@runKettleTransfer:"+uuid, e); return false; } } }
今天要做的項目中用到了在系統中執行kettle腳本,於是自己寫了一個java調用kettle的job和transfer的工具類。相關的jar包從etl的開發工具中復制出來就ok了,也可以去官網上面下載。此示例是由程序觸發執行kettle腳本;如果不需要由程序觸發,最簡單的做法就是直接寫到linux的定時器(crontab)去執行,那就不需要這個類了。
kettle在這里是做數據的抽取,清洗,用kettle開發更有效率,程序也更健壯。這里就不給出kettle的腳本了。java工具類代碼如上面所示。
網上也有很多各種各樣現成的示例,我的僅供參考,歡迎吐槽。
以下是相關maven依賴
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-vfs2</artifactId> <version>2.0</version> </dependency> <dependency> <groupId>org.scannotation</groupId> <artifactId>scannotation</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>dom4j</groupId> <artifactId>dom4j</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-vfs</artifactId> <version>5.2.0.0</version> <classifier>pentaho</classifier> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-engine</artifactId> <version>5.2.0.0</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>5.2.0.0</version> </dependency>