第一步:(前提將kett中lib下的所有jar包拷貝到java項目lib目錄)創建並連接資源庫,如果只用這一個工作空間,可以將此段代碼放入靜態代碼塊,代碼如下:
KettleEnvironment.init();
//創建資源庫對象,此時的對象還是一個空對象
KettleDatabaseRepository repository = new KettleDatabaseRepository();
//創建資源庫數據庫對象,類似我們在spoon里面創建資源庫
//(數據庫連接名稱,數據庫類型,連接方式,IP,數據庫名,端口,用戶名,密碼)
DatabaseMeta dataMeta = new DatabaseMeta("數據庫連接名","數據庫類型(比如MSSQL)","連接方式(例如Native(JDBC))","IP","數據庫名","端口號",
"用戶名","密碼");
//資源庫元對象,選擇資源庫(ID,名稱,描述)
KettleDatabaseRepositoryMeta kettleDatabaseMeta =new KettleDatabaseRepositoryMeta("資源庫ID", "資源庫名稱", "這里是描述信息",dataMeta);
//給資源庫賦值
repository.init(kettleDatabaseMeta);
//連接資源庫
repository.connect("admin","admin");
第二步:啟動並運行job:
directoryInterface = repository.loadRepositoryDirectoryTree();
JobMeta jobMeta = repository.loadJob(jobName,directoryInterface,null,null);//從資源庫中加載一個job,jobName:job名
Job job = new Job(repository,jobMeta);
job.start();//啟動job,默認是線程執行。
job.waitUntilFinished();//等待job執行結束
第三步:獲取job狀態或者停止job:
job.stopAll();//停止job
String status=job.getStatus();//獲取運行狀態。
至此,java調用job的基本功能已經實現,如果現在有需求是生成web service接口,提供四個方法供外界訪問,分別是傳入串job名並分別執行這些job(比如“jobA,jobB,JobC”)、傳入一個job名獲取這個job 的當前狀態、傳入一個job名停止運行這個job,分享代碼如下:
package com.test;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import com.util.DBUtil;
public class TestOaJobX {
private static Connection con = null;
private static PreparedStatement pst = null;
private static ResultSet rs = null;
private static Repository repository;
private static List<Job> jobs;
private static String[] jobNameArray;
public TestOaJobX() {
// TODO Auto-generated constructor stub
}
static{
try {
KettleEnvironment.init();
//創建資源庫對象,此時的對象還是一個空對象
KettleDatabaseRepository repository = new KettleDatabaseRepository();
//創建資源庫數據庫對象,類似我們在spoon里面創建資源庫
//(數據庫連接名稱,數據庫類型,連接方式,IP,數據庫名,端口,用戶名,密碼)
DatabaseMeta dataMeta =new
DatabaseMeta("數據庫連接名","數據庫類型(比如MSSQL)","連接方式(例如Native(JDBC))","IP","數據庫名","端口號",
"用戶名","密碼");
//資源庫元對象,選擇資源庫(ID,名稱,描述)
KettleDatabaseRepositoryMeta kettleDatabaseMeta =new KettleDatabaseRepositoryMeta("資源庫ID", "資源庫名稱", "這里是描述信息",dataMeta);
//給資源庫賦值
repository.init(kettleDatabaseMeta);
//連接資源庫
repository.connect("admin","admin");
TestOaJobX.repository=repository;
} catch (KettleException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void setJod(String jobNames) {
//資源庫目錄
RepositoryDirectoryInterface directoryInterface;
try {
if(TestOaJobX.jobs==null){
directoryInterface = repository.loadRepositoryDirectoryTree();
jobNameArray=jobNames.split(",");
List<JobMeta> jobmetalist=new ArrayList<JobMeta>();
for(int i=0;i<jobNameArray.length;i++){
JobMeta jobMeta = repository.loadJob(jobNameArray[i],directoryInterface,null,null);//從資源庫中加載一個job
jobmetalist.add(jobMeta);
}
TestOaJobX.jobs=new ArrayList<Job>();
for(int i=0;i<jobmetalist.size();i++){
Job job = new Job(repository,jobmetalist.get(i));
TestOaJobX.jobs.add(job);
}
}
} catch (KettleException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void runJob(String jobNames){
setJod(jobNames);
String[] tmpname=jobNames.split(",");
for(int i=0;i<jobs.size();i++){
System.out.println("第"+(i+1)+"項作業:"+tmpname[i]+" 開始執行");
//jobs.get(i).start();
//jobs.get(i).run();
try {
jobs.get(i).execute(i, null);
} catch (KettleException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jobs.get(i).waitUntilFinished();
System.out.println("第"+(i+1)+"項作業:"+tmpname[i]+" 執行完畢");
if(jobs.get(i).getErrors()>0){
System.out.println("第"+(i+1)+"項作業:"+tmpname[i]+" 執行失敗!");
}else{
System.out.println("第"+(i+1)+"項作業:"+tmpname[i]+" 執行成功!");
}
}
}
public void stopJob(String jobName) {
if(jobNameArray!=null){
int m=0;
for(int i=0;i<jobNameArray.length;i++){
if(jobName.equals(jobNameArray[i])){
m=i;
}
}
jobs.get(m).stopAll();
if(jobs.get(m).getErrors()>0){
System.out.println("第"+(m+1)+"項作業:"+jobName+" 停止異常!");
}else{
System.out.println("第"+(m+1)+"項作業:"+jobName+" 停止成功!");
}
}else{
System.out.println("該作業未運行");
}
}
public String getStatus(String jobName) {
if(jobNameArray!=null){
int m=0;
for(int i=0;i<jobNameArray.length;i++){
if(jobName.equals(jobNameArray[i])){
m=i;
}
}
String status=jobs.get(m).getStatus();
return status;
}else{
System.out.println("該作業未運行");
return null;
}
}
public void errorView(String jobName) {
con=DBUtil.getConnection();
String sql="select * from ERROR_LOG where KJBNAME = ? order by ERROETIME desc";
try {
pst=con.prepareStatement(sql);
pst.setString(1,jobName);
rs=pst.executeQuery();
if(rs!=null){
while(rs.next()){
System.out.println(rs.getString("KJBNAME")+" "+
rs.getString("KTRNAME")+" "+
rs.getString("ERRORINFO")+" "+
rs.getString("ERROETIME"));
}
}
pst.close();
rs.close();
con.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
然后生成接口就行了。
現在有一個問題有待於解決,就是job的start、run、和execute方法都可以執行job,但是用job.start后不能第二次執行這個方法,但是run和execute可以