使用Kettle實現數據實時增量同步


0. 前言

本文介紹了使用Kettle對一張業務表數據(500萬條數據以上)進行實時(10秒)同步,采用了時間戳增量回滾同步的方法。關於ETL和Kettle的入門知識大家可以閱讀相關的blog和文檔學習。
1. 時間戳增量回滾同步

假定在源數據表中有一個字段會記錄數據的新增或修改時間,可以通過它對數據在時間維度上進行排序。通過中間表記錄每次更新的時間戳,在下一個同步周期時,通過這個時間戳同步該時間戳以后的增量數據。這是時間戳增量同步。

但是時間戳增量同步不能對源數據庫中歷史數據的刪除操作進行同步,我們可以通過在每次同步時,把時間戳往前回滾一段時間,從而同步一定時間段內的刪除操作。這就是時間戳增量回滾同步,這個名字是我自己給取得,意會即可,就是在時間戳增量同步的同時回滾一定的時間段。

說明:

    源數據表 需要被同步的數據表
    目標數據表 同步至的數據表
    中間表 存儲時間戳的表

2. 前期准備

在兩個數據庫中分別創建數據表,並通過腳本在源數據表中插入500萬條數據,完成后再以每秒一條的速度插入新數據,模擬生產環境。

源數據表結構如下:

CREATE TABLE `im_message` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `sender` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息發送者:SYSTEM',
  `send_time` datetime(6) NOT NULL,
  `receiver` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息接受者',
  `content` varchar(255) COLLATE utf8_bin NOT NULL COMMENT '消息內容',
  `is_read` tinyint(4) NOT NULL COMMENT '消息是否被讀取:0-未讀;非0-已讀',
  `read_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='消息表'

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

3. 作業流程

    開始組件
    建時間戳中間表
    獲取中間表的時間戳,並設置為全局變量
    刪除目標表中時間戳及時間戳以后的數據
    抽取兩個數據表的時間戳及時間戳以后的數據進行比對,並根據比對結果進行刪除、新增或修改操作
    更新時間戳

4. 創建作業

作業的最終截圖如下:
作業截圖
4.1 創建作業和DB連接

打開Spoon工具,新建作業,然后在左側主對象樹DB連接中新建DB連接。創建連接並測試通過后可以在左側DB連接下右鍵共享出來。因為在單個作業或者轉換中新建的DB連接都是局域數據源,在其他轉換和作業中是不能使用的,即使屬於同一個作業下的不同轉換,所以需要把他們共享,這樣DB連接就會成為全局數據源,不用多次編輯。
4.2 建時間戳中間表

這一步是為了在目標數據庫建中間表etl_temp,並插入初始的時間戳字段。因為該作業在生產環境是循環調用的,該步驟在每一個同步周期中都會調用,所以在建表時需要判斷該表是否已經存在,如果不存在才建表。

SQL代碼和組件配置截圖如下:

CREATE TABLE IF NOT EXISTS etl_temp(id int primary key,time_stamp timestamp);
INSERT IGNORE INTO etl_temp (id,time_stamp) VALUES (1,'2018-05-22 00:00:00');

    1
    2

中間配置截圖

    我把該作業時間戳的ID設為1,在接下來的步驟中也是通過這個ID查詢我們想要的時間戳

4.2 獲取時間戳並設為變量

新建一個轉換,在轉換中使用表輸入和設置變量兩個組件
表輸入

SQL代碼和組件配置截圖如下

在Kettle中設置的變量都是字符串類型,為了便於比較。我在SQL語句把查出的時間戳進行了格式轉換

select date_format(time_stamp , '%Y-%m-%d %H:%i:%s') time_stamp from etl_temp where id='1'

    1

這里寫圖片描述
設置變量

變量活動類型可以為該變量設置四種有效活動范圍,分別是JVM、該Job、父Job和祖父Job

這里寫圖片描述
4.3 刪除目標表中時間戳及時間戳以后的數據

這樣做有兩個好處:

    避免在同步中重復或者遺漏數據。例如當時間戳在源數據表中不是唯一的,上一次同步周期最后一條數據的時間戳是2018-05-25 18:12:12,那么上一次同步周期結束后中間表中的時間戳就會更新為2018-05-25 18:12:12。如果在下一個同步周期時源數據表中仍然有時間戳為2018-05-25 18:12:12的新數據,那么同步就會出現數據不一致。采用大於時間戳的方式同步就會遺漏數據,采用等於時間戳的方式同步就會重復同步數據。
    增加健壯性 當作業異常結束后,不用做任何多余的操作就可以重啟。因為會刪除目標表中時間戳及時間戳以后的數據,所以不用擔心數據一致性問題

    2018-09-29:對增加健壯性進行補充:在一次同步周期中腳本異常中斷,這時候中間表的時間戳沒有更新,但是目標表已經同步了部分數據,當再次啟動腳本就會出現數據重復的情況,而且在很多時候因為主鍵的存在,腳本啟動會報錯

在組件中使用了上一步驟設置的變量,所以必須勾選使用變量替換

delete from test_kettle.im_message where send_time>='${TIME_STAMP}'

    1

這里寫圖片描述
4.4 抽取、比對和更新數據

這一步才是真正的數據同步步驟,完成了數據的抽取、比對,並根據不同的比對結果刪除、更新、插入或不做任何操作。
正如前文所說,為了同步刪除操作,在原始表輸入和目標表輸入步驟中回滾了一定時間段。其中回滾的時間段設置為了全局的參數。左右空白處右鍵即可設置參數,該作業下的所有作業和轉換都能使用,設置如下圖

這里寫圖片描述

轉換截圖如下

這里寫圖片描述
原始表輸入

SELECT
  id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM ueqcsd.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

這里寫圖片描述
目標表輸入

SELECT
  id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM test_kettle.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

這里寫圖片描述

    注意兩個組件的數據庫鏈接是不同的,當然它們也就這個和名字不同

比對記錄

對兩個表輸入查出的數據進行比對,並把比對的結果寫進輸入流,傳遞給后面的組件。
比對的結果有三種:

    new
    changed
    deleted

標注字段表示比對結果的字段名,后面有用。關鍵字段表示比對的字段,在這個作業中我們比較兩個的主鍵ID。

這里寫圖片描述
Switch

該步驟對上一步驟產生的標注字段進行路由,不同的結果路由到不同的步驟。其中目標步驟表示下一步驟的名字。

這里寫圖片描述
插入

Kettle有一個插入/更新組件,但是據網友介紹這個組件性能低下,每秒最多只能同步幾百條數據,所有我對插入和更新分別作了不同的處理。插入使用表輸出組件;更新使用更新組件。
為了進一步提升同步效率,我在表輸出組件使用了多線程(右鍵>改變開始復制的數量),使同步速度達到每秒12000條。Switch組件和表輸出組件中間的虛擬組件(空操作)也是為了使用多線程添加的。

這里寫圖片描述
這里寫圖片描述

    勾選批量插入,可以極大提高同步速度

更新和刪除

這里寫圖片描述

這里寫圖片描述
4.5 更新時間戳

set @new_etl_start_time_stamp = (SELECT SEND_TIME FROM test_kettle.im_message ORDER BY SEND_TIME DESC LIMIT 1);
update etl_temp set time_stamp=@new_etl_start_time_stamp where id='1';

    1
    2

這里寫圖片描述
4.6 發送郵箱

關於發送郵件組件網上有很多資料,就不多做介紹。特別強調一點,郵箱密碼是 單獨的授權碼,而不是郵箱登錄密碼。
運行

在開發環境點擊Spoon界面左上角三角符號運行作業即可。

在第一次運行時,為了提高同步效率,可以先不創建目標表的索引。在第一此同步完成后,再創建索引。然后在START組件中編輯調度邏輯,再次啟動。

如下圖所示

這里寫圖片描述

運行日志如下圖

這里寫圖片描述

這樣,一個使用時間戳增量回滾同步數據的作業就完成了,如果對這個job有什么建議,可以給我發郵件 zhaodongxx@outlook.com 或者提issue。
案例下載地址

awesome-kettle 是我建立的一個倉庫,用來索引和存放與kettle相關的資料,歡迎大家加星關注 和 push

    01_使用Kettle實現數據實時增量同步

參考資料

    Kettle中ETL的效率優化 簡書
    Kettle手冊(九)- 發送郵件
————————————————
版權聲明:本文為CSDN博主「zhaodongxx」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/dora_310/java/article/details/80511793


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM