前言:
由於系統升級,新開發的系統對數據驗證,及數據關聯做了很多優化,現需要將原歷史版本的數據遷移到新系統中;原數據庫大約有 1千多萬數據,大約 50個表。
歷史數據庫命名為:A。 新系統庫暫命名為 :B;
使用 .net 4.5 控制台程序 + EF + MSSQL 數據庫,由於有業務邏輯及時序處理,故只能按時序從單表一條條的寫入到新庫中;
優化過程:
1、EF 如果使用多線程,會出現 Sql 連接數超過上限,或是連接不上數據庫的問題;
2、EF 優化連接:自定義 SqlConnection,並傳入各個多線程任務中共用,減少了數據庫連接數,解決了連接不上數據庫的問題;但由於 EF 在 SaveChangesAsync 的時候會做事務提交,而事務不支持並行操作,故會出現異常;
3、EF 優化事務:關閉 EF 默認事務 DbContextConfiguration.EnsureTransactionsForFunctionsAndCommands = false; 這里有個坑:該設置對 SaveChangesAsync 無效,問題依然存在;
4、找了很多資料總算找到可以通過 ExecuteSqlCommandAsync 執行 Sql 語句,可以關閉事務;
5、優化成執行Sql 語句:await db.Database.ExecuteSqlCommandAsync(TransactionalBehavior.DoNotEnsureTransaction, sql, SqlParameters[]);
經過以上優化處理后,就開始寫代碼:
一、關鍵的異步鎖程序:

/// <summary>
/// Asynchronous lock built on a <see cref="SemaphoreSlim"/>.
/// Up to <c>concurrent</c> callers may hold the lock at the same time;
/// further callers wait until a holder disposes its handle.
/// </summary>
class AsyncRoot : IDisposable
{
    /// <summary>
    /// Semaphore that counts the free lock slots.
    /// </summary>
    private readonly SemaphoreSlim semaphoreSlim;

    /// <summary>
    /// Creates a mutual-exclusion lock (a single slot).
    /// </summary>
    public AsyncRoot()
        : this(1)
    {
    }

    /// <summary>
    /// Creates a lock with the given number of slots.
    /// </summary>
    /// <param name="concurrent">Number of callers allowed inside the lock at once.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown by the <see cref="SemaphoreSlim"/> constructor when <paramref name="concurrent"/> is less than 1.
    /// </exception>
    public AsyncRoot(int concurrent)
    {
        this.semaphoreSlim = new SemaphoreSlim(concurrent, concurrent);
    }

    /// <summary>
    /// Blocks the calling thread until a slot is free.
    /// Usage: <c>using (asyncRoot.Lock()) { ... }</c>
    /// </summary>
    /// <returns>A handle that frees the slot when disposed.</returns>
    public IDisposable Lock()
    {
        this.semaphoreSlim.Wait();
        return new UnLocker(this.semaphoreSlim);
    }

    /// <summary>
    /// Asynchronously waits until a slot is free.
    /// Usage: <c>using (await asyncRoot.LockAsync()) { ... }</c>
    /// </summary>
    /// <returns>A task whose result is a handle that frees the slot when disposed.</returns>
    public async Task<IDisposable> LockAsync()
    {
        await this.semaphoreSlim.WaitAsync().ConfigureAwait(false);
        return new UnLocker(this.semaphoreSlim);
    }

    /// <summary>
    /// Disposes the underlying semaphore.
    /// </summary>
    public void Dispose()
    {
        this.semaphoreSlim.Dispose();
    }

    /// <summary>
    /// Handle returned by <see cref="Lock"/> / <see cref="LockAsync"/>; disposing it frees one slot.
    /// </summary>
    private sealed class UnLocker : IDisposable
    {
        /// <summary>
        /// Semaphore to release; set to null once the slot has been given back.
        /// </summary>
        private SemaphoreSlim semaphoreSlim;

        /// <summary>
        /// Wraps an already-acquired slot of <paramref name="semaphoreSlim"/>.
        /// </summary>
        /// <param name="semaphoreSlim">Semaphore the slot was acquired from.</param>
        public UnLocker(SemaphoreSlim semaphoreSlim)
        {
            this.semaphoreSlim = semaphoreSlim;
        }

