使用kettle來根據時間戳或者批次號來批量導入數據,達到增量的效果。


1、Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、Linux、Unix上運行,數據抽取高效穩定。下載圖形化界面的zip包格式的,直接解壓縮使用即可。安裝部署模式這里不說了,自己可以根據自己的需求安裝為單機模式或者集群模式。    
Kettle的社區官網:https://community.hitachivantara.com/docs/DOC-1009855        
Kettle的下載地址:https://sourceforge.net/projects/pentaho/files/Data%20Integration/
kettle國內鏡像下載:http://mirror.bit.edu.cn/pentaho/Data%20Integration/
2、由於這里只是演示了如何配置通過時間戳和批次號增量的導入數據,所以具體的操作不再敘述,具體的使用自己可以根據需求來使用。

Job如下所示:

思路。
批次量將一批數據從一個數據庫導入到另外一個數據庫,而且每批次的數據量不能重復。
這里使用時間戳,你也可以使用批次號。原理基本一樣,都是確定每一批次的數據量。
job步驟:
    第一步。start,可以設置定時或者手動點擊啟動job。
    第二步。執行轉換。
    第三步。將start_time=next_time。
    第四步。成功。        
        
1、Start,類型可以選擇不需要定時,時間間隔,天,周,月。
默認不需要定時,如果需要定時的話,首先把重復的框勾選。
然后如果選擇時間間隔的話,可以輸入以分鍾計算的間隔或者以秒計算的間隔。
如果按天,就選擇天,然后選擇每天幾天的幾分開始跑。
如果按照周,就選擇每周的每天幾點幾分開始跑job。
如果是每月的話,就選擇那一月的每天幾點幾分跑job。
        
2、轉換的作業項名稱,自己填自己的作業項名稱,
在轉換設置的tab然后自己填自己的轉換文件名core_table_name_down。
高級tab,設置日志tab,位置參數tab,
命名參數tab,如果自己需要的話可以自己使用和研究。
        
3、作業項名稱,自己填自己的,數據庫連接,自己新建和編輯即可。
SQL腳本,自己填上自己的sql腳本。
這個主要是批次量導入數據,所以使用時間戳來實現批次量導入數據。
所以每次批次量導入數據結束,將start_time=next_time。這樣下次
執行這個job,就是下一批的數據量了。
update 數據表名稱 set start_time=next_time where table_name='數據表名稱' and part=第幾步


轉換如下所示:

注意:
1)、由於是將上一步查詢的值插入到下一步?的地方,所以一定要注意。
將帶有?的步驟,替換SQL語句里面的變量,進行勾選。
從步驟插入數據,進行選擇上一步的名稱。
2)、步驟名稱,自己起自己的名字。
    數據庫連接,自己新建和編輯。
    目標模式,如果是Greenplum或者postgresql要輸入自己的模式。
    目標表就是自己的數據表。
    提交記錄數量,一般1000或者2000。下面主選項使用批量插入進行勾選。
    數據庫字段,自己獲取字段和映射,
    更新,用來查詢的關鍵字和更新字段。自行配置。
    Switch/Case,Switch字段和Case值數據類型和Case值。
        自己根據自己的字段和類型進行填寫。

change步驟:
    第一步。在數據源的庫表里面查詢出這批數據的最大時間或者最大的批次號。
    第二步。然后在自己的數據表里面獲取到開始時間或者最小的批次號
    (此數據表自己初始化好起始時間start_time或者最小批次號和查詢條件,比如第幾步和那一張表)。
    將第一步獲取到的最大時間或者最大的批次號傳遞到第二步。
    第三步。更新自己的初始化好的數據表,將自己初始化好的數據表的最大時間或者最大批次號字段修改。
        同時進行表輸入進行查詢出數據。然后將這一步查詢的數據傳遞到Switch/Case。
    第四步。Switch/Case。將上一步的數據根據Switch/Case。進行傳遞。
    第五步。進行各種數據表的輸出。        
        
第一步:
1)、select '數據表名稱' as table_name, 第幾步 as part, COALESCE(max(update_time), now()) as next_time from 數據表名稱
2)、postgresql,COALESCE()函數 
    主流數據庫系統都支持COALESCE()函數,這個函數主要用來進行空值處理,其參數格
式如下: COALESCE ( expression,value1,value2……,valuen) 。
    COALESCE()函數的第一個參數expression為待檢測的表達式,而其后的參數個數不定。
COALESCE()函數將會返回包括expression在內的所有參數中的第一個非空表達式。如果
expression不為空值則返回expression;否則判斷value1是否是空值,如果value1不為空值則返
回value1;否則判斷value2是否是空值,如果value2不為空值則返回value3;……以此類推,
如果所有的表達式都為空值,則返回NULL。
3)、MySQL,IFNULL函數是MySQL控制流函數之一,它接受兩個參數,如果不是NULL,則返回第一個參數。 
    否則,IFNULL函數返回第二個參數。IFNULL(expression_1,expression_2);
        
第二步:    
1)、select ? as table_name, ? as part, start_time, ? as next_time from 數據表名稱 where table_name='數據表名稱' and part=第幾步
2)、將第一步的三個參數,傳遞到第二步的三個問好的地方。
        
第三步:查詢出每個case所需要的值的數據。同時修改next_time最大時間或者最大批次號。
select *, CASE  WHEN "字段"='標識位' THEN (select to_char(to_number(COALESCE(max("Cd_batch"), to_char(now(), 'yyyyMMdd')||'00000'), '9999999999999') + 1, '9999999999999') 
from core_data.dn_data_reconciliation where "TableName"='數據表名稱' and "字段"='標識位') END AS "Cd_batch", 'I' as "Cd_operation"  
from 數據表名稱 where '數據表名稱'=? and 第幾步=?  and update_time>? and update_time<=?

 

待續.....


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM