由於kettle服務器時不時斷電重啟,部署的那些任務需要手動一個一個點比較麻煩,所以希望能通過代碼一把部署多個任務或轉換到服務器上。導入了相關jar包,但有些和項目原有的沖突,然后一點點試過來導入了一部分。
在本地運行任務和轉換的能找到很多參考代碼,然后部署到遠程服務器上的相較少一些,找了許久,單獨部署轉換到服務器上的時候測試成功,但是任務中包含轉換的就不行,會一直報找不到轉換,無論我把轉換的路徑寫成絕對路徑還是${Internal.Job.Filename.Directory}這種都不行,我想着在部署job之前把trans先部署上去但是這樣也不行... 好多相關文檔都是下載要幣的
然后我就下載了源碼,首先在kettle中遇到這種問題,是因為運行環境的send resources to this server沒有打鈎,我就照着這種思路去找源碼中的這個參數。
我覺得這個參數肯定是個boolean,首先搜索send resource等找到了疑似,然后想是不是有什么RunConfiguration的類需要在部署到服務器上之前進行設置,但找了半天沒找到
后來又去搜DefaultRunConfiguration這個類,最終在DefaultRunConfigurationExecutor.java中找到了一行設置這個類的sendresource變量的。。。而且這個JobExecutionConfiguration剛好是部署到服務器上市用到的配置類,這才有了下面代碼的180行。成功了~啊~解決了真的好開心哦~
1 package org.ssh.hip.job.util; 2 3 import org.pentaho.di.cluster.SlaveServer; 4 import org.pentaho.di.core.KettleEnvironment; 5 import org.pentaho.di.core.database.DatabaseMeta; 6 import org.pentaho.di.core.exception.KettleException; 7 import org.pentaho.di.job.Job; 8 import org.pentaho.di.job.JobExecutionConfiguration; 9 import org.pentaho.di.job.JobMeta; 10 import org.pentaho.di.repository.ObjectId; 11 import org.pentaho.di.repository.RepositoryDirectoryInterface; 12 import org.pentaho.di.repository.kdr.KettleDatabaseRepository; 13 import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta; 14 import org.pentaho.di.trans.Trans; 15 import org.pentaho.di.trans.TransExecutionConfiguration; 16 import org.pentaho.di.trans.TransMeta; 17 import org.slf4j.Logger; 18 import org.slf4j.LoggerFactory; 19 20 public class KettleEnvironments2 { 21 22 static final Logger logger = LoggerFactory.getLogger(KettleEnvironments2.class); 23 24 25 public static KettleDatabaseRepository repository; 26 public static DatabaseMeta databaseMeta; 27 public static RepositoryDirectoryInterface directory; 28 public static SlaveServer remoteServer; 29 30 /** 31 * 配置資源庫環境 並接連接的資源庫 32 * @return 33 * 34 * @return 35 * @throws KettleException 36 */ 37 public static void repositoryCon(String repositoryName,String repositoryType,String repositoryConnectType,String repositoryIp,String repositoryDBName,String repositoryDBPort,String 38 repositoryDBUsername,String repositoryDBPassword, String repositoryUsername, String repositoryPassword) throws KettleException { 39 // 初始化 40 KettleEnvironment.init(); // 感覺這句話很重要 41 // 數據庫連接元對象 42 // (kettle數據庫連接名稱(KETTLE工具右上角顯示),資源庫類型,連接方式,IP,數據庫名,端口,用戶名,密碼) //cgmRepositoryConn 43 // databaseMeta = new DatabaseMeta("ETL", "Oracle", "Native", "192.168.1.28","orcl", "1521", 44 // "etl", "etl");//資源庫數據庫地址,我這里采用oracle數據庫 45 databaseMeta = new DatabaseMeta(repositoryName, repositoryType, repositoryConnectType, repositoryIp, repositoryDBName, repositoryDBPort, 46 repositoryDBUsername, repositoryDBPassword);//資源庫數據庫地址,我這里采用oracle數據庫 47 // 數據庫形式的資源庫元對象 48 KettleDatabaseRepositoryMeta kettleDatabaseMeta = new KettleDatabaseRepositoryMeta();
kettleDatabaseMeta.setName(repositoryName); // 資源庫名字,這里不寫會導致轉換里java代碼通過名字獲取資源庫進而獲取數據庫連接獲取不到 49 kettleDatabaseMeta.setConnection(databaseMeta); 50 // 數據庫形式的資源庫對象 51 repository = new KettleDatabaseRepository(); 52 // 用資源庫元對象初始化資源庫對象 53 repository.init(kettleDatabaseMeta); 54 // 連接到資源庫 55 repository.connect(repositoryUsername, repositoryPassword);// 默認的連接資源庫的用戶名和密碼 56 directory = repository.loadRepositoryDirectoryTree(); 57 if (repository.isConnected()) { 58 System.out.println("連接成功"); 59 } else { 60 System.out.println("連接失敗"); 61 } 62 } 63 64 /** 65 * 加載子服務器 66 */ 67 public static void initRemoteServer(String serverHostname, String serverPort, 68 String serverName, String serverUsername, String serverPassword, boolean serverMaster) { 69 remoteServer = new SlaveServer(); 70 remoteServer.setHostname(serverHostname); 71 remoteServer.setPort(serverPort); 72 remoteServer.setName(serverName); 73 remoteServer.setUsername(serverUsername); 74 remoteServer.setPassword(serverPassword); 75 remoteServer.setMaster(serverMaster); // 是否是主服務器 76 } 77 78 /** 79 * 調用Transformation示例 80 * 81 * @param rep 82 * @param transName 83 */ 84 public static void runTrans(String path, String transName) { 85 86 try { 87 // 根據指定的字符串路徑 找到目錄,在 r_transformation中id_directory=0的代表根目錄 88 RepositoryDirectoryInterface dir = repository.findDirectory(path); 89 // 根據轉換名稱獲取轉換id 90 // 這個值就是從SELECT ID_TRANSFORMATION FROM `r_transformation` where name='fromto';來的 91 ObjectId id = repository.getTransformationID(transName, dir); 92 System.out.println("------ObjectId=" + id); 93 // 根據轉換id查詢與其關聯的step和hop 94 // select name from r_step where ID_STEP=(SELECT ID_STEP_FROM FROM `r_trans_hop` where 95 // ID_TRANSFORMATION='1'); 96 // select name from r_step where ID_STEP=(SELECT ID_STEP_TO FROM `r_trans_hop` where ID_TRANSFORMATION='1'); 97 TransMeta transMeta = repository.loadTransformation(id, null); 98 // 99 Trans trans = new Trans(transMeta); 100 trans.execute(null);// 執行轉換 101 trans.waitUntilFinished(); // 等待轉換執行結束 102 if (trans.getErrors() != 0) { 103 System.out.println("trans executed failed"); 104 } else { 105 System.out.println("trans executed OK"); 106 } 107 } catch (Exception e) { 108 e.printStackTrace(); 109 } 110 } 111 112 113 /** 114 * 本地運行job 115 * @param path 116 * @param jobName 117 * @throws KettleException 118 */ 119 public static void runJob(String path, String jobName) throws KettleException { 120 // jobname 是Job腳本的路徑及名稱 121 // new JobMeta(jobName, null); // new出來的路徑變成了D盤的 122 // new JobMeta(jobName, repository); // new出來的路徑變成了D:\workspace\ekhip_git\sshapp-ekhips\sshapp-ekhips-auth\sshapp-hip-app\sshapp-hip-job\ 123 RepositoryDirectoryInterface dir = repository.findDirectory(path); 124 125 // 方法1 126 JobMeta jobMeta = repository.loadJob(jobName, dir, null, null); 127 Job job = new Job(repository, jobMeta); 128 129 // 方法2 130 // ObjectId id = repository.getJobId(jobName, dir); 131 // JobMeta jobMeta = repository.loadJob(id, null); 132 // Job job = new Job(repository, jobMeta); 133 134 // 向Job 腳本傳遞參數,腳本中獲取參數值:${參數名} 135 // job.setVariable(paraname, paravalue); 136 137 // try { 138 // SlaveServerConfig config = new SlaveServerConfig("192.168.1.208", 9000, true);//ip與端口與通過Carte.bat啟動時指定的參數一樣 139 // Carte.runCarte(config); 140 // } catch (Exception e) { // 通過該靜態方法啟動carte 141 // e.printStackTrace(); 142 // } 143 144 job.start(); 145 // job.waitUntilFinished(); // 這個是等待job執行完,但是有的任務是定時的重復的 146 if (job.getErrors() > 0) { 147 System.out.println("job executed failed"); 148 } else { 149 System.out.println("job executed OK"); 150 } 151 } 152 153 /** 154 * 遠程服務器上執行job 155 * @param dir 156 * @param jobName 157 * @throws KettleException 158 */ 159 public static void executeJobRemote(String path, String jobName) throws KettleException { 160 161 RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree(); // Default 162 163 RepositoryDirectoryInterface dir = repository.findDirectory(path); 164 165 // JobMeta jobMeta = repository.loadJob(jobName, jobdir, null, null); // reads 166 167 // 方法2 168 ObjectId id = repository.getJobId(jobName, dir); 169 JobMeta jobMeta = repository.loadJob(id, null); 170 171 System.out.println("------ObjectId=" + id); 172 173 // last 174 // version 175 JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); 176 // RunConfiguration run = new RunConfiguration(); 177 jobExecutionConfiguration.setRemoteServer(remoteServer); 178 jobExecutionConfiguration.setRepository(repository); // 這里不設置不影響部署,但是轉換中通過java代碼獲取資源庫會獲取不到,所以要寫 179 // jobExecutionConfiguration.setRunConfiguration("28"); // 沒有差別 180 jobExecutionConfiguration.setPassingExport(true); // send resources to this server 181 String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null); 182 System.out.println(lastCarteObjectId); 183 184 // SlaveServerJobStatus jobStatus = null; 185 // do { 186 // Thread.sleep(5000); 187 // jobStatus = remoteServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0); 188 // } while (jobStatus != null && jobStatus.isRunning()); 189 // Result oneResult = new Result(); 190 // System.out.println(jobStatus); 191 // if (jobStatus.getResult() != null) { 192 // // 流程完成,得到結果 193 // oneResult = jobStatus.getResult(); 194 // System.out.println("Result:" + oneResult); 195 // } else { 196 // System.out.println("取到空了"); 197 // } 198 199 // SlaveServerJobStatus jobStatus = null; 200 // Result oneResult = new Result(); 201 // while (true) { 202 // try { 203 // jobStatus = remoteServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0); 204 // if (jobStatus.getResult() != null) { 205 // // The job is finished, get the result... 206 // oneResult = jobStatus.getResult(); 207 // break; 208 // } 209 // } catch (Exception e1) { 210 // oneResult.setNrErrors(1L); 211 // break; // Stop looking too, chances are too low the server 212 // // will 213 // // come back on-line 214 // } 215 // } 216 } 217 218 /** 219 * 遠程服務器上執行trans 220 * @param dir 221 * @param jobName 222 * @throws KettleException 223 */ 224 public static void executeTransRemote(String path, String transName) throws KettleException { 225 226 RepositoryDirectoryInterface dir = repository.findDirectory(path); 227 // 根據轉換名稱獲取轉換id 228 // 這個值就是從SELECT ID_TRANSFORMATION FROM `r_transformation` where name='fromto';來的 229 ObjectId id = repository.getTransformationID(transName, dir); 230 System.out.println("------ObjectId=" + id); 231 // 根據轉換id查詢與其關聯的step和hop 232 // select name from r_step where ID_STEP=(SELECT ID_STEP_FROM FROM `r_trans_hop` where 233 // ID_TRANSFORMATION='1'); 234 // select name from r_step where ID_STEP=(SELECT ID_STEP_TO FROM `r_trans_hop` where ID_TRANSFORMATION='1'); 235 TransMeta transMeta = repository.loadTransformation(id, null); 236 237 // last 238 // version 239 TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration(); 240 241 transExecutionConfiguration.setRemoteServer(remoteServer); 242 243 transExecutionConfiguration.setRepository(repository); 244 245 String lastCarteObjectId = Trans.sendToSlaveServer(transMeta, transExecutionConfiguration, repository, null); 246 System.out.println(lastCarteObjectId); 247 248 } 249 250 }