基於SQL Server的簡單數據同步方案


軟件系統中經常需要進行數據同步,如 C/S 程序為了支持離線應用和服務端之間雙向同步數據、數據庫集群中主服務器向從服務器同步數據、業務子系統之間同步共用的數據等。

不同需求下的數據同步方法:

a)在 C/S 客戶端只有數據查詢,而且同步的數據比較少時

1、可以在每次同步時先清空客戶端(目標)數據庫表(truncate(不會記錄刪除日志且不會觸發觸發器)/delete(會記錄行刪除日志))的數據,然后直接從服務器寫入最新的所有數據到客戶端客戶端數據庫表。

優點:程序簡單,不需要區分哪些是新增的數據哪些是修改的數據。

缺點:當數據量很大時,select 全表掃描會產生性能問題。

2、可以在每次同步時先清空客戶端(目標)數據庫表(truncate(不會記錄刪除日志且不會觸發觸發器)/delete(會記錄行刪除日志))的數據,然后把服務器端大量的數據按分頁的方式寫到客戶端數據庫表。 

    public class SynchronizeSqlStr
    {
        public static readonly string TruncateSql = null;
        public static readonly string[] GetRecordCountStr = null;
        public static readonly string[] AllSynchronizeSqlStr = null;        

        static SynchronizeSqlStr()
        {
            TruncateSql = @"TRUNCATE TABLE T_Target;";

            GetRecordCountStr = new string[] {
                "SELECT COUNT(1) AS Co FROM T_Source;",
            };

            AllSynchronizeSqlStr = new string[] //注意:這里每次都要排序,會影響查詢效率,如果改成 ID 范圍會更好!!
            {
                @"INSERT INTO dbo.T_Target(TId, TName, DeptId)
                    SELECT S.TId, S.TName, S.DeptId FROM (SELECT ROW_NUMBER() OVER (ORDER BY TId) AS ROWNUM, TId, TName, DeptId FROM dbo.T_Source) AS S
                    WHERE S.ROWNUM >= {0} AND S.ROWNUM < {1};",
            };            
        }
    }
    public class HelloJob : IJob
    {
        private static readonly NLog.Logger log = NLog.LogManager.GetCurrentClassLogger();
        private static readonly string connStr = ConfigurationManager.AppSettings["ConnectionString"].ToString();
        int pageSize = 1000; // 默認每次同步1000條記錄                

        public virtual Task Execute(IJobExecutionContext context)
        {
            try
            {
                DbHelperSQL.connectionString = DESEncrypt.Decrypt(connStr);

                // 步驟一:清理客戶端的歷史數據(注意:照片表采取增量同步方式,且不刪除歷史記錄)
                DbHelperSQL.ExecuteSqlTran(SynchronizeSqlStr.TruncateSql);

                // 步驟二:依次同步服務器的數據到客戶端
                for (int i = 0; i < SynchronizeSqlStr.AllSynchronizeSqlStr.Length; i++)
                {
                    try
                    {
                        log.Info(string.Format("准備異步執行腳本:" + i.ToString() + "  - {0}", DateTime.Now));

                        Thread thread = new Thread(SynchronizeDataByPager);
                        thread.IsBackground = true;
                        thread.Start(SynchronizeSqlStr.GetRecordCountStr[i] + "~" + SynchronizeSqlStr.AllSynchronizeSqlStr[i]);
                    }
                    catch (Exception e)
                    {
                        log.Error("異步執行腳本:" + i.ToString() + " 時出錯:\r\n" + e.ToString() + " " + DateTime.Now + "\r\n");
                        break;
                    }

                    Thread.Sleep(200);
                }

                log.Info(string.Format("Synchronize 完成! - {0}", DateTime.Now));

                return Task.FromResult(true);
            }
            catch (Exception ex)
            {
                log.Error("執行Execute方法時出錯:" + ex.ToString() + " " + DateTime.Now + "\r\n");
                return Task.FromResult(false);
            }
        }
        
        /// <summary>
        /// 按記錄總條數進行分頁,然后每次同步一頁的數據,防止數據庫操作超時
        /// </summary>
        private void SynchronizeDataByPager(object querySqlAndUpdateSql)
        {
            try
            {
                string sql = Convert.ToString(querySqlAndUpdateSql);
                if (string.IsNullOrEmpty(sql))
                    return;

                string querySql = sql.Split(new char[] { '~' })[0];
                string updateSql = sql.Split(new char[] { '~' })[1];

                DataSet ds = DbHelperSQL.Query(querySql);
                if (ds == null || ds.Tables == null || ds.Tables.Count < 1 || ds.Tables[0].Rows == null || ds.Tables[0].Rows.Count < 1)
                    return;

                int recordCount = 0;
                int.TryParse(ds.Tables[0].Rows[0]["Co"].ToString(), out recordCount);
                if (recordCount < 1)
                    return;

                int pageCount = recordCount / pageSize + 1;

                for (int i = 0; i < pageCount; i++)
                {
                    string eachPageSql = string.Format(updateSql, i * pageSize, i * pageSize + pageSize);
                    DbHelperSQL.ExecuteSqlTran(eachPageSql);

                    Thread.Sleep(TimeSpan.FromMilliseconds(100));
                }

            }
            catch (Exception ex)
            {
                log.Error("執行分頁同步操作時出錯:" + ex.ToString() + " " + DateTime.Now + "\r\n");
            }
        }
    }

