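Real-time incremental and full synchronization from Oracle to MySQL, with multi-database support: implementing real-time incremental data sync with Kettle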


 

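1. Timestamp-based incremental rollback synchronization

Assume the source table has a column that records when each row was inserted or last modified, so that rows can be ordered in time. An intermediate table records the timestamp of each update, and in the next sync cycle the rows at or after that timestamp are synchronized as the increment. This is timestamp-based incremental synchronization.

However, timestamp-based incremental synchronization cannot propagate deletions of historical rows in the source database, so I use an Oracle materialized view to handle the delete operations.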

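Terminology:

  • Source table: the table that needs to be synchronized
  • Target table: the table that the data is synchronized to
  • Intermediate table: the table that stores the timestamp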
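The timestamp mechanism described above can be sketched in a few lines. This is only an illustration, not the Kettle job itself: it uses two in-memory SQLite databases as stand-ins for the source and target, and the names `src_rows`, `dst_rows`, `payload`, `updated_at` and the function `sync_increment` are hypothetical. Only `etl_position` and `last_position_time` are taken from the article.

```python
import sqlite3


def make_db(ddl):
    """Open an in-memory SQLite database and run the given DDL script."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(ddl)
    return conn


# Hypothetical, simplified stand-ins for the source and target schemas.
SOURCE_DDL = "CREATE TABLE src_rows (id INTEGER PRIMARY KEY, payload TEXT, updated_at TEXT);"
TARGET_DDL = """
CREATE TABLE dst_rows (id INTEGER PRIMARY KEY, payload TEXT, updated_at TEXT);
CREATE TABLE etl_position (table_name TEXT PRIMARY KEY, last_position_time TEXT);
INSERT INTO etl_position VALUES ('src_rows', '1970-01-01 00:00:00');
"""


def sync_increment(src, dst):
    """Copy rows modified at or after the stored timestamp; return the row count."""
    (watermark,) = dst.execute(
        "SELECT last_position_time FROM etl_position WHERE table_name = 'src_rows'"
    ).fetchone()
    changed = src.execute(
        "SELECT id, payload, updated_at FROM src_rows "
        "WHERE updated_at >= ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    if changed:
        # INSERT OR REPLACE makes re-reading the boundary row harmless.
        dst.executemany("INSERT OR REPLACE INTO dst_rows VALUES (?, ?, ?)", changed)
        # Advance the stored timestamp to the newest row that was copied.
        dst.execute(
            "UPDATE etl_position SET last_position_time = ? WHERE table_name = 'src_rows'",
            (changed[-1][2],),
        )
        dst.commit()
    return len(changed)


src = make_db(SOURCE_DDL)
dst = make_db(TARGET_DDL)
src.executemany(
    "INSERT INTO src_rows VALUES (?, ?, ?)",
    [(1, "a", "2024-01-01 10:00:00"), (2, "b", "2024-01-01 10:00:05")],
)
first = sync_increment(src, dst)   # copies both rows
src.execute(
    "UPDATE src_rows SET payload = 'a2', updated_at = '2024-01-01 10:00:09' WHERE id = 1"
)
second = sync_increment(src, dst)  # re-reads row 2 (boundary) plus the updated row 1
```

The sketch re-reads the row sitting exactly on the stored timestamp, which is why the write has to be an idempotent upsert. A row deleted from `src_rows` never shows up in this query, which is the limitation the article addresses with the materialized view.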

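2. Preparation

Create the tables in both databases, then use a script to insert 5 million rows into the source table. Once that finishes, keep inserting new rows at a rate of one per second to simulate a production environment.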
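The article does not show its loading script, so the following is only a guess at its shape: a batched bulk load followed by a slow trickle of single-row inserts. It runs against an in-memory SQLite table with hypothetical names (`src_rows`, `payload`, `updated_at`, `seed_rows`, `trickle_rows`), and the row counts are scaled down so the example finishes quickly.

```python
import sqlite3
import time
from datetime import datetime

STAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def seed_rows(conn, count, batch_size=10_000):
    """Bulk-insert `count` rows in batches, committing once per batch."""
    now = datetime.now().strftime(STAMP_FORMAT)
    for start in range(0, count, batch_size):
        stop = min(start + batch_size, count)
        conn.executemany(
            "INSERT INTO src_rows (payload, updated_at) VALUES (?, ?)",
            ((f"row-{i}", now) for i in range(start, stop)),
        )
        conn.commit()


def trickle_rows(conn, count, interval_seconds=1.0):
    """Insert one row every `interval_seconds` to imitate live traffic."""
    for i in range(count):
        stamp = datetime.now().strftime(STAMP_FORMAT)
        conn.execute(
            "INSERT INTO src_rows (payload, updated_at) VALUES (?, ?)",
            (f"live-{i}", stamp),
        )
        conn.commit()
        time.sleep(interval_seconds)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE src_rows (id INTEGER PRIMARY KEY, payload TEXT, updated_at TEXT)"
)
seed_rows(conn, 25_000)                       # the article loads 5,000,000 rows
trickle_rows(conn, 3, interval_seconds=0.01)  # the article inserts one row per second
total = conn.execute("SELECT COUNT(*) FROM src_rows").fetchone()[0]
```

Committing per batch rather than per row keeps the bulk phase fast, while the trickle phase commits every row so that each one becomes visible to the next sync cycle.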

The source table structure is as follows:

CREATE TABLE "OIM"."YG_TQ_FD_BASICINFO" 
   (    "ID" NUMBER(10,0) NOT NULL ENABLE, 
    "DECLAREDATE" VARCHAR2(16), 
    "UPDATEDATE" VARCHAR2(16), 
    "SECODE" VARCHAR2(40) NOT NULL ENABLE, 
    "FRONTSYMBOL" VARCHAR2(40), 
    "BACKSYMBOL" VARCHAR2(40), 
    "SYMBOLCOMP" VARCHAR2(40), 
    "SNAMECOMP" VARCHAR2(100), 
    "ENABLED" VARCHAR2(20) NOT NULL ENABLE, 
    "FDSNAME" VARCHAR2(100) NOT NULL ENABLE, 
    "FDTYPE" VARCHAR2(100), 
    "FDNAME" VARCHAR2(200), 
    "FDTOTUNIT" NUMBER(16,4), 
    "FDNATURE" VARCHAR2(40), 
    "INVESTSTYLE" VARCHAR2(40), 
    "INVESTGOAL" CLOB, 
    "INVRULE" CLOB, 
    "FDSTYLE" VARCHAR2(20), 
    "FOUNDDATE" VARCHAR2(16), 
    "LISTDATE" VARCHAR2(16), 
    "ENDDATE" VARCHAR2(16), 
    "KEEPERCODE" VARCHAR2(16), 
    "KEEPERNAME" VARCHAR2(200), 
    "TRUSTEECODE" VARCHAR2(16), 
    "TRUSTEENAME" VARCHAR2(200), 
    "MANAGERNAME" VARCHAR2(400), 
    "DECISIONRULE" CLOB, 
    "DECISIONPROC" CLOB, 
    "DISTRIBUTPRIN" CLOB, 
    "INVESTRANGE" CLOB, 
    "INVESTPOLICY" CLOB, 
    "INVESTSTD" CLOB, 
    "RISKTYPE" CLOB, 
    "RISKINDEX" CLOB, 
    "FDINTRO" CLOB, 
    "FDEVOLUTION" CLOB, 
    "DISCLOSUREPEOPLE" VARCHAR2(60), 
    "DISCLOSUREPHONE" VARCHAR2(100), 
    "EXISTBEGDATE" VARCHAR2(16), 
    "EXISTENDDATE" VARCHAR2(16), 
    "PRIEXISTENDDATE" VARCHAR2(16), 
    "EXSITPERIOD" NUMBER(16,2), 
    "EXCHANGE" VARCHAR2(20) NOT NULL ENABLE, 
    "FDSERIESCODE" VARCHAR2(12), 
    "FDSERIESNAME" VARCHAR2(200), 
    "TRASTYPE" VARCHAR2(20), 
    "EMONOVERDATE" VARCHAR2(20), 
    "SUBCONFDATE" VARCHAR2(20), 
    "REDCONFDATE" VARCHAR2(20), 
    "REDPAYDATE" VARCHAR2(20), 
    "OUTSUBBEGDATE" VARCHAR2(16), 
    "OUTSUBENDDATE" VARCHAR2(16), 
    "INSUBBEGDATE" VARCHAR2(16), 
    "INREDENDDATE" VARCHAR2(16), 
    "FDINVCATEGORY" VARCHAR2(20), 
    "FDMETHOD" VARCHAR2(20) NOT NULL ENABLE, 
    "MEMO" CLOB, 
    "TOTSHARE" NUMBER(18,6), 
    "ISSHARESTAT" NUMBER(10,0) NOT NULL ENABLE, 
    "ISSTAT" NUMBER(10,0) NOT NULL ENABLE, 
    "ISVALID" NUMBER(10,0) NOT NULL ENABLE, 
    "TMSTAMP" RAW(8) NOT NULL ENABLE, 
    "ENTRYDATE" DATE, 
    "ENTRYTIME" VARCHAR2(16), 
    "TRADPLACE" VARCHAR2(20), 
    "SECURITYID" VARCHAR2(40) NOT NULL ENABLE, 
    "NAVPUBTYPE" VARCHAR2(20), 
    "PROFITPAYTYPE" VARCHAR2(20), 
    "OPERATEPERIOD" NUMBER(10,0), 
    "OPERATEPERIODUNIT" VARCHAR2(20), 
    "FSYMBOL" VARCHAR2(40) NOT NULL ENABLE, 
    "KEEPERSNAME" VARCHAR2(100) NOT NULL ENABLE, 
    "KEEPERINITIALS" VARCHAR2(20) NOT NULL ENABLE, 
    "LIQUBEGINDATE" VARCHAR2(16), 
    "LIQUENDDATE" VARCHAR2(16), 
    "CREDITLINEID" NUMBER(10,0), 
     CONSTRAINT "PK_YG_TQ_FD_BASICINFO" PRIMARY KEY ("ID")
  USING INDEX PCTFREE 10 INITRANS 2 MAXTRANS 255 COMPUTE STATISTICS 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "OIMTS"  ENABLE
   ) SEGMENT CREATION IMMEDIATE 
  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 
 NOCOMPRESS LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "OIMTS" 
 LOB ("INVESTGOAL") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVRULE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("DECISIONRULE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("DECISIONPROC") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("DISTRIBUTPRIN") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVESTRANGE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVESTPOLICY") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVESTSTD") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("RISKTYPE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("RISKINDEX") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("FDINTRO") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("FDEVOLUTION") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("MEMO") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))

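3. Job workflow

  1. Start component
  2. Create the timestamp intermediate table
  3. Read the timestamp from the intermediate table and set it as a global variable
  4. Extract from both tables the rows at and after the timestamp, compare them, and delete, insert, or update according to the comparison result
  5. Delete the rows that need deleting, as recorded in the materialized view
  6. Update the timestamp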
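Step 5 is the part that a timestamp alone cannot cover. One way to picture it, under stated assumptions, is a change log that records the primary key of every deleted row, which the sync then replays on the target. The sketch below imitates that idea with a SQLite trigger and a hypothetical log table `src_rows_log`. It is not how Oracle implements materialized view logs, and the table, trigger, and function names are all made up for the illustration.

```python
import sqlite3

src = sqlite3.connect(":memory:")
src.executescript("""
CREATE TABLE src_rows (id INTEGER PRIMARY KEY, payload TEXT);
-- Stand-in for the change log: one entry per deleted primary key.
CREATE TABLE src_rows_log (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    id INTEGER,
    dml_type TEXT
);
CREATE TRIGGER src_rows_after_delete AFTER DELETE ON src_rows
BEGIN
    INSERT INTO src_rows_log (id, dml_type) VALUES (OLD.id, 'D');
END;
INSERT INTO src_rows VALUES (1, 'a'), (2, 'b'), (3, 'c');
""")

dst = sqlite3.connect(":memory:")
dst.executescript("""
CREATE TABLE dst_rows (id INTEGER PRIMARY KEY, payload TEXT);
INSERT INTO dst_rows VALUES (1, 'a'), (2, 'b'), (3, 'c');
""")


def apply_deletes(src, dst):
    """Replay logged deletes on the target, then purge the consumed log entries."""
    entries = src.execute(
        "SELECT seq, id FROM src_rows_log WHERE dml_type = 'D' ORDER BY seq"
    ).fetchall()
    if not entries:
        return 0
    dst.executemany("DELETE FROM dst_rows WHERE id = ?", [(pk,) for _, pk in entries])
    dst.commit()
    # Purge only up to the highest sequence that was read, so later entries survive.
    src.execute("DELETE FROM src_rows_log WHERE seq <= ?", (entries[-1][0],))
    src.commit()
    return len(entries)


src.execute("DELETE FROM src_rows WHERE id = 2")
src.commit()
deleted = apply_deletes(src, dst)
remaining = [row[0] for row in dst.execute("SELECT id FROM dst_rows ORDER BY id")]
```

Purging by sequence number instead of emptying the log means a delete that arrives while the sync is running is kept for the next cycle.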

4. Creating the job

The finished job looks like this:

[Screenshot: the finished job in Spoon]

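4.1 Creating the job and the DB connections

Open Spoon, create a new job, and then create a new DB connection under "DB connections" in the main object tree on the left. After the connection has been created and tested successfully, right-click it under "DB connections" and share it. A DB connection created inside a single job or transformation is a local data source and cannot be used by other transformations or jobs, not even by different transformations that belong to the same job. Sharing the connection turns it into a global data source, so it does not have to be defined repeatedly.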

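4.2 Creating the timestamp intermediate table

This step creates the intermediate table ETL_POSITION in the target database and inserts the initial timestamp value. Because the job is invoked in a loop in production, this step runs in every sync cycle. The step therefore has to check whether the table already exists and create it only when it does not.

The SQL code and the component configuration are as follows: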

create table ${SCHEMA_NAME}.ETL_POSITION
(
  table_name         VARCHAR2(100),
  last_position_time TIMESTAMP(6),
  last_position    number(30),
  progress_history   VARCHAR2(200),
  current_progress   VARCHAR2(30),
  target_url         VARCHAR2(400), 
  target_table_name  VARCHAR2(100),
  schema_name        VARCHAR2(100)
);
-- Add comments to the table 
comment on table ${SCHEMA_NAME}.ETL_POSITION
  is 'Unique identifier of an ETL event';
-- Add comments to the columns 
comment on column ${SCHEMA_NAME}.ETL_POSITION.table_name
  is 'Table name';
comment on column ${SCHEMA_NAME}.ETL_POSITION.last_position_time
  is 'Date of the last synchronization';
comment on column ${SCHEMA_NAME}.ETL_POSITION.last_position
  is 'Position ID of the last synchronization';
comment on column ${SCHEMA_NAME}.ETL_POSITION.progress_history
  is ' UNKNOW, MARK, FULLING, INCING, CLEAR, FAILED, SUCCESS;';
comment on column ${SCHEMA_NAME}.ETL_POSITION.current_progress
  is 'Current status';
comment on column ${SCHEMA_NAME}.ETL_POSITION.target_url
  is 'URL of the target table (for multi-data-source replication)';
comment on column ${SCHEMA_NAME}.ETL_POSITION.target_table_name
  is 'Name of the target table (for multi-data-source replication)';
comment on column ${SCHEMA_NAME}.ETL_POSITION.schema_name
  is 'Schema that contains the table';
-- Create/Recreate indexes 
create unique index ${SCHEMA_NAME}.ETL_POWEIYI on ${SCHEMA_NAME}.ETL_POSITION (TABLE_NAME, SCHEMA_NAME, TARGET_URL,TARGET_TABLE_NAME);

-- Check whether the position table already exists
select * from all_tables where TABLE_NAME = 'ETL_POSITION' AND OWNER='${SCHEMA_NAME}'
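The check-then-create logic can be sketched as follows. This is an illustration only: it queries SQLite's `sqlite_master` catalog where the article queries Oracle's `all_tables`, the column list is a shortened subset of ETL_POSITION, and the helper names plus the initial values in the seed row are assumptions.

```python
import sqlite3

# Shortened, hypothetical version of the ETL_POSITION table.
ETL_POSITION_DDL = """
CREATE TABLE etl_position (
    table_name         TEXT,
    last_position_time TEXT,
    last_position      INTEGER,
    current_progress   TEXT
)
"""


def table_exists(conn, name):
    """Return True if the catalog lists a table with this name."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone()
    return row is not None


def ensure_position_table(conn, source_table):
    """Create the table and seed one row on the first call; do nothing afterwards."""
    if table_exists(conn, "etl_position"):
        return False
    conn.execute(ETL_POSITION_DDL)
    # Assumed initial values: epoch timestamp, position 0, first listed status.
    conn.execute(
        "INSERT INTO etl_position VALUES (?, ?, ?, ?)",
        (source_table, "1970-01-01 00:00:00", 0, "UNKNOW"),
    )
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")
created_first = ensure_position_table(conn, "YG_TQ_FD_BASICINFO")
created_again = ensure_position_table(conn, "YG_TQ_FD_BASICINFO")
```

Because the second call returns without touching the table, the step is safe to run at the start of every sync cycle.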

Create the materialized view log:

CREATE MATERIALIZED VIEW LOG ON ${SCHEMA_NAME}.${TABLE_NAME} with primary key, sequence;

Check whether the materialized view log already exists:

select * from all_mview_logs where master = '${TABLE_NAME}' and log_owner = '${SCHEMA_NAME}'
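The statements above contain `${SCHEMA_NAME}` and `${TABLE_NAME}` placeholders that are replaced with variable values before the SQL runs. The sketch below shows the substitution idea with Python's `string.Template`, which happens to use the same `${NAME}` syntax. It is not Kettle's implementation, and the `render` helper and the example values are assumptions.

```python
from string import Template

# The existence check from the article, wrapped as a template.
CHECK_LOG_SQL = Template(
    "select * from all_mview_logs "
    "where master = '${TABLE_NAME}' and log_owner = '${SCHEMA_NAME}'"
)


def render(template, variables):
    """Fill in every placeholder; a missing variable raises KeyError."""
    return template.substitute(variables)


sql = render(
    CHECK_LOG_SQL,
    {"SCHEMA_NAME": "OIM", "TABLE_NAME": "YG_TQ_FD_BASICINFO"},
)
print(sql)
```

This is plain text substitution with no escaping, so the variable values should come from trusted job configuration rather than from user input.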

 

Set the variables in the job.

For the details, see the code.

 

