A庫a表(紅色為抽取字段):

關聯用戶表:

B庫b表(紅色為抽取字段):

關聯用戶表

C目標庫SYS_OPLOG表(c表)

利用kettle抽取A庫a表(具體名稱見上圖),B庫b表的上面紅色框起來的字段到C庫c表。由於c表LOG_ID為主鍵且類型為varachar類型,而A庫a表與B庫b表的主鍵f_operation_id列為int類型(自增),
所以抽取時,我將"數據庫名_f_operation_id"組織成c表的LOG_ID,在C表中為了區分不同系統,我將"數據庫名"作為c表的F_XTBH列。每次抽取時,只抽取c表中不存在的數據,解決方案如下:
1、在目標庫建立OPKETTLE表,其中F_XTBH為待抽取表的數據庫名,maxindex為當前已經抽取到的表的f_operation_id位置,(maxindex初始值為0)

2、從OPKETTLE表中查詢A數據庫的maxindex的值
SELECT maxindex f_operation_id FROM OPKETTLE where F_XTBH = 'sipctask';
3、抽取A數據庫a表的數據,從maxindex+1開始抽取
SELECT
u.f_username F_ZGBH
, CONCAT('sipctask_',o.f_operation_id) LOG_ID
, o.f_ip F_IP
, o.f_time F_STIME
, o.f_name F_GNBH
, o.f_operation_id F_MAXID
,'sipctask' F_XTBH
FROM st_operation_log o,st_user u
WHERE o.f_user_id = u.f_user_id and o.f_name != 'login.success' and o.f_name != 'login.logout' and o.f_name != 'login.failure' and o.f_operation_id > ?;
在kettle轉換中,通過替換SQL語句中的變量可以獲取到第2步sql語句里查詢到的當前項目的maxindex值,之后將查詢到的數據插入到目標庫c表中。
4、更新OPKETTLE表的maxindex的值
SELECT MAX(F_MAXID) maxindex,F_XTBH FROM SYS_OPLOG where F_XTBH = 'sipcportal';
在c表中查詢出當前系統已經抽取到的最大編號,之后維護OPKETTLE表的數據
抽取B庫b表的過程與上面一致,不在贅余。
抽取A庫a表數據到C表c庫數據轉換圖如下:

抽取最大編號:

表輸入:

表輸出:

更新OPKETTLE表轉換圖如下:

表輸入:

更新:

新建任務(其中"轉換"為抽取A庫a表數據到C表c庫數據轉換圖,"轉換2"為更新OPKETTLE轉換圖):

Linux定時任務(3分鍾執行一次,KETTLE JOB也可以執行定時任務,但是網上說Linux定時任務性能比較好,穩定):
[root@localhost data]# cat kettle_login_ontime.sh #!/bin/sh source /etc/profile ROOT_TOPDIR=/home/data/kettle/ rm -r /home/data/kettle/data-integration/system/karaf/data1 /home/data/kettle/data-integration/kitchen.sh -file=/home/data/kettle/oplogjob.kjb >> /home/data/kettle/data-integration/logs/portaloplog$(date +%Y%m%d).log rm -r /home/data/kettle/data-integration/system/karaf/data1 /home/data/kettle/data-integration/kitchen.sh -file=/home/data/kettle/sipctaskoplogjob.kjb >> /home/data/kettle/data-integration/logs/sipctaskoplog$(date +%Y%m%d).log

