sqlserver cdc實現數據增量抽取


--創建測試庫
create database test;
--創建配置表
create table test..time_config(tb varchar(20) PRIMARY KEY,enddate binary(10));
--創建業務表
create table test..TB_s (ID INT PRIMARY KEY,NAME VARCHAR(20)); --原表
create table test..TB_t (ID INT PRIMARY KEY,NAME VARCHAR(20),ISDELETED INT); --目標表

--給配置表初始時間
insert into test..time_config
select 'TB_t' as tb,max(start_lsn) as enddate from test.[cdc].[lsn_time_mapping]

--開啟cdc
use TEST
GO
EXEC sys.sp_cdc_enable_db --開啟庫級別cdc
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'TB_s', @role_name = null; --開啟表cdc
GO


--增量實現存儲過程

create proc cdc_tb as
declare 
@time_begin binary(10),
@time_end binary(10)

select @time_begin=max(enddate) from test.dbo.time_config    --上次結束時間,即本次開始時間
select @time_end=max(start_lsn) from test.[cdc].[lsn_time_mapping]    --獲取最大時間,即本次結束時間

--抽取增量數據
select ID,NAME,CASE WHEN __$operation=1 then 1 else 0 end as isdeleted into #tb_import from(
select row_number()over(partition by id order by [__$start_lsn] desc,__$seqval,__$operation desc ) as rn,* 
from test.[cdc].[dbo_TB_s_CT] 
where [__$start_lsn]>@time_begin and [__$start_lsn]<=@time_end) t1 where rn=1;

delete from test..TB_t where exists (select * from #tb_import);
insert into test..TB_t select * from #tb_import;

update test.dbo.time_config set enddate= @time_end where tb='TB_t';--將本次結束時間存入配置表;
declare @count int
select @count=count(1) from #tb_import 
print('更新'+cast(@count as varchar(10))+'條記錄')

--測試添加:
insert into TB_s
select 1,'aaa'
--修改
update TB_s set name='bbb' where id=1
--刪除
delete from TB_s where id=1

select * from TB_s

exec cdc_tb --創建job 定時作業

select * from TB_t

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM