1、Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、Linux、Unix上運行,數據抽取高效穩定。下載圖形化界面的zip包格式的,直接解壓縮使用即可。安裝部署模式這里不說了,自己可以根據自己的需求安裝為單機模式或者集群模式。
Kettle的社區官網:https://community.hitachivantara.com/docs/DOC-1009855
Kettle的下載地址:https://sourceforge.net/projects/pentaho/files/Data%20Integration/
kettle國內鏡像下載:http://mirror.bit.edu.cn/pentaho/Data%20Integration/
2、由於這里只是演示了如何配置通過時間戳和批次號增量的導入數據,所以具體的操作不再敘述,具體的使用自己可以根據需求來使用。
Job如下所示:
思路。 批次量將一批數據從一個數據庫導入到另外一個數據庫,而且每批次的數據量不能重復。 這里使用時間戳,你也可以使用批次號。原理基本一樣,都是確定每一批次的數據量。 job步驟: 第一步。start,可以設置定時或者手動點擊啟動job。 第二步。執行轉換。 第三步。將start_time=next_time。 第四步。成功。 1、Start,類型可以選擇不需要定時,時間間隔,天,周,月。 默認不需要定時,如果需要定時的話,首先把重復的框勾選。 然后如果選擇時間間隔的話,可以輸入以分鍾計算的間隔或者以秒計算的間隔。 如果按天,就選擇天,然后選擇每天幾天的幾分開始跑。 如果按照周,就選擇每周的每天幾點幾分開始跑job。 如果是每月的話,就選擇那一月的每天幾點幾分跑job。 2、轉換的作業項名稱,自己填自己的作業項名稱, 在轉換設置的tab然后自己填自己的轉換文件名core_table_name_down。 高級tab,設置日志tab,位置參數tab, 命名參數tab,如果自己需要的話可以自己使用和研究。 3、作業項名稱,自己填自己的,數據庫連接,自己新建和編輯即可。 SQL腳本,自己填上自己的sql腳本。 這個主要是批次量導入數據,所以使用時間戳來實現批次量導入數據。 所以每次批次量導入數據結束,將start_time=next_time。這樣下次 執行這個job,就是下一批的數據量了。 update 數據表名稱 set start_time=next_time where table_name='數據表名稱' and part=第幾步
轉換如下所示:
注意: 1)、由於是將上一步查詢的值插入到下一步?的地方,所以一定要注意。 將帶有?的步驟,替換SQL語句里面的變量,進行勾選。 從步驟插入數據,進行選擇上一步的名稱。 2)、步驟名稱,自己起自己的名字。 數據庫連接,自己新建和編輯。 目標模式,如果是Greenplum或者postgresql要輸入自己的模式。 目標表就是自己的數據表。 提交記錄數量,一般1000或者2000。下面主選項使用批量插入進行勾選。 數據庫字段,自己獲取字段和映射, 更新,用來查詢的關鍵字和更新字段。自行配置。 Switch/Case,Switch字段和Case值數據類型和Case值。 自己根據自己的字段和類型進行填寫。 change步驟: 第一步。在數據源的庫表里面查詢出這批數據的最大時間或者最大的批次號。 第二步。然后在自己的數據表里面獲取到開始時間或者最小的批次號 (此數據表自己初始化好起始時間start_time或者最小批次號和查詢條件,比如第幾步和那一張表)。 將第一步獲取到的最大時間或者最大的批次號傳遞到第二步。 第三步。更新自己的初始化好的數據表,將自己初始化好的數據表的最大時間或者最大批次號字段修改。 同時進行表輸入進行查詢出數據。然后將這一步查詢的數據傳遞到Switch/Case。 第四步。Switch/Case。將上一步的數據根據Switch/Case。進行傳遞。 第五步。進行各種數據表的輸出。 第一步: 1)、select '數據表名稱' as table_name, 第幾步 as part, COALESCE(max(update_time), now()) as next_time from 數據表名稱 2)、postgresql,COALESCE()函數 主流數據庫系統都支持COALESCE()函數,這個函數主要用來進行空值處理,其參數格 式如下: COALESCE ( expression,value1,value2……,valuen) 。 COALESCE()函數的第一個參數expression為待檢測的表達式,而其后的參數個數不定。 COALESCE()函數將會返回包括expression在內的所有參數中的第一個非空表達式。如果 expression不為空值則返回expression;否則判斷value1是否是空值,如果value1不為空值則返 回value1;否則判斷value2是否是空值,如果value2不為空值則返回value3;……以此類推, 如果所有的表達式都為空值,則返回NULL。 3)、MySQL,IFNULL函數是MySQL控制流函數之一,它接受兩個參數,如果不是NULL,則返回第一個參數。 否則,IFNULL函數返回第二個參數。IFNULL(expression_1,expression_2); 第二步: 1)、select ? as table_name, ? as part, start_time, ? as next_time from 數據表名稱 where table_name='數據表名稱' and part=第幾步 2)、將第一步的三個參數,傳遞到第二步的三個問好的地方。 第三步:查詢出每個case所需要的值的數據。同時修改next_time最大時間或者最大批次號。 select *, CASE WHEN "字段"='標識位' THEN (select to_char(to_number(COALESCE(max("Cd_batch"), to_char(now(), 'yyyyMMdd')||'00000'), '9999999999999') + 1, '9999999999999') from core_data.dn_data_reconciliation where "TableName"='數據表名稱' and "字段"='標識位') END AS "Cd_batch", 'I' as "Cd_operation" from 數據表名稱 where '數據表名稱'=? and 第幾步=? and update_time>? and update_time<=?
待續.....