數據遷移最快方式,多線程並行執行 Sql插入


前言:

  由於系統升級,新開發的系統對數據驗證,及數據關聯做了很多優化,現需要將原歷史版本的數據遷移到新系統中;原數據庫大約有 1千多萬數據,大約 50個表。

  歷史數據庫命名為:A。 新系統庫暫命名為 :B;

  使用  .net 4.5 控制台程序 + EF + MSSQL 數據庫,由於有業務邏輯及時序處理,故只能按時序從單表一條條的寫入到新庫中;

 

化化過程:

  1、EF 如果使用多線程會出現 Sql 連接超過,或是連接不上數據庫;

  2、EF 優化連接 自定義 SqlConnection,並傳到入 多線程中,解決連接不上數據庫的問題減少數據庫連接數,但由於 EF 在 SaveChangesAsync的時候做了事務提交,但事務是不支持並行操作,故會出現異常;

  3、EF 優化事務,關閉EF默認事務  DbContextConfiguration.EnsureTransactionsForFunctionsAndCommands = false; 這里有個坑  關閉事務對  SaveChangesAsync 無效,問題依然存在;

  4、找了很多資料總算找到可以通過  ExecuteSqlCommandAsync 執行 Sql 語句,可以關閉事務;

  5、優化成執行Sql 語句:await db.Database.ExecuteSqlCommandAsync(TransactionalBehavior.DoNotEnsureTransaction, sql, SqlParameters[]);

  經過以上優化處理后,就開始寫代碼:

一、關鍵的異步鎖程序:

    /// <summary>
    /// 提供異步鎖
    /// </summary>
    class AsyncRoot : IDisposable
    {
        /// <summary>
        /// 信號量
        /// </summary>
        private readonly SemaphoreSlim semaphoreSlim;

        /// <summary>
        /// 異步鎖
        /// </summary>
        public AsyncRoot()
            : this(1)
        {
        }

        /// <summary>
        /// 異步鎖
        /// </summary>
        /// <param name="concurrent">允許並行的線程數</param>
        public AsyncRoot(int concurrent)
        {
            this.semaphoreSlim = new SemaphoreSlim(concurrent, concurrent);
        }

        /// <summary>
        /// 鎖住代碼塊
        /// using( asyncRoot.Lock() ){ }
        /// </summary>
        /// <returns></returns>
        public IDisposable Lock()
        {
            this.semaphoreSlim.Wait();
            return new UnLocker(this.semaphoreSlim);
        }

        /// <summary>
        /// 鎖住代碼塊
        /// using( await asyncRoot.LockAsync() ){ }
        /// </summary>
        /// <returns></returns>
        public async Task<IDisposable> LockAsync()
        {
            await this.semaphoreSlim.WaitAsync().ConfigureAwait(false);
            return new UnLocker(this.semaphoreSlim);
        }

        /// <summary>
        /// 釋放資源
        /// </summary>
        public void Dispose()
        {
            this.semaphoreSlim.Dispose();
        }

        /// <summary>
        /// 提供解鎖
        /// </summary>
        class UnLocker : IDisposable
        {
            /// <summary>
            /// 信號量
            /// </summary>
            private readonly SemaphoreSlim semaphoreSlim;

            /// <summary>
            /// 解鎖
            /// </summary>
            /// <param name="semaphoreSlim">信號量</param>
            public UnLocker(SemaphoreSlim semaphoreSlim)
            {
                this.semaphoreSlim = semaphoreSlim;
            }

            /// <summary>
            /// 釋放鎖
            /// </summary>
            public void Dispose()
            {
                this.semaphoreSlim.Release();
            }
        }
    }
多線層異常鎖

 

二、對數據插入到數據庫:

 邏輯分析:對傳入的 數據集合,拆分為單個實體操作任務,每個任務使用同一個連接獨立的數據庫上下文,對實體反射為 Sql 語句(其中增加主鍵,表名、字段名、值的判斷驗證),

然后通過 ExecuteSqlCommandAsync 不使用事務的方式執行 Sql 語句;具體代碼見下:

