架構設計
特點:
- 支持sql-server / oracle / mysql 等jdbc支持的數據庫之間互導
- 支持數據庫與solr搜索引擎之間互導
- 采用http協議傳送數據,在網絡環境復雜和連接不穩定的情況下能正常工作,也可以擴展成集群、轉發、負載均衡等
- 網絡不穩定、數據庫連接不穩定的情況下,有重連、重試機制
- 復雜的數據處理和異構,自定義Query-SQL和Insert/Delete/Update-SQL
- 分布式事務、數據一致性保護。導入錯誤的情況下,兩邊數據都不會發生更改
- 在工作異常的情況下,可以發送短信或郵件通知
- 可以通過http網頁形式隨時查看工作狀態和cpu 內存使用情況,方便監控
下面假設一個應用場景:
- 在db1上有商品TB_DEMO2_PROD、價格TB_DEMO2_PRICE、庫存TB_DEMO2_STORAGE。總共3張表格
- 在db2上有商品及價格表TB_MY_DEMO2_PROD,庫存TB_MY_DEMO2_STORAGE表2張表格
- 有一個solr服務器,集中了商品、價格、庫存等所有信息
- 當db1中有數據更改時,同步到db2的表中,並從db2同步到solr搜索服務器
- db1到solr的同步延遲控制在5秒以內
- 當同步過程中有任何異常時,即可發送短信
配置步驟
db1上建立測試表格
create table TB_DEMO2_PROD --商品表 ( prod_id VARCHAR2(200) not null, --商品ID prod_code VARCHAR2(200), --商品編號 branchid VARCHAR2(3), --分公司編號 prod_name VARCHAR2(200), --商品名稱 prod_unit VARCHAR2(50) --計量單位 ); alter table TB_DEMO2_PROD add constraint PK_TB_DEMO2_PROD primary key (PROD_ID); --------------- create table TB_DEMO2_PRICE --價格表 ( prod_id VARCHAR2(200) not null, --商品ID price1 NUMBER(20,5), --價格1 price2 NUMBER(20,5), --價格2 price3 NUMBER(20,5) --價格3 ); alter table TB_DEMO2_PRICE add constraint PK_TB_DEMO2_PRICE primary key (PROD_ID); --------------- create table TB_DEMO2_STORAGE --庫存表 ( prod_id VARCHAR2(200) not null, --商品ID amount NUMBER(18) --庫存量 ); alter table TB_DEMO2_STORAGE add constraint PK_TB_DEMO2_STORAGE primary key (PROD_ID);
db2上建立測試表格
create table TB_MY_DEMO2_PROD --商品表 ( prod_id VARCHAR2(200) not null, --商品ID prod_code VARCHAR2(200), --商品編號 branchid VARCHAR2(3), --分公司編號 prod_name VARCHAR2(200), --商品名稱 prod_unit VARCHAR2(50), --計量單位 price1 NUMBER(20,5), --價格1 price2 NUMBER(20,5) --價格2 ); alter table TB_MY_DEMO2_PROD add constraint PK_TB_MY_DEMO2_PROD primary key (PROD_ID); --------------- create table TB_MY_DEMO2_STORAGE --庫存表 ( prod_id VARCHAR2(200) not null, --商品ID amount NUMBER(18) --庫存量 ); alter table TB_MY_DEMO2_STORAGE add constraint PK_TB_MY_DEMO2_STORAGE primary key (PROD_ID);
建立DataX的系統事件表
如果db1上還沒有DX_DATA_EVENT和DX_DATA_EVENT_STAGE表,就用下面的語句來執行建表操作
create table DX_DATA_EVENT_STAGE ( SYNC_NAME VARCHAR2(50) not null, --同步方案名 EVENT_ID NUMBER(22) not null --事件ID ); alter table DX_DATA_EVENT_STAGE add constraint PK_DX_DATA_EVENT_STAGE primary key (SYNC_NAME); create table DX_DATA_EVENT ( EVENT_ID NUMBER(22) not null, --事件ID SYNC_NAME VARCHAR2(50) not null, --同步方案名 ROW_ID VARCHAR2(128), --數據主鍵值 OPT_TYPE VARCHAR2(1) not null, --操作類型(U;D;I;) CREATE_TIME DATE not null --更新時間 ); alter table DX_DATA_EVENT add constraint PK_DX_DATA_EVENT primary key (EVENT_ID); create bitmap index IDX_DX_DATA_EVENT_SYNC_NAME on DX_DATA_EVENT (SYNC_NAME); create sequence SEQ_DX_DATA_EVENT minvalue 1 maxvalue 999999999999999999999999999 start with 1 increment by 1 cache 20;
編寫同步方案的SQL語句
現在我們要開始做同步了,首先明確同步的方法,規定一個同步方案名(SyncName)
這是按照目標服務器的表格數來定義的,比如:J44_demo2Prod, J44_demo2Storage
編寫同步源(source)的查詢語句
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="demoOrder1_source" > <select id="fullQuery" resultType="java.util.HashMap"><![CDATA[ select t.order_code, c.danw_bh, t.modify_time, t.create_time from TB_ORDER_MAIN_PARTITION t inner join TB_CUST_MAIN c on t.cust_id = c.cust_id where t.branch_id = 'J44' ]]></select> <select id="deltaQuery" resultType="java.util.HashMap"> select t.order_id, t.order_code, c.danw_bh, t.modify_time, t.create_time from TB_ORDER_MAIN_PARTITION t inner join TB_CUST_MAIN c on t.cust_id = c.cust_id where t.order_id in <foreach item="item" index="index" collection="list" open="(" separator="," close=")"> #{item} </foreach> </select> </mapper>
編寫同步目標(target)的插入語句
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="demoOrder1_target" > <insert id="insertSum"> <selectKey resultType="java.lang.Long" keyProperty="T1_ID" order="BEFORE" > SELECT SEQ_DX_TABLE1_SUM.NEXTVAL AS id FROM DUAL </selectKey> insert into dx_table1_sum ( t1_id , billid , cust_code , last_modify , create_time ) values ( #{T1_ID, jdbcType=DECIMAL} , #{ORDER_CODE, jdbcType=VARCHAR} , #{DANW_BH, jdbcType=VARCHAR} , #{MODIFY_TIME, jdbcType=DATE} , #{CREATE_TIME, jdbcType=DATE} ) </insert> <update id="updateSum"> update dx_table1_sum set billid = #{BILLID, jdbcType=VARCHAR}, cust_code = #{CUST_CODE, jdbcType=VARCHAR}, last_modify = #{LAST_MODIFY, jdbcType=DATE}, create_time = #{CREATE_TIME, jdbcType=DATE} where t1_id = #{entry.rowId, jdbcType=VARCHAR} </update> <delete id="deleteSum"> delete from dx_table1_sum where t1_id = #{entry.rowId, jdbcType=VARCHAR} </delete> <delete id="clearSum"> delete from dx_table1_sum </delete> <insert id="insertDet"> <selectKey resultType="java.lang.Long" keyProperty="custId" order="BEFORE" > SELECT SEQ_DX_TABLE1_DET.NEXTVAL AS id FROM DUAL </selectKey> ... </insert> </mapper>
在db1上編寫觸發器
----------- 表格 TB_DEMO2_PROD 對應同步方案是 sync_demo2Prod create or replace trigger TRG_DX_TB_DEMO2_PROD after insert or update or delete on DX_TB_DEMO2_PROD for each row begin if inserting then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :new.prod_id, 'I', sysdate); elsif updating then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'U', sysdate); elsif deleting then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'D', sysdate); end if; end TRG_DX_TB_DEMO2_PROD; ----------- 表格 TB_DEMO2_PRICE 對應同步方案是 sync_demo2Prod create or replace trigger TRG_DX_TB_DEMO2_PRICE after insert or update or delete on TB_DEMO2_PRICE for each row begin if inserting then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :new.prod_id, 'I', sysdate); elsif updating then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'U', sysdate); elsif deleting then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Prod', :old.prod_id, 'D', sysdate); end if; end TRG_DX_TB_DEMO2_PRICE; ----------- 表格 DX_TB_DEMO2_STORAGE 對應同步方案是 sync_demo2Price create or replace trigger TRG_DX_TB_DEMO2_STORAGE after insert or update or delete on TB_DEMO2_STORAGE for each row begin if inserting then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Price', :new.prod_id, 'I', sysdate); elsif updating then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Price', :old.prod_id, 'U', sysdate); elsif deleting then insert into DX_DATA_EVENT values(SEQ_DX_DATA_EVENT.NEXTVAL, 'sync_demo2Price', :old.prod_id, 'D', sysdate); end if; end TRG_DX_TB_DEMO2_STORAGE;
編寫spring配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" default-autowire="byName" default-lazy-init="false"> <bean id="syncTarget_J44_demoOrder1" class="com.jzt.datax.core.SyncTargetServiceImpl"> <property name="targetConfig" ref="syncTarget_J44_demoOrder1_config" /> </bean> <bean id="syncSource_J44_demoOrder1" class="com.jzt.datax.core.SyncSourceServiceImpl"> <property name="sourceConfig" ref="syncSource_J44_demoOrder1_config" /> </bean> <bean id="syncTarget_J44_demoOrder1_config" class="com.jzt.datax.core.SyncTargetConfigration"> <property name="syncName" value="J44_demoOrder1" /> <property name="ibatisInsertData" value="demoOrder1_target.insertSum" /> <property name="ibatisUpdateData" value="demoOrder1_target.updateSum" /> <property name="ibatisDeleteData" value="demoOrder1_target.deleteSum" /> <property name="ibatisBeforeFullSyncData" value="demoOrder1_target.clearSum" /> </bean> <bean id="syncSource_J44_demoOrder1_config" class="com.jzt.datax.core.SyncSourceConfigration" > <!-- 名稱(必須唯一) --> <property name="syncName" value="J44_demoOrder1" /> <!-- 調度頻率(cron表達式) --> <property name="tiggerCron" value="0/3 * * * * ?" /> <!-- 事件檢查動作 --> <property name="eventLookup" ref="defaultEventCheck" /> <!-- 全量查詢動作 --> <property name="ibatisFullQuery" value="demoOrder1_source.fullQuery" /> <!-- 增量查詢動作 --> <property name="ibatisDeltaQuery" value="demoOrder1_source.deltaQuery" /> <!-- 查詢結果中的主鍵字段名 --> <property name="identityField" value="ORDER_ID" /> <!-- 同步管道 --> <property name="channel" ref="syncSource_J44_demoOrder1_channel" /> </bean> <!-- 這里定義了一個同步管道,用http協議傳輸數據 --> <bean id="syncSource_J44_demoOrder1_channel" class="com.jzt.datax.core.channel.HttpPostChannel"> <!-- 當上傳數據達到某個閥值時開啟壓縮 -1代表永不壓縮 0代表總是壓縮 --> <property name="zipSize" value="-1" /> <property name="dataTarget" value="http://127.0.0.1:9280/sync/J44_demoOrder1.json" /> </bean> </beans>
測試和調試項目