        /// <summary>
        /// Frees the slot. Safe to call more than once: only the first call releases,
        /// so a double dispose can neither throw SemaphoreFullException nor hand out
        /// an extra slot.
        /// </summary>
        public void Dispose()
        {
            // Atomically take ownership so concurrent/double Dispose releases exactly once.
            var owned = Interlocked.Exchange(ref this.semaphoreSlim, null);
            if (owned != null)
            {
                owned.Release();
            }
        }
    }
}
二、對數據插入到數據庫:
邏輯分析:對傳入的 數據集合,拆分為單個實體操作任務,每個任務使用同一個連接,獨立的數據庫上下文,對實體反射為 Sql 語句(其中增加主鍵,表名、字段名、值的判斷驗證),
然后通過 ExecuteSqlCommandAsync 不使用事務的方式執行 Sql 語句;具體代碼見下:
// Concurrency limit: the AsyncRoot is created with 50 slots, so at most 50 callers hold the lock at once
private readonly AsyncRoot root = new AsyncRoot(50);

/// <summary>
/// Inserts a collection of entities into the database concurrently.
/// Each entity is turned into a parameterized INSERT statement by reflection
/// and executed through EF without an ambient transaction.
/// </summary>
public class Workers
{
    /// <summary>
    /// Throttle: at most 50 <see cref="SaveToDbAsync{T}"/> bodies run at the same time.
    /// </summary>
    private readonly AsyncRoot root = new AsyncRoot(50);

    /// <summary>
    /// Saves every item of <paramref name="datas"/> using one shared connection
    /// (connection string "SqlDb"), then closes that connection.
    /// </summary>
    /// <typeparam name="T">Entity type; its Table/Column/Key/NotMapped attributes drive the SQL.</typeparam>
    /// <param name="datas">Entities to insert.</param>
    /// <returns>A task that completes when every item has been processed.</returns>
    public async Task RunAsync<T>(IEnumerable<T> datas) where T : class
    {
        var connectionString = System.Configuration.ConfigurationManager.ConnectionStrings["SqlDb"].ConnectionString;

        // The connection is owned here (SqlDb is created with contextOwnsConnection = false),
        // so it must be disposed here; previously it was never closed.
        // NOTE(review): all tasks issue commands on this single connection concurrently.
        // SqlClient normally needs MultipleActiveResultSets=True for that — confirm the
        // "SqlDb" connection string enables it.
        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync();
            var tasks = datas.Select(item => SaveToDbAsync(item, connection));
            await Task.WhenAll(tasks);
        }
    }

    /// <summary>
    /// Inserts a single entity. Failures while building or executing the SQL are
    /// written to the console and swallowed so one bad row does not stop the batch.
    /// </summary>
    /// <typeparam name="T">Entity type.</typeparam>
    /// <param name="data">Entity to insert.</param>
    /// <param name="connection">Shared, already opened connection.</param>
    /// <returns>A task that completes when the insert has been attempted.</returns>
    private async Task SaveToDbAsync<T>(T data, DbConnection connection) where T : class
    {
        using (await root.LockAsync())
        using (var db = new SqlDb(connection))
        {
            try
            {
                SqlParameter[] parameters;
                var sql = BuildInsertSql(data, out parameters);

                // DoNotEnsureTransaction: run the statement without wrapping it in a transaction.
                await db.Database.ExecuteSqlCommandAsync(TransactionalBehavior.DoNotEnsureTransaction, sql, parameters);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }
    }

    /// <summary>
    /// Builds "if not exists(...) insert into ..." for <paramref name="data"/>.
    /// Only mapped properties with a non-null value become columns. The existence
    /// guard on the key column is emitted only when the key value is actually part
    /// of the statement; otherwise a plain insert is produced.
    /// </summary>
    /// <typeparam name="T">Entity type.</typeparam>
    /// <param name="data">Entity whose property values are inserted.</param>
    /// <param name="parameters">Receives one parameter (@p0, @p1, ...) per inserted column.</param>
    /// <returns>The SQL text.</returns>
    /// <exception cref="InvalidOperationException">No property of the entity has a value to insert.</exception>
    private static string BuildInsertSql<T>(T data, out SqlParameter[] parameters) where T : class
    {
        var entityType = typeof(T);
        var table = ResolveTableName(entityType);

        var columns = new List<string>();
        var placeholders = new List<string>();
        var sqlParameters = new List<SqlParameter>();
        var placeholderByColumn = new Dictionary<string, string>(StringComparer.Ordinal);
        var keyColumn = "ID";

        foreach (var p in entityType.GetProperties())
        {
            // Column name: [Column("...")] wins when it carries a name, else the property name.
            var column = p.Name;
            var columnAtt = Attribute.GetCustomAttribute(p, typeof(ColumnAttribute)) as ColumnAttribute;
            if (columnAtt != null && !string.IsNullOrEmpty(columnAtt.Name))
            {
                column = columnAtt.Name;
            }

            // Invariant upper-casing keeps the generated SQL independent of the machine culture.
            column = column.ToUpperInvariant();

            // Key column: a [Key] property or a property literally named "ID" (last match wins).
            var isKey = Attribute.GetCustomAttribute(p, typeof(KeyAttribute)) != null
                || p.Name.Equals("ID", StringComparison.OrdinalIgnoreCase);
            if (isKey)
            {
                keyColumn = column;
            }

            // [NotMapped] properties are not written to the database.
            if (Attribute.GetCustomAttribute(p, typeof(NotMappedAttribute)) != null)
            {
                continue;
            }

            // Null values are left out so the column default applies.
            var value = p.GetValue(data, null);
            if (value == null)
            {
                continue;
            }

            // Positional parameter names stay valid even for column names with spaces or symbols.
            var placeholder = "@p" + sqlParameters.Count;
            columns.Add(QuoteName(column));
            placeholders.Add(placeholder);
            sqlParameters.Add(new SqlParameter(placeholder, value));
            placeholderByColumn[column] = placeholder;
        }

        if (columns.Count == 0)
        {
            throw new InvalidOperationException("No mapped, non-null property to insert for type " + entityType.FullName + ".");
        }

        var sql = new StringBuilder();

        // Skip the row when its key already exists — but only if the key value is a parameter
        // of this statement; referencing a missing parameter would make the command fail.
        string keyPlaceholder;
        if (placeholderByColumn.TryGetValue(keyColumn, out keyPlaceholder))
        {
            sql.Append("if not exists(select 1 from " + table + " where " + QuoteName(keyColumn) + " = " + keyPlaceholder + ") ");
        }

        sql.Append("insert into " + table + " (");
        sql.Append(string.Join(",", columns));
        sql.Append(") values (");
        sql.Append(string.Join(",", placeholders));
        sql.Append(")");

        parameters = sqlParameters.ToArray();
        return sql.ToString();
    }

    /// <summary>
    /// Returns the quoted table name: [Table] name (and schema, when given), else the type name.
    /// </summary>
    private static string ResolveTableName(Type entityType)
    {
        var tableAtt = Attribute.GetCustomAttribute(entityType, typeof(TableAttribute)) as TableAttribute;
        if (tableAtt == null)
        {
            return QuoteName(entityType.Name);
        }

        if (string.IsNullOrEmpty(tableAtt.Schema))
        {
            return QuoteName(tableAtt.Name);
        }

        return QuoteName(tableAtt.Schema) + "." + QuoteName(tableAtt.Name);
    }

    /// <summary>
    /// Wraps a SQL Server identifier in brackets, doubling any closing bracket inside it.
    /// </summary>
    private static string QuoteName(string identifier)
    {
        return "[" + identifier.Replace("]", "]]") + "]";
    }
}

/// <summary>
/// EF context that runs on a caller-supplied connection.
/// </summary>
public class SqlDb : DbContext
{
    /// <summary>
    /// Command timeout in seconds. Database.CommandTimeout is measured in seconds,
    /// so the former value of 60 * 1000 meant roughly 16.7 hours instead of one minute.
    /// </summary>
    private const int CommandTimeoutSeconds = 60;

    /// <summary>
    /// Creates a context on an existing connection. The context does not own the
    /// connection (contextOwnsConnection = false), so disposing the context leaves
    /// the connection for the caller to close.
    /// </summary>
    /// <param name="connection">Database connection; opened here if it is not already open.</param>
    public SqlDb(DbConnection connection)
        : base(connection, false)
    {
        if (connection.State != System.Data.ConnectionState.Open)
        {
            connection.Open();
        }

        this.Database.CommandTimeout = CommandTimeoutSeconds;

        // Compare nulls the way the database does in generated queries.
        this.Configuration.UseDatabaseNullSemantics = true;

        // Do not force a transaction around ExecuteSqlCommand / function calls.
        this.Configuration.EnsureTransactionsForFunctionsAndCommands = false;

        // Skip entity validation on SaveChanges.
        this.Configuration.ValidateOnSaveEnabled = false;
    }
}
三、注意事項:
1、如果字段為 geography (地理位置) 類型,會出現異常,希望在使用的時候注意一下;
2、由於集合中的元素都是同一類型,每次反射得到的表名、字段名等信息幾乎都是重復的,可以根據實際情況增加緩存;
其它:
多線程並行操作小實例源碼:https://github.com/intotf/netExample/tree/master/Tool/MultiTaskAsync