//表示最大線程數
private readonly AsyncRoot root = new AsyncRoot(50);

    /// <summary>
    /// 多線程工作
    /// </summary>
    public class Workers
    {
        /// <summary>
        /// 多線程鎖
        /// </summary>
        private readonly AsyncRoot root = new AsyncRoot(50);

        /// <summary>
        /// 執行對象操作
        /// </summary>
        /// <param name="datas"></param>
        /// <returns></returns>
        public async Task RunAsync<T>(IEnumerable<T> datas) where T : class
        {
            //創建 Sql 連接
            var connection = new SqlConnection(System.Configuration.ConfigurationManager.ConnectionStrings["SqlDb"].ConnectionString);
            await connection.OpenAsync();
            var tasks = datas.Select(item => SaveToDbAsync(item, connection));
            await Task.WhenAll(tasks);
        }

        /// <summary>
        /// 單條記錄保存到數據庫
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="data"></param>
        /// <param name="connection"></param>
        /// <returns></returns>
        private async Task SaveToDbAsync<T>(T data, DbConnection connection) where T : class
        {
            using (await root.LockAsync())
            {
                using (var db = new SqlDb(connection))
                {
                    try
                    {
                        var dbset = db.Set<T>();
                        var tType = typeof(T);
                        var tableName = tType.Name;
                        //獲取  TableAttribute 數據庫中的表名
                        var tableAtt = Attribute.GetCustomAttribute(tType, typeof(TableAttribute)) as TableAttribute;
                        if (tableAtt != null)
                        {
                            tableName = tableAtt.Name;
                        }

                        var sbSql = new StringBuilder();

                        sbSql.AppendLine("insert into " + tableName + " (");
                        var plist = new List<string>();
                        var fieldParameters = new List<SqlParameter>();
                        var keyFiled = "ID";
                        foreach (var p in typeof(T).GetProperties())
                        {
                            var pName = p.Name.ToUpper();
                            //獲取  ColumnAttribute 數據庫中的列名
                            var pAtt = Attribute.GetCustomAttribute(p, typeof(ColumnAttribute)) as ColumnAttribute;
                            if (pAtt != null)
                            {
                                pName = pAtt.Name.ToUpper();
                            }

                            var keyAtt = Attribute.GetCustomAttribute(p, typeof(KeyAttribute)) as KeyAttribute;
                            if (keyAtt != null || p.Name.Equals("ID", StringComparison.OrdinalIgnoreCase))
                            {
                                keyFiled = pName;
                            }

                            var fieldParameter = "@" + pName;
                            //過濾不插入數據庫中的字段
                            var mapAtt = Attribute.GetCustomAttribute(p, typeof(NotMappedAttribute));
                            if (mapAtt == null)
                            {
                                var value = p.GetValue(data, null);
                                //如果屬性值為 Null,不插入數據庫
                                if (value != null)
                                {
                                    plist.Add(fieldParameter);
                                    fieldParameters.Add(new SqlParameter(fieldParameter, value));
                                }
                            }
                        }
                        sbSql.Append(string.Join(",", plist.Select(item => item.Replace("@", ""))));
                        sbSql.Append(")values(");
                        sbSql.Append(string.Join(",", plist));
                        sbSql.Append(")");
                        //判斷主鍵是否已經存在,存在就不插入數據
                        var ifSql = "if not exists(select 1 from [" + tableName + "] where " + keyFiled + " = @" + keyFiled + ")";

                        var sql = ifSql + sbSql.ToString();
                        await db.Database.ExecuteSqlCommandAsync(TransactionalBehavior.DoNotEnsureTransaction, sql, fieldParameters.ToArray());
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex);
                    }
                }
            }
        }
    }
多線程及對象生成 Sql插入數據庫
    /// <summary>
    /// Sql數據庫
    /// </summary>
    public class SqlDb : DbContext
    {
        /// <summary>
        /// 自定義連接
        /// </summary>
        /// <param name="connection">數據庫連接</param>
        public SqlDb(DbConnection connection) :
            base(connection, false)
        {
            if (connection.State != System.Data.ConnectionState.Open)
            {
                connection.Open();
            }

            this.Database.CommandTimeout = 60 * 1000;
            this.Configuration.UseDatabaseNullSemantics = true;
            this.Configuration.EnsureTransactionsForFunctionsAndCommands = false;
            this.Configuration.ValidateOnSaveEnabled = false;
        }
    }
數據庫上下文

 

三、注意事項:

  1、如果字段為 geography (地理位置) 類型,會出現異常,希望在使用的時候注意一下;

  2、由於集合為同一個對象,故在每次反射的對象幾乎都是重復操作,可以根據實際情況增加緩存;

其它:

多線程並行操作小實例源碼:https://github.com/intotf/netExample/tree/master/Tool/MultiTaskAsync

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM