1 kettle.properties參數配置數據源連接和FTP連接
因為測試環境和生產環境中數據庫連接FTP等配置會在部署過程中變更,所以預先定義成配置項,在配置文件里改動。這樣測試和公布將會變得簡單,以下以數據庫為例說明這類配置的使用。
(1) 首先要找到配置文件,不同的操作系統路徑也不一樣。本人用win7進行開發,配置文件的路徑為“C:\Users\chenpeng\.kettle\kettle.properties”。例如以下:
(2) 配置文件里的詳細配置例如以下:
還能夠可視化設置:
(3) 詳細使用演示樣例
l 下方是數據庫連接配置:
l 下方是FTP連接配置:
1.1.2 kettle.properties參數設置和路徑及與正則配合使用
(1)在kettle.properties中設置變量值
(2)kettle.properties設置例如以下(win7下的路徑為:C:\Users\chenpeng\.kettle)
(4) 在輸出文件時使用參數中指定的路徑:
注:假設路徑出現錯誤,如把某個目錄刪除則會報錯。僅僅要是報錯,即使把目錄建好了也仍然會報目錄錯誤(好像有記憶功能),這時必須又一次啟動kettle才干正常執行。
l 在輸出文件時使用參數中和正則表達式混合使用場景:
1.1.3 kettle.properties參數在java代碼中的應用

1.1.4 作業中變量使用並用javascript設置變量值
上面的樣例都是kettle.properties中聲明的變量的應用。這些都是全局范圍內通用的,但非常多時間。子作業須要有內部專用的變量參數。這時就不能使用kettle.properties中聲明的變量了。須要在流程中聲明變量,並把作用域設置為當前作業有效。下面應用場景業務例如以下:文件名稱要命名成當前日期格式的。所以在作業級別定義了一個變量,但無法給它賦值。如是採用了javascript腳本方式給該變量賦值,然后在輸出文件名稱的位置應用該變量就可以,后面文件的刪除上傳都是公用部分都須要用到這個變量做為接口參數來做處理。
(1) 主流程例如以下:
(2) 定義變量:
(3) 子過程中調用javascript腳本改動值:
Date.prototype.Format = function (fmt) { //author: meizz var o = { "M+": this.getMonth() + 1, //月份 "d+": this.getDate(), //日 "h+": this.getHours(), //小時 "m+": this.getMinutes(), //分 "s+": this.getSeconds(), //秒 "q+": Math.floor((this.getMonth() + 3) / 3), //季度 "S": this.getMilliseconds() //毫秒 }; if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length)); for (var k in o) if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr (("" + o[k]).length))); return fmt; } var dateTime = new Date().Format("yyyyMMdd"); // gives back today at yyyy/MM/dd HH:mm:ss.000 setVariable("curdate",dateTime,"s");
另外,假設是用上一節點的字段值,改動變量值則更為簡單,例如以下:
(4) 使用變量(按常規使用):
1.1.5 Java代碼訪問變量調用jar包並生成驗證文件
這是一個較為綜合性的演示樣例。首先定義了一條記錄(可能理解為非常多個變量),然后通過java代碼來調用jar包,計算出記錄數、文件大小、MD5值等,賦值給對應記錄字段。並輸出到文件,形成數據文件的校驗文件。
(1) 主流程例如以下:
(2) 生成記錄
(3) Java處理
//導入在eclipse中編輯好的包。主要用於計算文件行數、MD5值 import cgb.tools.KettleHelper; import java.io.File; import java.io.IOException; //kettle中已定義好的行處理方法,每行記錄都會運行一次 public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //(1)獲取到上一個步驟的輸入行 Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //(2)讀取出參數變量值 String kettleoutputdir = getVariable("kettleoutputdir", ""); String hbprovince_code = getVariable("hbprovince_code", ""); String item_code = getVariable("item_code", ""); String curdate = getVariable("curdate", ""); String filename = ""; String onlyfilename = ""; String recordcount = ""; String bytecount = ""; String md5code = ""; //(3)調用jar包計算出MD5值、行數、字節數 try { filename = kettleoutputdir + "\\" + hbprovince_code + "_" + item_code + "_day_" + curdate + ".csv"; onlyfilename = hbprovince_code + "_" + item_code + "_day_" + curdate + ".csv"; md5code = KettleHelper.getFileMD5String(filename); recordcount = String.valueOf(KettleHelper.getFileRecordCount(filename, true)); bytecount = String.valueOf(KettleHelper.getFileByteCount(filename)); } catch (IOException e) { e.printStackTrace(); } //(4)把計算好的值放入到輸出記錄中 get(Fields.Out, "filename").setValue(r, onlyfilename); get(Fields.Out, "recordcount").setValue(r, recordcount); get(Fields.Out, "byteCount").setValue(r, bytecount); get(Fields.Out, "md5code").setValue(r, md5code); //(5)輸出到下一個節點做處理 putRow(data.outputRowMeta, r); return true; }
(4) Jar包的開發和導出
當中調用的jar包,能夠選擇用eclipse來生成。例如以下:
package XXX.tools; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; public class KettleHelper { /** * 默認的密碼字符串組合。用來將字節轉換成 16 進制表示的字符,apache校驗下載的文件的正確性用的就是默認的這個組合 */ private static char hexDigits[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; private static MessageDigest messagedigest = null; static { try { messagedigest = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException nsaex) { System.err.println("MD5Util.class.getName()" + "初始化失敗,MessageDigest不支持MD5Util。"); nsaex.printStackTrace(); } } /** * 生成字符串的md5校驗值 * * @param s * @return */ private static String getMD5String(String s) { return getMD5String(s.getBytes()); } /** * 推斷字符串的md5校驗碼是否與一個已知的md5碼相匹配 * * @param password 要校驗的字符串 * @param md5PwdStr 已知的md5校驗碼 * @return */ public static boolean checkPassword(String password, String md5PwdStr) { String s = getMD5String(password); return s.equals(md5PwdStr); } /** * 生成文件的md5校驗值 * * @param file * @return * @throws IOException */ public static String getFileMD5String(String fileName) throws IOException { File file = new File(fileName); InputStream fis; fis = new FileInputStream(file); byte[] buffer = new byte[1024]; int numRead = 0; while ((numRead = fis.read(buffer)) > 0) { messagedigest.update(buffer, 0, numRead); } fis.close(); return bufferToHex(messagedigest.digest()); } /** * 獲取文件的記錄條數 * * @param file * @return * @throws IOException */ public static int getFileRecordCount(String fileName,boolean hasHeadRow) throws IOException { File inFile = new File(fileName); // 讀取的CSV文件 @SuppressWarnings("unused") String inString = ""; int count = 0; try { BufferedReader reader = new BufferedReader(new FileReader(inFile)); while((inString = reader.readLine())!= null){ count++; } reader.close(); } catch (FileNotFoundException ex) { System.out.println("沒找到文件!"); } catch (IOException ex) { System.out.println("讀寫文件出錯!
"); } if(hasHeadRow) { count--; } return count; } /** * 生成文件的md5校驗值 * * @param file * @return * @throws IOException */ public static String getFileMD5String(File file) throws IOException { InputStream fis; fis = new FileInputStream(file); byte[] buffer = new byte[1024]; int numRead = 0; while ((numRead = fis.read(buffer)) > 0) { messagedigest.update(buffer, 0, numRead); } fis.close(); return bufferToHex(messagedigest.digest()); } private static String getMD5String(byte[] bytes) { messagedigest.update(bytes); return bufferToHex(messagedigest.digest()); } private static String bufferToHex(byte bytes[]) { return bufferToHex(bytes, 0, bytes.length); } private static String bufferToHex(byte bytes[], int m, int n) { StringBuffer stringbuffer = new StringBuffer(2 * n); int k = m + n; for (int l = m; l < k; l++) { appendHexPair(bytes[l], stringbuffer); } return stringbuffer.toString(); } private static void appendHexPair(byte bt, StringBuffer stringbuffer) { char c0 = hexDigits[(bt & 0xf0) >> 4];// 取字節中高 4 位的數字轉換, >>> 為邏輯右移,將符號位一起右移,此處未發現兩種符號有何不同 char c1 = hexDigits[bt & 0xf];// 取字節中低 4 位的數字轉換 stringbuffer.append(c0); stringbuffer.append(c1); } /** * 獲取文個把的字節數 * @param fileName * @return * @throws IOException */ public static long getFileByteCount(String fileName) throws IOException { File file = new File(fileName); return file.length(); } }
導出jar包
放入到kettle的jar包文件夾,它會自載入。操作系統不同,文件夾也會不同。本人使用的是win7,文件夾例如以下:
(5) 文件輸出
(6) 終於生成文件的效果例如以下:
1.1.6 SQL中使用變量
以下的查詢語句用問號占位符,當開始日期(第一個?號)和結束日期(第二個?號)綁定到SQL的問號占位符,在查詢入職日期在一定期間的總統信息:
SELECTname,took_office FROMpresidents WHEREtook_officeBETWEEN? AND?
演示樣例中。首先使用生成行步驟(“Generdate Rows”)生成一行帶有兩個字段的記錄,分別按順序取代表輸入SQL語句中的占位符。
實際場景中,通常使用動態處理結果產生期望值取代生成行步驟。
接下來是表輸入步驟,當中配置SQL查詢語句。包括問號占位符,通過在“Insert Data Step”的下拉框中選擇前一步驟,來替換問號的值。
通過傳輸不同的值多次運行查詢
假設你想循環運行查詢。使用不同值替換占位符。就須要占位符生產步驟生成多行數據,並把表輸入的選項“Execute for each row”選中。本演示樣例文件名為placeholders_in_loop.ktr。
占位符的局限性
盡管通過給占位符綁定值查詢很有效,但也有一些場景不能使用,以下一些SQL不能使用占位符。
這些演示樣例都很通用。可是不能使用占位符。
不能用占位符取代表名詞,否則查詢將不運行。
SELECT some_fieldFROM ?
不能使用占位符取代查詢的字段名稱,以下的查詢能夠成功綁定參數,但僅僅是作為一個常量,而不是字段的名稱。
SELECT ? asmy_field FROM table
不能使用占位符綁定逗號分隔的多個列表項值;假設你綁定“1,2,3″給以下的查詢語句,將得到意外的結果。
SELECT * FROM testWHERE id IN(?
)
你期望得到的結果是:
SELECT * FROM testWHERE id IN("1,2,3")
可是執行的結果卻是這樣,傳輸一個字符串,卻得到三個值,而實際情況全然不確定有幾個值傳輸進來。
SELECT * FROM testWHERE id IN(1,2,3)
為了解決這些場景的問題,須要使用kettle的變量動態構造查詢文本。以下具體說明。
SQL查詢中使用kettle變量
表輸入步驟支持替換查詢中的變量或參數。如果有一系列結構全然相關的表,各自是: mammals, birds, insects(動物、鳥、昆蟲),能夠使用kettle變量作為表的名稱。
如果我們有一個變量。名稱為:ANIMALS_TABLE,賦值為birds,我們設置“Replace Vaiables”選項選中。假設我們寫以下的查詢:
SELECT name,population FROM${ANIMALS_TABLE}
在運行一定被成功的替換成:
SELECT name,population FROM birds
假設設置變量的值為“mammals”或“insects”。則將動態查詢不同的表。當占位符不能勝任是,使用變量技術能夠幫助我們解決。演示樣例的名稱為variables.ktr。執行時不要忘了給parameter(命名參數)賦值進行測試。
變量和占位符一起使用
假設有必要。我們能夠混合這兩種技術;本演示樣例中使用變量作為表名詞,同一時候使用占位符作為前面步驟的輸入值。
演示樣例文件variables_and_placeholders.ktr。
SELECT name, population FROM${ANIMALS_TABLE}WHERE population > ?