關於數據同步的幾種實現
概述
關於數據同步主要有兩個層面的同步,一是通過后台程序編碼實現數據同步,二是直接作用於數據庫,在數據庫層面實現數據的同步。通過程序編碼實現數據同步,其主要的實現思路很容易理解,即有就更新,無則新增,其他情況日志記錄,就不做過多的介紹,這里主要講述的是第二個層面的數據同步,即在數據庫層面實現數據同步。
數據庫層面的數據庫同步主要有三種方式:通過發布/訂閱的方式實現同步,通過SQL JOB方式實現數據同步,通過Service Broker 消息隊列的方式實現數據同步。
下面分別就這三種數據同步方式,一一詳解。
1. 通過發布/訂閱的方式實現同步
發布/訂閱是Sql Server自帶的一種數據庫備份的機制,通過該機制可以快速的實現數據的備份同步,不用編寫任何的代碼。
此種數據同步的方式存在的以下的一些問題:
- 表結構不能更改,同步雙方的表結構必須一致,一旦表結構發生更改需要重新生成數據庫快照。
- 對於大數據量的同步沒有可靠的保證。
- 網絡不穩定的情況下同步也不能保證。
總的來說,這種數據備份同步的方式,在表結構一致、數據量不是特別大的情況下還是非常高效的一種同步方式。
網上有很多的關於如何使用發布/訂閱的方式實現數據同步的操作示例,這里就不再重復的演示了,有興趣想要了解的朋友可以參考下面這篇文章:
http://kb.cnblogs.com/page/103975/
2. 通過SQL JOB方式實現數據同步
通過Sql Job定時作業的方式實現同步其基本原理就是通過目標服務器和源服務器的連接,然后通過編寫Sql語句,從源服務器中讀取數據,再更新到目標服務器。
這種數據同步的方式比較靈活。創建過sql定時作業之后,主要需要執行以下關鍵的兩步。
2.1 創建數據庫連接(一般作為定時作業執行的第一步)
不同數據庫之間的連接可以通過系統的存儲過程實現。下面就直接用一個示例來講一下如何創建數據庫連接。
--添加一個連接
--系統存儲過程sp_addlinkedserver 參數:
----------------------1:目標服務器的IP或別名,本例中為:'WIN-S1PO3UA6J7I';----------------------2:'' (srvproduct,默認);
----------------------3:'SQLOLEDB'(provider,默認值);
----------------------4:目標服務器的IP或別名(datasrc),本例中為:'WIN-S1PO3UA6J7I'
exec sp_addlinkedserver 'WIN-S1PO3UA6J7I','','SQLOLEDB','WIN-S1PO3UA6J7I'
--添加登錄用戶連接
--系統存儲過程sp_addlinkedsrvlogin 參數:
----------------------1:目標服務器的IP或別名,本例中為:'WIN-S1PO3UA6J7I';
----------------------2:'false',默認值;
----------------------3:null,默認值;
----------------------4:'sa',登錄用戶名;
----------------------5:'pass@word1',登錄密碼;
exec sp_addlinkedsrvlogin 'WIN-S1PO3UA6J7I','false',null,'sa','pass@word1'
創建數據庫連接主要用到了以上的兩個存儲過程,但是在實際操作的過程中可能會遇到“仍有對服務器XXX的遠程登錄或連接登錄問題”這樣的問題,如果遇到此類問題,在執行上邊的添加連接和登錄用戶連接之前還需要先刪除某個已存在的鏈接,具體如下:
--系統存儲過程sp_droplinkedsrvlogin 參數:
----------------------1:目標服務器的IP或別名,本例中為:'WIN-S1PO3UA6J7I';----------------------2:null
exec sp_droplinkedsrvlogin 'WIN-S1PO3UA6J7I',null
--系統存儲過程sp_dropserver 參數:
----------------------1:目標服務器的IP或別名,本例中為:'WIN-S1PO3UA6J7I'
exec sp_dropserver 'WIN-S1PO3UA6J7I'
2.2 使用SQL 語句 實現數據同步
主要的同步思路:
1:在目標數據庫中先清空要同步的表的數據
2:使用insert into Table (Cloumn....) select Column..... from 服務器別名或IP.目標數據庫名.dbo.TableName 的語法將數據從源數據庫讀取並插入到目標數據庫
Truncate table Org_DepartmentsExt –刪除現有系統中已存在的部門表
insert into Org_DepartmentsExt –從名為WIN-S1PO3UA6J7I的服務器上的DBFrom數據庫上獲取源數據,並同步到目標數據庫中
(
[DeptID]
,[DeptStatus]
,[DeptTel]
,[DeptBrief]
,[DeptFunctions]
)
SELECT [DeptID]
,[DeptStatus]
,[DeptTel]
,[DeptBrief]
,[DeptFunctions]
FROM [WIN-S1PO3UA6J7I].[DBFrom].[dbo].[Org_DepartmentsExt]
以上這兩步便是通過SQL Job實現數據同步的關鍵步驟,在完成以上兩步之后,如果沒有其他的表要進行同步,則可創建同步計划以完善定時作業。帶作業創建完后,便可以執行。
這里主要只是演示了通過Sql Job方式實現數據同步的關鍵步驟。網上有很多具體的實例演示。有興趣的朋友可以參考以下文章進行練習檢驗:
http://www.cnblogs.com/tyb1222/archive/2011/05/27/2060075.html
3. 通過SQL Server Service Broker 消息隊列的方式實現數據同步
3.1 SQL Server Service Broker概述
SQL Server Service Broker 是數據庫引擎的組成部分,為 SQL Server 提供隊列和可靠的消息傳遞。既可用於使用單個 SQL Server 實例的應用程序,也可用於在多個實例間分發工作的應用程序。
在單個 SQL Server 實例內,Service Broker 提供了一個功能強大的異步編程模型。數據庫應用程序通常使用異步編程來縮短交互式響應時間,並增加應用程序總吞吐量。
在多個SQL Server實例之間Service Broker 還可以提供可靠的消息傳遞服務。Service Broker 可幫助開發人員通過稱為服務的獨立、自包含的組件來編寫應用程序。需要使用這些服務中所包含功能的應用程序可以使用消息來與這些服務進行交互。Service Broker 使用 TCP/IP 在實例間交換消息。Service Broker 中所包含的功能有助於防止未經授權的網絡訪問,並可以對通過網絡發送的消息進行加密。
3.2 具體的實現演示
在這一小節里,主要是通過一個完整的數據同步的流程向大家演示,如何實現同一個數據庫實例不同數據庫的數據同步。關於不同的數據庫實例間的數據庫的數據同步整體上跟同一個實例的數據庫同步是一樣的,只不過在不同的數據庫實例間同步時還需啟用傳輸安全、對話安全,創建路由、遠程服務綁定等額外的操作。
這里邊用到了大量的SQL Server XML的東西,如果有不理解的地方可以參考以下鏈接:http://www.cnblogs.com/Olive116/p/3355840.html
這是我在做技術准備時,自己的一點學習記錄。
下面就是具體的實現步驟:
3.2.1為數據庫啟動Service Broker活動
這一步主要是用來對要進行數據同步的數據啟用Service Broker 活動,並且授信。
USE master GO --如果數據庫DBFrom、DBTo不存在,則創建相應的數據庫 IF NOT EXISTS (SELECT name FROM sys.databases WHERE name ='DBFrom') CREATE DATABASE DBFrom GO IF NOT EXISTS (SELECT name FROM sys.databases WHERE name ='DBTo') CREATE DATABASE DBTo GO --分別為該數據庫啟用Service Broker活動並且授權信任 ALTER DATABASE DBFrom SET ENABLE_BROKER GO ALTER DATABASE DBFrom SET TRUSTWORTHY ON GO ALTER AUTHORIZATION ON DATABASE::DBFrom To sa GO ALTER DATABASE DBTo SET ENABLE_BROKER GO ALTER DATABASE DBTo SET TRUSTWORTHY ON GO ALTER AUTHORIZATION ON DATABASE::DBTo TO sa GO
3.2.2 創建數據庫主密匙
這一步主要用來創建數據庫主密匙,上邊有提到Service Broker可以對要發送的消息進行加密。
Use DBFrom go create master key encryption by password='pass@word1' go Use DBTo go create master key encryption by password='pass@word1' go
3.2.3 創建消息類型、協定
這里主要用來創建消息類型和消息協定,源數據庫和目標數據庫的消息類型和協定都要一致。
Use DBFrom go --數據同步—消息類型 create message type [http://oa.founder.com/Data/Sync] validation=well_formed_xml go --數據同步--錯誤反饋消息類型 create message type [http://oa.founder.com/Data/Sync/Error] validation=well_formed_xml go --數據同步協議 create contract[http://oa.founder.com/Data/SyncContract] ( [http://oa.founder.com/Data/Sync] sent by initiator, [http://oa.founder.com/Data/Sync/Error] sent by target ) go Use DBTo go --數據同步—消息類型 create message type [http://oa.founder.com/Data/Sync] validation=well_formed_xml go --數據同步--錯誤反饋消息類型 create message type [http://oa.founder.com/Data/Sync/Error] validation=well_formed_xml go --數據同步協議 create contract[http://oa.founder.com/Data/SyncContract] ( [http://oa.founder.com/Data/Sync] sent by initiator, [http://oa.founder.com/Data/Sync/Error] sent by target ) Go
創建過之后效果如下圖:
3.2.4 創建消息隊列
這里主要用來創建消息隊列,源數據庫和目標數據庫都要創建,隊列名字可以自主命名。
use DBFrom go create queue [DBFrom_DataSyncQueue] with status=on go use DBTo go create queue [DBFrom_DataSyncQueue] with status=on go
創建之后效果如下圖:
3.2.5 創建數據同步服務
這里我們通過利用上邊創建的消息協定和消息隊列來創建數據同步的服務。
use DBFrom go create service [http://oa.founder.com/DBFrom/Data/SyncService] on queue dbo.[DBFrom_DataSyncQueue]([http://oa.founder.com/Data/SyncContract]) go --數據同步服務 use DBTo go create service [http://oa.founder.com/DBTo/Data/SyncService] on queue dbo.[DBFrom_DataSyncQueue]([http://oa.founder.com/Data/SyncContract]) go
創建后效果如下圖:
3.2.6 在源數據庫上創建服務配置列表
這里需要在源數據庫上創建一個服務配置列表,主要用來保存之前創建過的服務名稱,本例只是用來演示,所以只創建了一個服務,只能是同步一個數據表,如果有多個數據表需要同步,則需創建多個服務,所以這里創建一個服務配置列表,用來存儲多個服務的服務名稱。
需要注意的是,下面的腳本在執行完創建表的操作之后又插入了一條數據,也就是上邊我們創建的服務名,如果有多個服務的話,依次插入該表即可。
use DBFrom go --同步數據--目標服務配置 create table SyncDataFarServices ( ServiceID uniqueidentifier, ServiceName nvarchar(256) ) go --將上邊創建的服務名,插入此表中 insert into SyncDataFarServices (ServiceID,ServiceName) values (NEWID(),'http://oa.founder.com/DBTo/Data/SyncService') go
效果如下圖:


3.2.7 發送數據同步消息
這里創建了一個存儲過程主要用來發送同步消息,該消息內容主要包括操作類型、主鍵、表名、正文內容,分別對應@DMLType,@PrimaryKeyField,@TableName,@XMLData。然后通過創建一個游標來條的讀取上邊創建的服務列表中的列表信息,向不同的服務發送消息。
Use DBFrom
go
--發送同步數據消息
Create procedure UP_SyncDataSendMsg
(
@PrimaryKeyField nvarchar(128),
@TableName nvarchar(128),
@DMLType char(1),
@XMLData xml
)
as
begin
SET @XMLData.modify('insert <DMLType>{sql:variable("@DMLType")}</DMLType> as first into /');
SET @XMLData.modify('insert <PrimaryKeyField>{sql:variable("@PrimaryKeyField")}</PrimaryKeyField> as first into /');
SET @XMLData.modify('insert <Table>{sql:variable("@TableName")}</Table> as first into /');
DECLARE FarServices CURSOR FOR SELECT ServiceName FROM SyncDataFarServices;
open FarServices
declare @FarServiceName nvarchar(256);
fetch FarServices into @FarServiceName;
while @@FETCH_STATUS=0
begin
begin Transaction
declare @Conv_Handler uniqueidentifier
begin DIALOG conversation @Conv_Handler --開始一個會話
from service [http://oa.founder.com/DBFrom/Data/SyncService]
to service @FarServiceName
on contract [http://oa.founder.com/Data/SyncContract];
send on conversation @Conv_Handler
Message type [http://oa.founder.com/Data/Sync](@XMLData);
fetch FarServices into @FarServiceName;
commit;
end
close FarServices;
deallocate FarServices;
end
go
3.2.8 創建數據同步異常信息記錄表
這里創建該表主要用來記錄在數據同步過程中出現的異常信息。
use DBFrom go create Table dbo.SyncException ( ErrorID uniqueidentifier, ConversationHandleID uniqueidentifier, ErrorNumber int, ErrorSeverity int, ErrorState int, ErrorProcedure nvarchar(126), ErrorLine int, ErrorMessage nvarchar(2048), MessageContent nvarchar(max), CreateDate DateTime ) go --修改異常信息記錄表 alter table dbo.SyncException add PrimaryKeyField nvarchar(128), TableName nvarchar(128), DMLType char(1), DBName nvarchar(128) Go
效果如下圖:
3.2.9 數據同步反饋
這里主要用來在源數據庫中接收隊列中的消息,將同時出錯的信息,解析一下,然后插入到異常信息記錄表里邊。
--數據同步回饋
use DBFrom
go
create procedure UP_SyncDataFeedback
as
begin
set nocount on
--會話變量聲明
declare @ConversationHandle uniqueidentifier;--會話句柄
declare @Msg_Body nvarchar(max);
declare @Msg_Type_Name sysname;
--變量賦值
while(1=1)
begin
begin transaction
--從隊列中接收消息
waitfor
(
receive top(1)
@Msg_Type_Name=message_type_name,
@ConversationHandle=[conversation_handle],
@Msg_Body=message_body
from dbo.[DBFrom_DataSyncQueue]
),timeout 1000
--如果接收到消息處理,否則跳過
if(@@ROWCOUNT<=0)
break;
if @Msg_Type_Name='http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
end conversation @ConversationHandle;
else if @Msg_Type_Name='http://oa.founder.com/Data/Sync/Error'
begin
declare @DataSource xml;
set @DataSource=Convert(xml,@Msg_Body);
insert into dbo.SyncException(ErrorID,ConversationHandleID,ErrorNumber,ErrorSeverity,ErrorState,ErrorProcedure,ErrorLine,ErrorMessage,
PrimaryKeyField,TableName,DMLType,MessageContent,DBName,CreateDate)
select
NEWID(),@ConversationHandle,
T.c.value('./@ErrNumber','INT'),
T.c.value('./@ErrSeverity','INT'),
T.c.value('./@ErrState','INT'),
T.c.value('./@ErrProcedure','Nvarchar(126)'),
T.c.value('./@ErrLine','INT'),
T.c.value('./@ErrMessage','nvarchar(2048)'),
T.c.value('./@PrimaryKeyField','nvarchar(128)'),
T.c.value('./@TableName','nvarchar(128)'),
T.c.value('./@DMLType','char(1)'),
T.c.value('./@MessageContent','nvarchar(max)'),
T.c.value('./@DBName','nvarchar(128)'),
GETDATE()
from @DataSource.nodes('/row') as T(c);
end
else if @Msg_Type_Name='http://schemas.microsoft.com/SQL/ServiceBroker/Error'
end conversation @ConversationHandle;
commit Transaction;
end
end
commit;
go
3.2.10對Service Broker隊列使用內部激活,並指定將調用的存儲過程
這里主要用來激活源數據庫的消息隊列,並為其指定調用的存儲過程,即上邊3.2.9 中創建的存儲過程。
--對Service Broker隊列使用內部激活,並指定將調用的存儲過程 use DBFrom go alter queue dbo.DBFrom_DataSyncQueue with activation ( status=on, max_queue_Readers=1, procedure_name=UP_SyncDataFeedback, execute as owner ); Go
3.2.11 在源數據庫中為需要同步的數據表創建觸發器
這里就以用戶表為例,具體操作如下,這里通過查詢系統的Inserted和Deleted臨時表來判斷執行同步的操作類型是更新(U)、新增(A)還是刪除(D),最后調用3.2.7 中創建的存儲過程來對數據進行處理並發送。
use DBFrom
Go
--用戶信息同步
Create Trigger UT_DataSync_Users
on dbo.Org_Users
after insert,update,delete
as
set nocount on ;
--變量聲明
declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@DMLType char(1);
declare @InsertCount int ,@DeleteCount int ;
declare @XMLData xml;
--變量賦值
set @PrimaryKeyField='ID' --組合主鍵,多個主鍵使用","隔開
set @TableName='Org_Users'
set @InsertCount=(select COUNT(*) from inserted)
set @DeleteCount=(select COUNT(*) from deleted)
if @InsertCount=@DeleteCount and @InsertCount<>0 ----Update
begin
select @XMLData=(select * from inserted For xml raw,binary base64,ELEMENTS XSINIL);
set @DMLType='U';
end
else if(@InsertCount<>0 and @DeleteCount=0) ----Insert
begin
select @XMLData=(select * from inserted for xml raw ,Binary base64,ELEMENTS XSINIL)
set @DMLType='A';
end
else----Delete
begin
select @XMLData=(select *from deleted for xml raw,binary base64,ELEMENTS XSINIL)
set @DMLType='D';
end
if(@XMLData is not null)
begin
exec UP_SyncDataSendMsg @PrimaryKeyField,@TableName,@DMLType,@XMLData;
end
go
3.2.12 目標數據庫中創建,字符分割函數
該函數主要是用來進行字符分割,用來處理主鍵有多個字段的情況。
--目標數據庫
use DBTo
go
--轉換用‘,'分割的字符串@str
create Function dbo.uf_SplitString
(
@str nvarchar(max),
@Separator nchar(1)=','
)
returns nvarchar(2000)
as
begin
declare @Fields xml;--結果字段列表
declare @Num int;-----記錄循環次數
declare @Pos int;-----記錄開始搜索位置
declare @NextPos int;--搜索位置臨時變量
declare @FieldValue nvarchar(256);--搜索結果
set @Num=0;
set @Pos=1;
set @Fields=CONVERT(xml,'<Fields></Fields>');
while (@Pos<=LEN(@Str))
begin
select @NextPos=CHARINDEX(@Separator,@Str,@Pos)
if(@NextPos=0 OR @NextPos is null)
select @NextPos=LEN(@Str)+1;
select @FieldValue=RTRIM(ltrim(substring(@Str,@Pos,@NextPos-@Pos)))
select @Pos=@NextPos+1
set @Num=@Num+1;
if @FieldValue<> ''
begin
set @Fields.modify('insert <Field>{sql:variable("@FieldValue")}</Field> as last into /Fields[1]');
end
end
return Convert(nvarchar(2000),@Fields);
end
go
3.2.13 將解析過的消息信息,根據操作類型的不同同步到數據表中
這是所有的數據同步中最關鍵也是最復雜的一步了,在整個開發的過程中,大部分時間都花在這上邊了,具體的操作都在下面解釋的很清楚了。
--將XML數據源中的數據同步到數據表中(包括增刪改)
Use DBTo
go
create function dbo.UF_XMLDataSourceToSQL
(
@DataSource XML,--數據源
@TableName varchar(128),--同步數據表名稱
@PrimaryKeyField varchar(128),--需要同步的表的主鍵,主鍵為多個時用‘,'隔開
@DMLType char(1) --A:新建;U:編輯;D:刪除
)
returns nvarchar(4000)
as
begin
--變量聲明及數據初始化
--聲明數據表@TableName列Column相關信息變量
declare @ColumnName nvarchar(128),@DataType nvarchar(128),@MaxLength int;
--聲明用於拼接SQL的變量
declare @FieldsList nvarchar(4000),@QueryStatement nvarchar(4000);
declare @Sql nvarchar(4000);
declare @StrLength int;
--變量初始化
set @FieldsList=' ';--初始化變量不為null,否則對變量使用'+='操作符無效
set @QueryStatement=' ';
--主鍵信息,根據參數求解如:<Fields><Field>ID1</Field><Field>ID2</Field></Fields>
declare @PKs xml;
--當前字段是否主鍵-在‘更新’,‘刪除’同步數據時使用
declare @IsPK nvarchar(128);
--初始化游標--游標內容包括目標數據表TableName列信息
DECLARE ColumnNameList CURSOR FOR SELECT COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@TableName AND
DATA_TYPE<>'xml';
--數據處理
if @DMLType='A'--插入數據
begin
open ColumnNameList
fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
while @@FETCH_STATUS=0
begin
--判斷數據源列中是否存在屬性:@ColumnName
--判斷數據源列中是否存在--元素:@ColumnName
If @DataSource.exist('/row/*[local-name()=sql:variable("@ColumnName")]')=1
begin
--拼接SQL
set @FieldsList+=(@ColumnName+',');
set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素讀取(包含空值情況)
if @MaxLength is not null and @MaxLength<>-1
begin
set @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';
end
else if @MaxLength=-1 and @DataType<>'xml'--已調整
begin
set @QueryStatement+='(MAX)';
end
set @QueryStatement+=(''') as '+@ColumnName+',');
end
fetch ColumnNameList into @ColumnName,@DataType,@MaxLength
end
close ColumnNameList;
deallocate ColumnNameList;
set @StrLength=LEN(@FieldsList);
--去掉@FieldsList結尾的’,'
set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);
set @StrLength=LEN(@QueryStatement);
--去掉@QueryStatement結尾的’,'
set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
set @Sql=N'insert into '+@TableName+'('+@FieldsList+') select '+@QueryStatement+' from @DataSource.nodes(''/row/'') as T(c)';
end
else if @DMLType='U'--更新數據
begin
--更新語句where 后的條件表達式
declare @Condition nvarchar(1000);
set @Condition=' ';
set @PKs=CONVERT(xml,dbo.uf_SplitString(@PrimaryKeyField,','));
Open ColumnNameList
fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
while @@FETCH_STATUS=0
begin
--判斷數據源列中是否存在元素:@ColumnName
if @DataSource.exist('/row/*[local-name()=sql:variable("@ColumnName")]')=1
begin
set @IsPK=null;
SELECT @IsPk=Fs.F FROM (SELECT T.c.value('.[text()]','Nvarchar(128)') AS F FROM @PKs.nodes('/Fields/Field') AS T(c))Fs Where Fs.F=@ColumnName
if @IsPK is null or @IsPK=''
begin
--非主鍵,更新字段值
set @FieldsList+=(@ColumnName+'=Source.'+@ColumnName+',');
end
else
begin
--主鍵,作為要更新條件
set @Condition+=@TableName+'.'+@ColumnName+'=Source.'+@ColumnName+' And ';
end
--XML查詢
set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素讀取(包含空值情況)
if @MaxLength is not null and @MaxLength<>-1
begin
set @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';
end
else if @MaxLength=-1 and @DataType<>'xml'
begin
set @QueryStatement+='(max)';
end
set @QueryStatement+=(''') as '+@ColumnName+',');
end
fetch ColumnNameList Into @ColumnName,@DataType,@MaxLength
end
close ColumnNameList;
Deallocate ColumnNameList;
--去掉@FieldsList結尾的','
set @StrLength=LEN(@FieldsList);
set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);
--去掉@QueryStatement結尾的','
set @StrLength=LEN(@QueryStatement);
set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
--去掉@Condition結尾的‘and'
set @StrLength=LEN(rtrim(@Condition));
set @Condition=SUBSTRING(rtrim(@Condition),1,@StrLength-3);
set @Sql=N'USE DBTo ; update '+@TableName+' set '+@FieldsList+' from (select '+@QueryStatement+'
from @DataSource.nodes(''/row'') as T(c)) Source where '+@Condition;
end
else if @DMLType='D' --刪除數據
begin
--更新語句where后的條件表達式
declare @LinkField nvarchar(1000);
set @LinkField=' ';
set @PKs=CONVERT(xml,dbo.uf_SplistString(@PrimaryKeyField,','));
open ColumnNameList
fetch ColumnNameList into @ColumnName,@DataType,@MaxLength;
while @@FETCH_STATUS=0
begin
if @DataSource.exist('row/*[local-name()=sql:variable("@ColumnName")]')=1
begin
set @IsPK=null;--初始化
--當前字段是否為主鍵
select @IsPK=Fs.F from (select T.c.value('.[text()]','nvarchar(128)') as F from @PKs.nodes('/Fields/Field') as T(c))Fs where Fs.F=@ColumnName
--主鍵
if @IsPK is not null and @IsPK<>''
begin
--主鍵刪除條件
set @LinkField+='Target.'+@ColumnName+'=Source.'+@ColumnName+' And ';
--XML 查詢
set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素讀取(包含空值情況)
if(@MaxLength is not null and @MaxLength<>-1)
begin
set @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';
end
else if @MaxLength=-1 and @DataType<>'xml'
begin
set @QueryStatement+='(max)';
end
set @QueryStatement+=(''') as '+@ColumnName+',');
end
end
fetch ColumnNameList into @ColumnName,@DataType,@MaxLength
end
close ColumnNameList;
deallocate ColumnNameList;
--去除@QueryStateMent結尾的','
set @StrLength=LEN(@QueryStatement);
set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);
--去除@LinkField 結尾的’Add‘
set @StrLength=LEN(rtrim(@LinkField));
set @LinkField=SUBSTRING(rtrim(@LinkField),1,@StrLength-3);
set @Sql=N'Delete from '+@TableName+' from '+@TableName+' as Target inner join (select '+@QueryStatement+ ' from @DataSource.nodes(''/row'') as T(c))
Source on '+@LinkField;
end
Return @Sql--'hello'
end
go
3.2.14 解析並處理從隊列中讀取的消息
這里主要用來讀取隊列中的消息,並將消息進行處理,最終處理成一定的格式,並調用3.2.13中的存儲過程,將數據同步到數據庫中。
--將數據同步到數據表中
create procedure UP_SyncDataToTable
as
begin
set nocount on
--會話變量聲明
declare @ConversationHandle uniqueidentifier;--會話句柄
declare @Msg_Body nvarchar(max);
declare @Msg_Type_Name sysname;
declare @ErrorNumber int ;
--變量賦值
while(1=1)
begin
begin transaction
--從隊列中接收消息
waitfor
(
receive top(1)
@Msg_Type_Name=message_type_name,
@ConversationHandle=[conversation_handle],
@Msg_Body=message_body
-- from dbo.[DBTo_DataSyncQueue]
from dbo.[DBFrom_DataSyncQueue]
),timeout 500
--如果接收到消息-處理,否則跳過
if @@ROWCOUNT<=0
begin
break;
end
if @Msg_Type_Name='http://oa.founder.com/Data/Sync'
begin
--聲明變量
declare @DMLType char(1);
declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@Sql nvarchar(4000);
declare @DataSource xml
--受影響的行數
declare @EffectRowCount int;
declare @ErrMsg xml;
begin try
--變量賦值
set @DataSource=convert(xml,@Msg_Body);--數據源
set @PrimaryKeyField=@DataSource.value('(/PrimaryKeyField)[1][text()]','nvarchar(128)');--主鍵列表
set @TableName=@DataSource.value('(/Table)[1][text()]','nvarchar(128)');--操作數據表
set @DMLType=@DataSource.value('/DMLType[1][text()]','char(1)');--操作類型
set @Sql=dbo.UF_XMLDataSourceToSQL(@DataSource,@TableName,@PrimaryKeyField,@DMLType);
exec sp_executesql @Sql,
N'@DataSource XML',
@DataSource;
end try
begin catch
declare @DBName nvarchar(128)
select @DBName=Name from master..SysDataBases where dbid=(select dbid from master..sysprocesses where spid=@@SPID)
set @ErrorNumber=ERROR_NUMBER();
set @ErrMsg=(select ERROR_NUMBER() as ErrNumber,
ERROR_SEVERITY() as ErrSeverity,
ERROR_STATE() as ErrState,
ERROR_PROCEDURE() as ErrProcedure,
ERROR_LINE() as ErrLine,
ERROR_MESSAGE() as ErrMessage,
@PrimaryKeyField as PrimaryKeyField,
@TableName as TableName,
@DMLType as DMLType,
@Msg_Body as MessageContent,
@DBName as DBName
for XML raw);
--GOTO 錯誤處理標簽
goto Err_Handle;
end catch
--結束會話
End Conversation @ConversationHandle
if @ErrorNumber is not null
begin
--錯誤處理區域
Err_Handle:
if @ErrMsg is not null
begin
declare @test nvarchar(128);
--發送失敗消息
send on conversation @ConversationHandle
message type [http://oa.founder.com/Data/Sync/Error](@ErrMsg)
end
--結束會話
end conversation @ConversationHandle
--break;
--回滾--不可回滾,否則將無法發送失敗消息
--GoTO Err_Lab;
end
end
commit transaction
end
end
go
3.2.15 對目標數據庫的消息隊列進行內部激活
這里主要是用來激活目標數據庫的消息隊列,主要用來實現數據的同步以及同步出錯的錯誤信息的反饋。
--對Service Broker隊列使用內部激活,並指定將要調用的存儲過程 use DBTo go --alter Queue dbo.[DBTo_DataSyncQueue] with activation alter Queue dbo.[DBFrom_DataSyncQueue] with activation ( status=on, max_queue_readers=1, Procedure_name=UP_SyncDataToTable, Execute as self ) Go
完成以上這些步驟以后,就可以實現同一數據庫實例上兩個不同的數據庫之間的數據同步。即如果DBFrom數據庫中的Org_Users中的某一條信息發生變化,會馬上的自動同步到DBTo數據庫中的Org_Users 表。如果是想要實現不同的數據庫實例間的數據庫的表的同步,則可以參考以下鏈接:
http://www.cnblogs.com/downmoon/archive/2011/05/05/2037830.html
在創建啟用傳輸安全、對話安全,創建路由、遠程服務綁定等額外的操作之后,剩下的操作跟在同一數據庫實例中的操作是一樣的。
此外,本文還參考了如下的鏈接:
http://www.cnblogs.com/downmoon/archive/2011/04/05/2005900.html
希望可以給大家一些啟發和幫助。具體的源碼有興趣的朋友可以留下郵箱。
