本文為 Dennis Gao 原創技術文章,發表於博客園博客,未經作者本人允許禁止任何形式的轉載。
前言
很顯然,你應該不至於使用 EntityFramework 直接插入 10W 數據到數據庫中,那可能得用上個幾分鍾。EntityFramework 最被人詬病的地方就是它的性能,處理大量數據時的效率。此種條件下,通常會轉回使用 ADO.NET 來完成任務。
但是,如果已經在項目中使用了 EntityFramework,如果碰到需要直接向數據庫中插入 10W 的數據的需求,引入 ADO.NET 和 SqlBulkCopy 的組合將打破 EntityFramework 作為 ORM 所帶來的優勢,我們不得不再次去編寫那些 SQL 語句,關注表結構的細節,相應的代碼可維護性也在下降。
那么,假設我們將 SqlBulkCopy 的功能封裝為 EntityFramework 中的一個擴展方法,通過接口像外暴露 BulkInsert 方法。這樣,我們既沒有改變使用 EntityFramework 的習慣,同時也隱藏了 SqlBulkCopy 的代碼細節,更重要的是,合理的封裝演進出復用的可能性,可以在多個 Entity 表中使用。
環境准備
以下測試基於 EntityFramework 6.0.2 版本。
首先定義一個 Customer 類:
1 public class Customer 2 { 3 public long Id { get; set; } 4 public string Name { get; set; } 5 public string Address { get; set; } 6 public string Phone { get; set; } 7 }
通過 CustomerMap 類將 Entity 映射到數據庫表結構:
1 public class CustomerMap : EntityTypeConfiguration<Customer> 2 { 3 public CustomerMap() 4 { 5 // Primary Key 6 this.HasKey(t => t.Id); 7 8 // Properties 9 this.Property(t => t.Name) 10 .IsRequired() 11 .HasMaxLength(256); 12 13 this.Property(t => t.Phone) 14 .IsRequired() 15 .HasMaxLength(256); 16 17 // Table & Column Mappings 18 this.ToTable("Customer", "STORE"); 19 this.Property(t => t.Id).HasColumnName("Id"); 20 this.Property(t => t.Name).HasColumnName("Name"); 21 this.Property(t => t.Address).HasColumnName("Address"); 22 this.Property(t => t.Phone).HasColumnName("Phone"); 23 } 24 }
我們定義數據庫的名字為 “Retail”,則使用 RetailEntities 類來實現 DbContext :
1 public class RetailEntities : DbContext 2 { 3 static RetailEntities() 4 { 5 Database.SetInitializer<RetailEntities>( 6 new DropCreateDatabaseAlways<RetailEntities>()); 7 } 8 9 public RetailEntities() 10 : base("Name=RetailEntities") 11 { 12 } 13 14 public DbSet<Customer> Customers { get; set; } 15 16 protected override void OnModelCreating(DbModelBuilder modelBuilder) 17 { 18 modelBuilder.Configurations.Add(new CustomerMap()); 19 } 20 }
將 DatabaseInitializer 設置為 DropCreateDatabaseAlways,這樣我們可以保證每次都針對新表進行測試。
如果需要更復雜的模型,我們將基於如下的模型進行測試:
測試主機
數據庫:Microsoft SQL Server 2012 (64-bit)
EntityFramework 插入 10W 數據需要多久
我們先來看下EntityFramework 插入 10W 數據需要多久。
構造 10W 個 Customer 實例:
1 int customerCount = 100000; 2 3 List<Customer> customers = new List<Customer>(); 4 for (int i = 0; i < customerCount; i++) 5 { 6 Customer customer = new Customer() 7 { 8 Name = "Dennis Gao" + i, 9 Address = "Beijing" + i, 10 Phone = "18888888888" + i, 11 }; 12 customers.Add(customer); 13 14 Console.Write("."); 15 }
使用如下語法來將上面構造的 10W 數據保存到數據庫中:
1 using (RetailEntities context = new RetailEntities()) 2 { 3 foreach (var entity in customers) 4 { 5 context.Customers.Add(entity); 6 } 7 context.SaveChanges(); 8 }
通過 context.SaveChanges() 來保證一次事務提交。
為了計算使用時間,在上面代碼的前后加上 Stopwatch 來計算:
1 Stopwatch watch = Stopwatch.StartNew(); 2 3 using (RetailEntities context = new RetailEntities()) 4 { 5 foreach (var entity in customers) 6 { 7 context.Customers.Add(entity); 8 } 9 context.SaveChanges(); 10 } 11 12 watch.Stop(); 13 Console.WriteLine(string.Format( 14 "{0} customers are created, cost {1} milliseconds.", 15 customerCount, watch.ElapsedMilliseconds));
然后運行,
好吧,我應該沒有耐心等待它運行完。
現在減少數據量進行測試,將數據數量降低到 500 條,
空表插入500 條數據耗時 5652 毫秒。我多測了幾遍,這個數據穩定在 5 秒以上。
將數據量改變到 1000 條,
將數據量改變到 1500 條,
將數據量改變到 10000 條,
那么我們估計下 10W 數據大概需要 10W / 500 * 2 = 至少 400 秒 = 至少 6 分鍾。
好吧,慢是毋庸置疑的。
SqlBulkCopy 接口描述
Microsoft SQL Server 提供一個稱為 bcp 的流行的命令提示符實用工具,用於將數據從一個表移動到另一個表(表既可以在同一個服務器上,也可以在不同服務器上)。 SqlBulkCopy 類允許編寫提供類似功能的托管代碼解決方案。 還有其他將數據加載到 SQL Server 表的方法(例如 INSERT 語句),但相比之下 SqlBulkCopy 提供明顯的性能優勢。
使用 SqlBulkCopy 類只能向 SQL Server 表寫入數據。 但是,數據源不限於 SQL Server;可以使用任何數據源,只要數據可加載到 DataTable 實例或可使用 IDataReader 實例讀取數據。
在 .NET 4.5 中還提供了支持 async 語法的接口。
這里,我們選用 DataTable 來構建數據源,將 10W 數據導入 DataTable 中。可以看出,我們需要構建出給定 Entity 類型所對應的數據表的 DataTable,將所有的 entities 數據插入到 DataTable 中。
構建 TableMapping 映射
此時,我並不想手工書寫表中的各字段名稱,同時,我可能甚至都不想關心 Entity 類到底被映射到了數據庫中的哪一張表上。
此處,我們定義一個 TableMapping 類,用於存儲一張數據庫表的映射信息。
在獲取和生成 TableMapping 之前,我們需要先定義和獲取 DbMapping 類。
1 internal class DbMapping 2 { 3 public DbMapping(DbContext context) 4 { 5 _context = context; 6 7 var objectContext = ((IObjectContextAdapter)context).ObjectContext; 8 _metadataWorkspace = objectContext.MetadataWorkspace; 9 10 _codeFirstEntityContainer = _metadataWorkspace.GetEntityContainer("CodeFirstDatabase", DataSpace.SSpace); 11 12 MapDb(); 13 } 14 }
通過讀取 CodeFirstEntityContainer 中的元數據,我們可以獲取到指定數據庫中的所有表的信息。
1 private void MapDb() 2 { 3 ExtractTableColumnEdmMembers(); 4 5 List<EntityType> tables = 6 _metadataWorkspace 7 .GetItems(DataSpace.OCSpace) 8 .Select(x => x.GetPrivateFieldValue("EdmItem") as EntityType) 9 .Where(x => x != null) 10 .ToList(); 11 12 foreach (var table in tables) 13 { 14 MapTable(table); 15 } 16 }
進而,根據表映射類型的定義,可以獲取到表中字段的映射信息。
1 private void MapTable(EntityType tableEdmType) 2 { 3 string identity = tableEdmType.FullName; 4 EdmType baseEdmType = tableEdmType; 5 EntitySet storageEntitySet = null; 6 7 while (!_codeFirstEntityContainer.TryGetEntitySetByName(baseEdmType.Name, false, out storageEntitySet)) 8 { 9 if (baseEdmType.BaseType == null) break; 10 baseEdmType = baseEdmType.BaseType; 11 } 12 if (storageEntitySet == null) return; 13 14 var tableName = (string)storageEntitySet.MetadataProperties["Table"].Value; 15 var schemaName = (string)storageEntitySet.MetadataProperties["Schema"].Value; 16 17 var tableMapping = new TableMapping(identity, schemaName, tableName); 18 _tableMappings.Add(identity, tableMapping); 19 _primaryKeysMapping.Add(identity, storageEntitySet.ElementType.KeyMembers.Select(x => x.Name).ToList()); 20 21 foreach (var prop in storageEntitySet.ElementType.Properties) 22 { 23 MapColumn(identity, _tableMappings[identity], prop); 24 } 25 }
然后,可以將表信息和字段信息存放到 TableMapping 和 ColumnMapping 當中。
internal class TableMapping { public string TableTypeFullName { get; private set; } public string SchemaName { get; private set; } public string TableName { get; private set; } public ColumnMapping[] Columns { get { return _columnMappings.Values.ToArray(); } } }
構建 DataTable 數據
終於,有了 TableMapping 映射之后,我們可以開始創建 DataTable 了。
1 private static DataTable BuildDataTable<T>(TableMapping tableMapping) 2 { 3 var entityType = typeof(T); 4 string tableName = string.Join(@".", tableMapping.SchemaName, tableMapping.TableName); 5 6 var dataTable = new DataTable(tableName); 7 var primaryKeys = new List<DataColumn>(); 8 9 foreach (var columnMapping in tableMapping.Columns) 10 { 11 var propertyInfo = entityType.GetProperty(columnMapping.PropertyName, '.'); 12 columnMapping.Type = propertyInfo.PropertyType; 13 14 var dataColumn = new DataColumn(columnMapping.ColumnName); 15 16 Type dataType; 17 if (propertyInfo.PropertyType.IsNullable(out dataType)) 18 { 19 dataColumn.DataType = dataType; 20 dataColumn.AllowDBNull = true; 21 } 22 else 23 { 24 dataColumn.DataType = propertyInfo.PropertyType; 25 dataColumn.AllowDBNull = columnMapping.Nullable; 26 } 27 28 if (columnMapping.IsIdentity) 29 { 30 dataColumn.Unique = true; 31 if (propertyInfo.PropertyType == typeof(int) 32 || propertyInfo.PropertyType == typeof(long)) 33 { 34 dataColumn.AutoIncrement = true; 35 } 36 else continue; 37 } 38 else 39 { 40 dataColumn.DefaultValue = columnMapping.DefaultValue; 41 } 42 43 if (propertyInfo.PropertyType == typeof(string)) 44 { 45 dataColumn.MaxLength = columnMapping.MaxLength; 46 } 47 48 if (columnMapping.IsPk) 49 { 50 primaryKeys.Add(dataColumn); 51 } 52 53 dataTable.Columns.Add(dataColumn); 54 } 55 56 dataTable.PrimaryKey = primaryKeys.ToArray(); 57 58 return dataTable; 59 }
通過 Schema 名稱和表名稱來構建指定 Entity 類型的 DataTable 對象。
然后將,entities 數據列表中的數據導入到 DataTable 對象之中。
1 private static DataTable CreateDataTable<T>(TableMapping tableMapping, IEnumerable<T> entities) 2 { 3 var dataTable = BuildDataTable<T>(tableMapping); 4 5 foreach (var entity in entities) 6 { 7 DataRow row = dataTable.NewRow(); 8 9 foreach (var columnMapping in tableMapping.Columns) 10 { 11 var @value = entity.GetPropertyValue(columnMapping.PropertyName); 12 13 if (columnMapping.IsIdentity) continue; 14 15 if (@value == null) 16 { 17 row[columnMapping.ColumnName] = DBNull.Value; 18 } 19 else 20 { 21 row[columnMapping.ColumnName] = @value; 22 } 23 } 24 25 dataTable.Rows.Add(row); 26 } 27 28 return dataTable; 29 }
SqlBulkCopy 導入數據
終於,數據源准備好了。然后使用如下代碼結構,調用 WriteToServer 方法,將數據寫入數據庫。
1 using (DataTable dataTable = CreateDataTable(tableMapping, entities)) 2 { 3 using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(transaction.Connection, options, transaction)) 4 { 5 sqlBulkCopy.BatchSize = batchSize; 6 sqlBulkCopy.DestinationTableName = dataTable.TableName; 7 sqlBulkCopy.WriteToServer(dataTable); 8 } 9 }
看下保存 500 數據的效果,用時 1.9 秒。
看下保存 1W 數據的效果,用時 2.1 秒。
看下保存 10W 數據的效果,用時 7.5 秒。
再試下 100W 數據的效果,用時 27 秒。
封裝 BulkInsert 擴展方法
我們可以為 DbContext 添加一個 BulkInsert 擴展方法。
1 internal static class DbContextBulkOperationExtensions 2 { 3 public const int DefaultBatchSize = 1000; 4 5 public static void BulkInsert<T>(this DbContext context, IEnumerable<T> entities, int batchSize = DefaultBatchSize) 6 { 7 var provider = new BulkOperationProvider(context); 8 provider.Insert(entities, batchSize); 9 } 10 }
IBulkableRepository 接口
在下面兩篇文章中,我介紹了精煉的 IRepository 接口。
1 public interface IRepository<T> 2 where T : class 3 { 4 IQueryable<T> Query(); 5 void Insert(T entity); 6 void Update(T entity); 7 void Delete(T entity); 8 }
當我們需要擴展 BulkInsert 功能時,可以通過繼承來完成功能擴展。
1 public interface IBulkableRepository<T> : IRepository<T> 2 where T : class 3 { 4 void BulkInsert(IEnumberable<T> entities); 5 }
這樣,我們就可以使用很自然的方式直接使用 BulkInsert 功能了。
代碼在哪里
代碼在這里:
參考資料
- SqlBulkCopy
- Table-Valued Parameters
- EntityFramework.BulkInsert
- Using SQL bulk copy with your LINQ-to-SQL datacontext
本文為 Dennis Gao 原創技術文章,發表於博客園博客,未經作者本人允許禁止任何形式的轉載。