b)在 C/S 客戶端既有數據查詢也有數據修改,而且的數據比較多時(如:照片表可能比較大)

1.如果源數據庫中的表設計規范,即表中包含 ID(自增長,當數據重復時這個字段很有用)、CreateTime、UpdateTime、IsDelete 字段,而且沒有物理刪除,這時在每次同步時可以先取目的表中最新的 UpdateTime或數據庫的Timespan 值,保存到配置文件中,然后從源表中篩選大於或等於 UpdateTime 的記錄,然后通過比對 Id 來區分哪些是新增記錄,哪些是修改記錄(從源表中篩選大於目標表最大 Id 的記錄,即為新增的記錄,其他的則是修改的記錄),然后分別新增或修改到目的表中。

1.1 利用存儲過程來實現

CREATE PROCEDURE [DBO].[P_SynchronizeData]
AS
BEGIN    
    SET NOCOUNT ON;
    DECLARE @Max_UpdateTime NVARCHAR(100);
    DECLARE @SQL1 NVARCHAR(1000);
    DECLARE @SQL2 NVARCHAR(1000);
    
    SELECT @Max_UpdateTime = MAX(UpdateTime) FROM T_Target;
    IF (@Max_UpdateTime IS NULL) 
    BEGIN
        INSERT INTO T_Target(Id,Name)
        SELECT * FROM OPENQUERY(HTIMSDB,'SELECT Id,Name FROM T_Source ');
    END
    ELSE
    BEGIN
        --新增的記錄
        SET @SQL1 = 'SELECT Id,Name FROM T_Source WHERE UpdateTime >'''+ @Max_UpdateTime + ''''; 
        SET @SQL2 = 'INSERT INTO T_Target(Id,Name) VALUES(@SQL1)'            
        EXEC(@SQL2)
        --修改的記錄(修改時可以先進行物理刪除然后進行新增
     --刪除的記錄
END END

1.2 利用 c# 程序來實現

// 同步 HT_POWER 表的記錄
// 步驟一:先把源表的數據同步到目的表,不管RECORD_COUNTER字段的值(注意:這里把 RECORD_COUNTER 賦值為 0,是為了后面方便更新其值)
//門禁庫中全部的權限記錄
DataSet sourceData = SQLHelper.Query(_connStr, "SELECT ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID as sourceId FROM [SKEP_DAS].[dbo].[DAS_DoorPoint];");
List<string> sourceDataIds = new List<string>();
if (!IsNullDataSet(sourceData))
{
    int count = sourceData.Tables[0].Rows.Count;
    for (int i = 0; i < count; i++)
    {
        try
        {
            sourceDataIds.Add(sourceData.Tables[0].Rows[i]["sourceId"].ToString());
        }
        catch (Exception e) { }
    }
}

//門禁庫中最近更新過的的記錄集合
object lastUpdateTime = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, "SELECT max(InsertDateTime) as InsertDateTime FROM [HT_ACCESS].[dbo].[HT_POWER];");
string lastUpdateTimeStr = "1900-01-01 00:00:00";
if (lastUpdateTime != null)
    lastUpdateTimeStr = Convert.ToDateTime(lastUpdateTime).ToString("yyyy-MM-dd HH:mm:ss");
DataSet sourceUpdateData = SQLHelper.Query(_connStr, "SELECT ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID as sourceId FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] where InsertDateTime > '" + lastUpdateTimeStr + @"'");
List<string> sourceUpdateDataIds = new List<string>();
if (!IsNullDataSet(sourceUpdateData)) //IsNullDataSet方法見文末
{
    int count = sourceUpdateData.Tables[0].Rows.Count;
    for (int i = 0; i < count; i++)
    {
        try
        {
            sourceUpdateDataIds.Add(sourceUpdateData.Tables[0].Rows[i]["sourceId"].ToString());
        }
        catch (Exception e) { }
    }
}

//中間庫中全部(未刪除)的權限記錄
DataSet targetData = SQLHelper.Query(_connStr, "SELECT DOOR_NO+EMP_NO as targetId FROM [HT_ACCESS].[dbo].[HT_POWER] WHERE [DELETE_FLAG] = 0;");
List<string> targetDataIds = new List<string>();
if (!IsNullDataSet(targetData))
{
    int count = targetData.Tables[0].Rows.Count;
    for (int i = 0; i < count; i++)
    {
        try
        {
            targetDataIds.Add(targetData.Tables[0].Rows[i]["targetId"].ToString());
        }
        catch (Exception e) { }
    }
}

try
{                        
    //1.已刪除的記錄(這里不刪只修改DELETE_FLAG=1)
    List<string> deleteIds = targetDataIds.Where(t => !sourceDataIds.Contains(t)).ToList();
    foreach (var id in deleteIds) //依次添加防止RECORD_COUNTER重復
    {
        string clearOldDatas = @"UPDATE [HT_ACCESS].[DBO].HT_POWER SET DELETE_FLAG = 1, RECORD_COUNTER = (SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER])
                                where DOOR_NO+EMP_NO = '" + id + @"' AND [DELETE_FLAG] = 0;";
        SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearOldDatas);
    }

    //2.新增的記錄
    List<string> addIds = sourceUpdateDataIds.Where(t => !targetDataIds.Contains(t)).ToList();
    foreach (var id in addIds)
    {
        string addSql = @"INSERT INTO [HT_ACCESS].[DBO].HT_POWER (POWER_ID,DOOR_NO,EMP_NO,POWER_ZONE_NO,IS_HOLIDAY_ENABLED,InsertDateTime,RECORD_COUNTER)
                                SELECT DoorPointID, ControllerID + '00' + cast(DoorNo as varchar(35)) as [DOOR_ID], StaffID,DoorTztNo,IsHolidayEnabled,InsertDateTime,(SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER]) as RECORD_COUNTER
                                FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] 
                                where ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID = '" + id + @"'";
        SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, addSql);
    }

    //3.需要修改的記錄(由於刪除操作會影響RECORD_COUNTER的賦值,所以把刪除語句放在insert語句后面執行!否則先執行deleteSql的話,RECORD_COUNTER的值總會少1)
    List<string> updateIds = sourceUpdateDataIds.Where(t => targetDataIds.Contains(t)).ToList();
    //3.1刪除舊記錄
    foreach (var id in updateIds)
    {                            
        string clearNeedUpdateData = @"DELETE FROM [HT_ACCESS].[DBO].HT_POWER
                                where DOOR_NO+EMP_NO = '" + id + @"' AND ISNULL(DOOR_NO,'')<>'' AND ISNULL(EMP_NO,'')<>'';";
        SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearNeedUpdateData);

    }
    //3.2插入最新的記錄
    foreach (var id in updateIds)
    {                            
        string insertUpdateData = @"INSERT INTO [HT_ACCESS].[DBO].HT_POWER(POWER_ID,DOOR_NO,EMP_NO,POWER_ZONE_NO,IS_HOLIDAY_ENABLED,InsertDateTime,RECORD_COUNTER)
                                SELECT DoorPointID,ControllerID + '00' + cast(DoorNo as varchar(35)) as [DOOR_ID], StaffID,DoorTztNo,IsHolidayEnabled,InsertDateTime,(SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER]) as RECORD_COUNTER
                                FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] 
                                where ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID = '" + id + @"';";
        SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, insertUpdateData);
    }
}
catch (Exception e)
{
    logger.LogError("同步 HT_POWER 表的記錄時出錯:\r\n" + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
}  

 2.如果源數據庫表設計粗糙,沒有 UpdateTime 字段,而且還可能存在物理刪除,這時需分兩種情況:

2.1如果不允許在源數據庫中創建觸發器,由於無法區分新增還是修改而且無法直接找到被刪除的記錄,則只能進行全表比對或者按 1. 中的方法按分頁的方式進行同步了。

2.2 如果允許在源數據庫中創建觸發器和表,則可以在源表中創建相應的觸發器來監控增刪改操作,然后把操作的表名、類型、主鍵等關鍵字段值保存到新建的中間表中,最后創建對應的目的表(包含 Id、CreateTime、UpdateTime、IsDelete、RecordCount 字段,這樣如果需要同步這里的數據會簡單很多),通過后台程序定時同步數據到目的表中。

問題:當提交批量sql語句,包括增刪改時,觸發器無法區分操作順序,順序錯亂的話會導致數據同步不一致!

 

如下所示:

 創建相關的數據庫和表:

USE [TestDb]
GO
/****** Object:  StoredProcedure [dbo].[P_TriggerCommon]    Script Date: 2019/7/5 13:59:42 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

-- =============================================
-- Author:        wzl
-- Create date: 2019/03/01
-- Description:    把各觸發器中的公共操作部分提取出來(用於基礎信息庫)
-- =============================================
CREATE PROCEDURE [dbo].[P_TriggerCommon]
    @SOURCE_TABLE_NAME nvarchar(200),
    @SOURCE_PRIMARYKEY_NAME nvarchar(200),
    @SOURCE_PRIMARYKEY_VALUE nvarchar(200),
    @TYPE SMALLINT
AS
BEGIN
    SET NOCOUNT ON;

    if(not exists(select 1 from [TestDb].[DBO].[OperateData] 
                    where [TABLENAME] = @SOURCE_TABLE_NAME 
                            and [TYPE] = @TYPE 
                            and [PrimaryFeildValue] = @SOURCE_PRIMARYKEY_VALUE))
    begin
        INSERT INTO [TestDb].[DBO].[OperateData]([TABLENAME],[TYPE],[PrimaryFeild],[PrimaryFeildValue],[CreateTime]) 
        VALUES(@SOURCE_TABLE_NAME, @TYPE, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, GETDATE());
    end
    else
    begin
        UPDATE [TestDb].[DBO].[OperateData] 
        SET [CreateTime] = GETDATE() 
        where [TABLENAME] = @SOURCE_TABLE_NAME 
                and [TYPE] = @TYPE
                and [PrimaryFeild] = @SOURCE_PRIMARYKEY_NAME
                and [PrimaryFeildValue] = @SOURCE_PRIMARYKEY_VALUE;
    end
END

GO
/****** Object:  Table [dbo].[OperateData]    Script Date: 2019/7/5 13:59:42 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[OperateData](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [TableName] [nvarchar](200) NOT NULL,
    [Type] [smallint] NOT NULL,
    [PrimaryFeild] [nvarchar](200) NOT NULL,
    [PrimaryFeildValue] [nvarchar](200) NOT NULL,
    [CreateTime] [datetime] NULL,
 CONSTRAINT [PK_OperateData] PRIMARY KEY CLUSTERED 
(
    [Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

GO
/****** Object:  Table [dbo].[Student]    Script Date: 2019/7/5 13:59:42 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[Student](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [Number] [nvarchar](200) NOT NULL,
    [Name] [nvarchar](200) NULL,
    [Sex] [bit] NULL,
    [Age] [smallint] NULL,
    [Version] [timestamp] NULL,
 CONSTRAINT [PK_Student] PRIMARY KEY CLUSTERED 
(
    [Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

GO
/****** Object:  Table [dbo].[Student_Log]    Script Date: 2019/7/5 13:59:42 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[Student_Log](
    [Id] [int] NOT NULL,
    [Number] [nvarchar](200) NOT NULL,
    [Name] [nvarchar](200) NULL,
    [Sex] [bit] NULL,
    [Age] [smallint] NULL,
    [Version] [timestamp] NULL,
    [Source] [nvarchar](50) NULL
) ON [PRIMARY]

GO

 在需要同步數據的表上創建觸發器:

USE [TestDb]
GO

/****** Object:  Trigger [dbo].[T_Student]    Script Date: 2019/7/5 14:00:28 ******/
SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE TRIGGER [dbo].[T_Student]  
ON [dbo].[Student]
AFTER INSERT,DELETE,UPDATE
AS
BEGIN
     --觸發器對應的源表名
    DECLARE @SOURCE_TABLE_NAME nvarchar(200);
    SET @SOURCE_TABLE_NAME = '[TestDb].[DBO].[Student]';
    --源表中的主鍵名稱
    DECLARE @SOURCE_PRIMARYKEY_NAME nvarchar(200);
    SET @SOURCE_PRIMARYKEY_NAME = 'Number';
    --源表中的主鍵的值
    DECLARE @SOURCE_PRIMARYKEY_VALUE nvarchar(200);
    SET @SOURCE_PRIMARYKEY_VALUE = '';
    --觸發類型(1 新增、2 刪除、3 修改)
    DECLARE @TYPE SMALLINT;    
    
    --DML觸發器使用了deleted和inserted表,它們保存了被用戶修改的行的新值和原來的值
    --新增(只有inserted中有記錄,說明發生在[DAS_TenantTimezoneTable]表上的所有操作都是新增)
    IF(EXISTS(SELECT 1 FROM INSERTED) AND NOT EXISTS(SELECT 1 FROM DELETED))
    BEGIN 
        SET @TYPE = 1;

        --test
        insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'INSERTED' from INSERTED
        
        declare MyCursor cursor 
            for SELECT Number FROM INSERTED;    
        open MyCursor;
        fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
        while @@FETCH_STATUS = 0
        begin
            EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
        end
        
        close MyCursor;
        deallocate MyCursor;
    END    
    --刪除(只有deleted中有記錄,說明發生在[DAS_TenantTimezoneTable]表上的所有操作都是刪除)
    ELSE IF(EXISTS(SELECT 1 FROM DELETED) AND NOT EXISTS(SELECT 1 FROM INSERTED))
    BEGIN
        SET @TYPE = 2;

        --test
        insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'DELETED' from DELETED
        
        declare MyCursor cursor 
            for SELECT Number FROM DELETED;        
        open MyCursor;
        fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
        while @@FETCH_STATUS = 0
        begin
            EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
        end
        
        close MyCursor;
        deallocate MyCursor;
    END
    --新增、刪除、修改(inserted和deleted中都有記錄,說明發生在[DAS_TenantTimezoneTable]表上的操作可能包含:新增或刪除或修改)
    ELSE IF(EXISTS(SELECT 1 FROM INSERTED) AND EXISTS(SELECT 1 FROM DELETED))
    BEGIN
        --新增的記錄
        SET @TYPE = 1;

        --test
        insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'INSERTED' from INSERTED
        --test
        insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'DELETED' from DELETED
                
        declare MyCursor cursor 
            for select Number from inserted I where Number not in (select Number from deleted);
        open MyCursor;
        fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
        while @@FETCH_STATUS = 0
        begin
            EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
        end        
        close MyCursor;
        deallocate MyCursor;

        --刪除的記錄
        SET @TYPE = 2;
        declare MyCursor cursor 
            for select Number from deleted I where Number not in (select Number from inserted);
        open MyCursor;
        fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
        while @@FETCH_STATUS = 0
        begin
            EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
        end        
        close MyCursor;
        deallocate MyCursor;

        --修改的記錄
        SET @TYPE = 3;
        declare MyCursor cursor 
            for select Number from inserted I where Number in (select Number from deleted);
        open MyCursor;
        fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
        while @@FETCH_STATUS = 0
        begin
            EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
        end        
        close MyCursor;
        deallocate MyCursor;    
    END
END

GO

 測試腳本:

--批量執行同類型的語句
INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091001','張三',1,20);
INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091002','李四',1,18);

--批量執行不同類型的語句
UPDATE Student SET Name = Name + '1' WHERE Number = '091001';
DELETE FROM Student WHERE Number = '091002';
INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091003','王五',1,21);

 --查看結果
 select * FROM [TestDb].[dbo].[Student]
 select * FROM [TestDb].[dbo].Student_Log
 select * FROM [TestDb].[dbo].OperateData

 delete FROM [TestDb].[dbo].[Student]
 delete FROM [TestDb].[dbo].Student_Log
 delete FROM [TestDb].[dbo].OperateData

 dbcc CHECKIDENT('Student',reseed,0)
 --dbcc CHECKIDENT('Student_Log',reseed,0) 
 dbcc CHECKIDENT('OperateData',reseed,0)

 C#后台同步程序(根據操作跟蹤表的數據來同步數據):

    public class SynchronizeSqlStr
    {
        public static readonly string TriggerTrackSql = "SELECT [TableName],[Type],[PrimaryKey] FROM [SKEP_DAS].[dbo].[SKEP_SYNC]";
        public static readonly string checkHasDataSql = "SELECT 1 FROM [HT_ACCESS].[dbo].[HT_PHOTO]";
        public static readonly string initDataSql = "";
        public static readonly string updateRecordCounterSql = "";
public static Dictionary<string, string> TableMappings = new Dictionary<string, string>(); public static readonly string InsertSQLs = @"INSERT INTO {0} ({1}) SELECT {2} FROM {3} WHERE {4} = '{5}';"; public static readonly string DeleteSQLs = "DELETE FROM {0} WHERE {1} = '{2}';"; public static readonly string UpdateSQLs = ""; // DeleteSQLs + "\r\n" + InsertSQLs; public static Dictionary<string, string[]> BaseParams = new Dictionary<string, string[]>(); static SynchronizeSqlStr() { int RecordDay = 1; int.TryParse(ConfigurationManager.AppSettings["RecordDay"].ToString(), out RecordDay); string getRecordDay = DateTime.Now.AddDays(-1 * RecordDay).ToShortDateString(); // 首次啟動同步程序時,先初始化已有的數據 initDataSql = @"INSERT INTO [HT_ACCESS].[DBO].HT_PHOTO(EMP_NO,DEPT_NO,EMP_NAME,PHOTO_CONTENT,IS_QUALIFIED) SELECT StaffID,TenantID,StaffDisplayName,StaffPhotoImg,1 FROM [SKEP_DAS].[dbo].[DAS_Staff]; // RECORD_COUNTER 用於標記最新被修改過的數據,在初始化數據時由於 ID 是從1開始自增長的,所以可以用 ID 的值來初始 RECORD_COUNTER,(而首次用 "SELECT MAX(RECORD_COUNTER) + 1 FROM xxx" 插入的是 NULL) updateRecordCounterSql = @"UPDATE HT_ACCESS.DBO.HT_PHOTO SET RECORD_COUNTER = ID;"; // 門禁庫表和中間庫表的映射關系 TableMappings.Add("DAS_Staff", "HT_PHOTO"); BaseParams.Add("HT_PHOTO", new string[] { "[HT_ACCESS].[DBO].HT_PHOTO", "EMP_NO,DEPT_NO,EMP_NAME,PHOTO_CONTENT", "StaffID,TenantID,StaffDisplayName,StaffPhotoImg", "EMP_NO", "StaffID" }); } }

C#同步程序邏輯:
using HT.IMS.DataSync;
using IMS.BaseFramework.Logging;
using IMS.DBHelper;
using Quartz;
using System;
using System.Configuration;
using System.Data;
using System.Text;
using System.Threading.Tasks;

namespace SynchronizeTask
{
    public class HelloJob : IJob
    {
        private static LoggerAdapter<Program> logger = new LoggerAdapter<Program>();
        private static string _connStr = ConfigurationManager.ConnectionStrings["SQLConnectionStr"].ToString();
        string dtNow = "1900-01-01";
        private static readonly object _syncRoot = new object(); // 注意必須是 static 類型,否則每個實例都有自己的 _syncRoot,達不到全局互斥效果

        public virtual Task Execute(IJobExecutionContext context)
        {
            try
            {
                #region 初始化數據庫表中的數據

                dtNow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff");
                DataSet checkData = SQLHelper.Query(_connStr, SynchronizeSqlStr.checkHasDataSql);
                try
                {
                    if (IsNullDataSet(checkData))
                    {
                        // 由於各主鍵ID是自增長的,因此為了簡便,在初始化數據后,直接讓 RECORD_COUNTER = ID(注意:updateRecordCounterSql 里排除里 RECORD 表,因為它只有一條記錄)
                        string initDataSql = Convert.ToString(SynchronizeSqlStr.initDataSql + " " + SynchronizeSqlStr.updateRecordCounterSql);

                        // 這里不能用異步操作,因為后面的操作要在它執行成功后才能執行
                        SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, initDataSql);
                    }
                }
                catch (Exception e)
                {
                    logger.LogError("初始化數據庫表中的數據時出錯:\r\n" + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
                    return Task.FromResult(false);
                }

                #endregion

                #region 同步表的數據(根據門禁觸發器的記錄)

                // 獲取門禁數據庫服務器上的觸發器生成的待同步的數據
                DataSet trackData = SQLHelper.Query(_connStr, SynchronizeSqlStr.TriggerTrackSql);

                if (IsNullDataSet(trackData))
                {
                    return Task.FromResult(true);
                }

                DataTable dt = trackData.Tables[0];
                int rowCount = trackData.Tables[0].Rows.Count;

                StringBuilder dataSyncSqls = new StringBuilder("");
                StringBuilder clearSqls = new StringBuilder("");

                //這樣寫有個問題,如果同一個表有多條記錄,由於語句沒有執行,則取maxrecordCount都是同一個值
                //改成每次拼接一條語句就執行一次
                for (int i = 0; i < rowCount; i++)
                {
                    try
                    {
                        // 提取表名(由於表名在各個數據庫中都保持一樣,這里提取表名后方便后面獲取 Dictionary 中的參數)                        
                        string tableName = dt.Rows[i]["TableName"].ToString();
                        string tableCache = tableName;
                        tableName = tableName.Substring(tableName.LastIndexOf('[') + 1).TrimEnd(new char[] { ']' });
                        tableName = SynchronizeSqlStr.TableMappings[tableName];

                        string[] tempParams = SynchronizeSqlStr.BaseParams[tableName];

                        string getMaxRecordCountSql = string.Format("SELECT MAX([RECORD_COUNTER]) FROM {0}", tempParams[0]);
                        object maxRecordCount = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, getMaxRecordCountSql);
                        string maxCount = Convert.ToString(maxRecordCount);

                        if (string.IsNullOrWhiteSpace(maxCount))
                        {
                            maxCount = "1";
                        }
                        else
                        {
                            maxCount = Convert.ToString(Convert.ToInt32(maxCount) + 1);
                        }

                        if (dt.Rows[i]["Type"].ToString() == "1") // 新增
                        {
                            dataSyncSqls.Append(
                                string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                    + "\r\n" + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}' AND DELETE_FLAT<>1", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                     + ";"
                                );
                        }
                        else if (dt.Rows[i]["Type"].ToString() == "2") // 刪除(由於是軟刪除,所以和修改處理方式類似)
                        {
                            // dataSyncSqls.Append(string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString()));
                            dataSyncSqls.Append(
                                    string.Format("UPDATE {0} SET [DELETE_FLAG] = 1, [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + "  WHERE {1} = '{2}' AND DELETE_FLAT<>1", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                    + ";"
                                  );
                        }
                        else if (dt.Rows[i]["Type"].ToString() == "3") // 修改
                        {
                            //由於找刪除是把照片改成默認的照片,而12所開發人員要求把刪除的照片標記為刪除,這里對照片的修改做特殊處理
                            if (tempParams[0] == "[HT_ACCESS].[DBO].HT_PHOTO")
                            {
                                string getPhotoContenSql = string.Format("SELECT TOP 1 [StaffPhoto] FROM [SKEP_DASPHOTO].[dbo].[DAS_StaffPhoto] WHERE [PhotoID] = '{0}' AND DELETE_FLAG<>1", dt.Rows[i]["PrimaryKey"].ToString());
                                object photoContent = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, getPhotoContenSql);
                                byte[] pc = null;
                                StringBuilder sb = new StringBuilder("0x");
                                try
                                {
                                    pc = ((byte[])photoContent);
                                    foreach (var b in pc)
                                    {
                                        sb.Append(b.ToString("X2"));
                                    }
                                }
                                catch (Exception ex) { }

                                //如果修改后的照片等於默認照片則還要標記為刪除
                                if (sb.ToString().Equals(DefaultImage.EmpPhoto))
                                {
                                    dataSyncSqls.Append(
                                        string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                        + "\r\n" + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                         + "\r\n" + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + ", DELETE_FLAG = 1 WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                           + ";"
                                        );
                                }
                                else
                                {
                                    dataSyncSqls.Append(
                                        string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                        + "\r\n" + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                         + "\r\n" + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                           + ";"
                                        );
                                }
                            }
                            else
                            {
                                // 注意:這里更新 RECORD_COUNTER 時不能用 MAX([RECORD_COUNTER]) +1,因為這里是先刪除原記錄(如果刪除的記錄的 RECORD_COUNTER 的值剛好最大,那么把 MAX([RECORD_COUNTER]) +1 也還是原來的值),然后復制新記錄。
                                // 而利用 ID 自增長的特點,讓 [RECORD_COUNTER] = [ID] 則剛好滿足 [RECORD_COUNTER] 的要求
                                dataSyncSqls.Append(
                                    string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                    + "\r\n" + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                     + "\r\n" + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                       + ";"
                                    );
                            }
                        }

                        SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, dataSyncSqls.ToString());

                        // 清理中間表里的數據
                        clearSqls.Append("DELETE FROM [SKEP_DAS].[dbo].[SKEP_SYNC] WHERE [PrimaryKey] = '" + dt.Rows[i]["PrimaryKey"] + "';");
                    }
                    catch (Exception e)
                    {
                        logger.LogError("拼接腳本時出錯:\r\n" + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + "\r\n" + dataSyncSqls.ToString());
                        break;
                    }

                    // 拼接完腳本后即可刪除[SKEP_DAS].[dbo].[SKEP_SYNC]表中的臨時數據
                    if(clearSqls.Length > 0)
                    {
                        // 執行拼接好的腳本
                        SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearSqls.ToString());
                    }
                }

                #endregion

                return Task.FromResult(true);
            }

            catch (Exception ex)
            {
                logger.LogError("執行同步程序時出錯:\r\n" + ex.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
                return Task.FromResult(false);
            }
        }

        private bool IsNullDataSet(DataSet ds)
        {
            if (ds != null && ds.Tables != null && ds.Tables.Count > 0 && ds.Tables[0] != null && ds.Tables[0].Rows.Count > 0)
                return false;
            return true;
        }
    }
}
 
        
注:visual studio中提供了SSIS工具進行數據同步,但筆者不推薦該方式,因為配置太多且出錯后無法調試,推薦通過程序進行同步

相關資源:https://www.dataintegration.info/data-synchronization


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM