Kettle實現數據庫遷移
需求:
做數據倉庫時,需要將業務系統CRM抽取到數據倉庫的緩沖層,業務系統使用的是SqlServer數據庫,數據倉庫的緩沖層使用的是mysql數據庫,為實現數據庫的遷移,即將SqlServer數據庫中的所有表與數據遷移到Mysql。
解決辦法: kettle設計一整套流程實現,讀取數據庫中表->創建表->表數據抽取
實現過程:
整套流程分為:2個job,4個trans。使用到的Trans插件:表輸入、字段選擇、復制記錄到結果、從結果獲取記錄、設置變量、java腳本、表輸出。
1、表數據抽取作業:
作用:首先獲取數據庫中所有的表名稱 然后調用子Job進行表的創建、數據抽取

2.表名稱獲取流程

要遷移的源庫表名稱獲取,並設置到結果集,為下面的job使用。 其中的表輸入使用的是show tables,復制數據庫中所有的表,也可以從表中或者excel中輸入,實現更加小粒度的控制。

show tables 結果為Tables_in_數據庫名稱,和具體數據庫有關,故需要改名

3、子作業: 實現單個表格的創建及抽取
4、表名稱變量設置
上一步的子轉換


5、入庫表結構創建
執行的Java代碼如下
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { // First, get a row from the default input hop Object[] r = getRow(); org.pentaho.di.core.database.DatabaseMeta dbmeta = null; System.out.println(123); System.out.println( getTrans().getRepository()); System.out.println(456); java.util.List list = getTrans().getRepository().readDatabases(); if(list != null && !list.isEmpty()) { for(int i=0;i<list.size();i++) { dbmeta = (org.pentaho.di.core.database.DatabaseMeta)list.get(i); //test1 為數據庫名稱 if("test1".equalsIgnoreCase(dbmeta.getName())) { break; } } } if(dbmeta!=null) { org.pentaho.di.core.database.Database db=new org.pentaho.di.core.database.Database(dbmeta); try { db.connect(); String tablename = getVariable("TABLENAME"); logBasic("開始創建表:" + tablename); if(tablename!=null && tablename.trim().length()>0) { String sql = db.getDDLCreationTable(tablename, data.inputRowMeta);//${TABLENAME} db.execStatement(sql.replace(";", "")); logBasic(sql); } } catch(Exception e) { logError("創建表出現異常",e); }finally{ db.disconnect(); } } return false; }

引用原文:
1、源表若存在有blob的表,會有問題,可能是由於表輸出沒有指定字段的原因
2、以上的操作使用的是倉庫,kettle repo會報錯
3、將原文中String sql = db.getDDL(tablename, data.inputRowMeta);函數名 getDDL 改為 getDDLCreationTable
3、將原文中String sql = db.getDDL(tablename, data.inputRowMeta);函數名 getDDL 改為 getDDLCreationTable
4、去除了原文中創建表之前表輸入一個操作,原文當有空表需要復制時候,會報錯
參考:
原文地址: 用Kettle的一套流程完成對整個數據庫遷移
data-integration\samples\jobs\process all tables 實現整個數據庫的遷移,
代碼下載
http://pan.baidu.com/s/1nt7LOj3