環境:windows7,jvm內存設置14G,kettle5.1后來升級到5.4,oracle作為資源庫。
問題背景:我們通過web頁面管理kettle的job運行,這只是一個管理界面,即使web項目停掉也不會影響job的運行情況,實際運行job的是后台程序,隨着job數量的增多,達到三四百個時,job的運行速度也達到了難以接受的程度。
方案1:
針對出現的問題,經測試發現,job一經運行就不會再重新從資源庫讀取了(針對定時運行的job),job中的轉換則每次都會重新從資源庫中讀取,我找到了org.pentaho.di.job.entries.trans.JobEntryTrans這個類,這是一個作業控件,代表一個轉換,調試跟蹤代碼就會發現他確實會在每次運行時重新讀取資源庫加載轉換,因為kettle每次運行時都是克隆了這個控件,然后運行時重新加載,具體還是調試跟蹤代碼才能更清楚的了解,對於他為什么要克隆,我也不是很了解,當然也不敢亂動相關代碼,估計因為里面有些狀態屬性吧。
解決問題的思路是盡量從底層,具體的問題點解決問題,一開始就想過優化具體讀取轉換的過程,但沒有去嘗試,而是采用了更具針對性,更簡單的方式,在JobEntryTrans這個類里面建了個靜態Map,用於緩存讀取過的轉換,這個緩存的生命周期與對應的job差不多,因為每次從數據庫讀取JobEntryTrans這個控件時,都會清除該控件引用的轉換的緩存,若多個job都引用了這個轉換,那他的生命周期比job還短。
這個方案需要改的代碼量很小,讀取轉換時,先查看緩存,沒有就讀取資源庫,然后緩存,下次就直接用緩存了,其次就是在JobEntryTrans類的讀取資源庫的方法中加上清除對應轉換緩存的代碼。
緩存??到底以什么形式緩存?為了盡量減小對原有邏輯的影響我先緩存了xml,就是調用了TransMeta的getXml方法,下次讀取時就直接使用xml,這個方式在測試環境是沒問題的,但在正式環境中始終有個轉換有問題,運行異常,正式環境不方便調試,於是我就改為直接緩存TransMeta對象,每次就直接用了,這對寫代碼來說肯定更簡單了,但這個對象里面到底有哪些東西,我沒有精力去仔細分析,抱着試一試的態度實現了,經測試效果還是很不錯的,緩存xml時的問題不存在了,經過一段時間的運行,沒有發現什么大問題,就是記錄的日志感覺有點問題,該方案現在依然在使用中,基本沒有影響kettle完成它的工作。
這個代碼量不大,就貼出來,就是修改了kettle5.4中TransMeta這個類的兩個方法:
// Load the jobentry from repository // public void loadRep( Repository rep, IMetaStore metaStore, ObjectId id_jobentry, List<DatabaseMeta> databases, List<SlaveServer> slaveServers ) throws KettleException { try {
//.................這里的源碼與官方一致。
passingAllParameters = rep.getJobEntryAttributeBoolean( id_jobentry, "pass_all_parameters", true ); if(transMetaMap.containsKey(getDirectory()+"/"+getName())){ logBasic( "該轉換已經緩存,馬上移除緩存:" + getDirectory()+"/"+getName() ); transMetaMap.remove(getDirectory()+"/"+getName()); } } catch ( KettleDatabaseException dbe ) { throw new KettleException( "Unable to load job entry of type 'trans' from the repository for id_jobentry=" + id_jobentry, dbe ); } } public TransMeta getTransMeta( Repository rep, IMetaStore metaStore, VariableSpace space ) throws KettleException { try { TransMeta transMeta = null; switch( specificationMethod ) { case FILENAME: long start = new Date().getTime(); String filename = space.environmentSubstitute( getFilename() ); logBasic( "Loading transformation from XML file [" + filename + "]" ); transMeta = new TransMeta( filename, metaStore, null, true, this, null ); log.logBasic(transMeta.getName()+",從文件讀取轉換耗時:"+(new Date().getTime()-start)); break; case REPOSITORY_BY_NAME: if(transMetaMap.containsKey(getDirectory()+"/"+getName())){ logBasic( "該轉換已經緩存,直接使用緩存:" + getDirectory()+"/"+getName() ); transMeta = transMetaMap.get(getDirectory()+"/"+getName()); }else{ String transname = space.environmentSubstitute( getTransname() ); String realDirectory = space.environmentSubstitute( getDirectory() ); logBasic( BaseMessages.getString( PKG, "JobTrans.Log.LoadingTransRepDirec", transname, realDirectory ) ); if ( rep != null ) { // // It only makes sense to try to load from the repository when the // repository is also filled in. // // It reads last the last revision from the repository. // RepositoryDirectoryInterface repositoryDirectory = rep.findDirectory( realDirectory ); transMeta = rep.loadTransformation( transname, repositoryDirectory, null, true, null ); transMetaMap.put(getDirectory()+"/"+getName(), transMeta); logBasic( "從資源庫獲取轉換並緩存:" + getDirectory()+"/"+getName() ); } else { throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.NoRepDefined" ) ); } } break; case REPOSITORY_BY_REFERENCE: if ( transObjectId == null ) { throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.ReferencedTransformationIdIsNull" ) ); } if ( rep != null ) { // Load the last revision // transMeta = rep.loadTransformation( transObjectId, null ); } break; default: throw new KettleException( "The specified object location specification method '" + specificationMethod + "' is not yet supported in this job entry." ); } if ( transMeta != null ) { // copy parent variables to this loaded variable space. // transMeta.copyVariablesFrom( this ); // Pass repository and metastore references // transMeta.setRepository( rep ); transMeta.setMetaStore( metaStore ); } return transMeta; } catch ( Exception e ) { throw new KettleException( BaseMessages.getString( PKG, "JobTrans.Exception.MetaDataLoad" ), e ); } }
方案1總結:
1.該方案解決了轉換重復加載的問題,在一次加載后,進行了緩存,並給出了清除緩存重新加載的機制,實際使用效果是:第一次運行仍然很慢,但之后就是飛一般的感覺,之前沒有數據每次都要運行十來分鍾,現在就只需幾秒鍾,證明這個問題就是導致kettle運行慢的唯一原因,這里我們不完美的解決了他。
2.該方案第一次運行job仍然很慢,日志記錄可能也有問題,對時效性要求高的話,可能該方案還不能完全解決問題,因為重啟后台程序時,job的延遲仍然可能達到一個多小時,當然也就剛重啟那一下的事,就看你的業務能不能接受了。
方案2:
方案1中重啟后台程序導致的高延遲,我們的業務上仍然不能忍受,於是開始思考其他解決方案。
通過嘗試解決這個問題,認為想通過改進kettle從數據庫讀取轉換的過程來優化是行不通的,因為kettle讀取資源庫的過程很復雜,涉及很多邏輯(仔細看過相關代碼,並測試了各個關鍵操作的耗時,對測試出的耗時操作進行分析后認為很難優化),另外經測試,文件資源庫讀取轉換要比數據庫資源庫快近100倍,於是決定采用如下方式解決:
1.創建job等過程不做任何改變,所有操作都對數據庫資源庫。
2.后台job在獲取一些job信息時也使用數據庫資源庫,但在最后一步運行job時,獲取文件資源庫中的job並運行,這里就要求job啟動時,兩個資源庫相關job要相同,特別是job名稱和路徑,這是確認同一個job的方式,實際運行的就是文件資源庫中的job了。
3.這里就涉及到文件資源庫與數據庫資源庫的同步問題了:
1)人工手動將數據庫資源庫導出到指定的文件資源庫。
2)頁面控制job的啟動與停止,只在頁面請求啟動job時后台自動將該job相關信息同步到文件資源庫(待實現),其他地方不做任何改變,即使后台程序重啟,也不用同步job,這樣重啟后台程序時就不會出現幾十分鍾job都沒有運行起來的情況了。
3)當我們修改了job,確認要運行了,這時就在頁面先停止job,再啟動就實現同步了。
以上只是問題解決思路,具體實現也實現了個大概,資源庫同步還有問題,通過以上方式實際並沒有解決數據庫資源庫讀取慢的問題,這需要對kettle有更深入的了解的技術大牛來解決。
這個解決方案達到的效果:
1.解決了后台程序重啟時高延遲的問題(主要問題)。
2.間接減輕了數據庫資源庫的壓力,這樣我們創建修改job時就會快一些。
3.算是備份了一下資源庫。
4.給出了同步資源庫的方案,其實這里的文件資源庫與上面的緩存很相似,只是緩存得更徹底。