自曾列就別往下看 別折騰了
下面在sqlserver中演示
mysql 請google MySqlBulkLoader
oracle 請google OracleBulkCopy
表結構
DROP TABLE [dbo].[Product] GO CREATE TABLE [dbo].[Product] ( [Id] varchar(36) NOT NULL , [Name] varchar(255) NOT NULL , [Price] decimal(18,4) NOT NULL ) GO ALTER TABLE [dbo].[Product] ADD PRIMARY KEY ([Id]) GO
批量添加
public static void Insert<T>(string connectionString, List<T> dataList, string destinationTableName, int batchSize = 0) { DataTable dataTable = ConvertToDataTable(dataList); Insert(connectionString, dataTable, destinationTableName, batchSize); } public static void Insert(string connectionString, DataTable dataTable, string destinationTableName, int batchSize = 0) { using (SqlConnection connection = new SqlConnection(connectionString)) { if (connection.State != ConnectionState.Open) { connection.Open(); } using (SqlTransaction transaction = connection.BeginTransaction()) { using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)) { bulkCopy.BatchSize = batchSize; bulkCopy.DestinationTableName = destinationTableName; try { bulkCopy.WriteToServer(dataTable); transaction.Commit(); } catch (Exception ex) { Console.WriteLine(ex.Message); transaction.Rollback(); } } } } }
批量添加測試代碼
public static void Insert() { List<Product> products = new List<Product>(); for (int i = 0; i < 100000; i++) { Product product = new Product { Id = Guid.NewGuid().ToString(), Name = $"商品{i}", Price = (decimal)i }; products.Add(product); } Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); Insert(SqLiteHelper.SqlServerConnection, products, "Product"); stopwatch.Stop(); Console.WriteLine("耗時:" + stopwatch.ElapsedMilliseconds); }
批量更新
public static void Update<T>(string connectionString, List<T> list, string destinationTableName) { var dt = ConvertToDataTable(list); using (SqlConnection connection = new SqlConnection(connectionString)) { if (connection.State != ConnectionState.Open) { connection.Open(); } using (SqlTransaction transaction = connection.BeginTransaction()) { using (SqlCommand command = new SqlCommand(string.Empty, connection)) { try { command.Transaction = transaction; command.CommandText = "CREATE TABLE #TmpTable(Id varchar(36),Name varchar(255),Price decimal(18,4))"; command.ExecuteNonQuery(); using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)) { bulkCopy.BulkCopyTimeout = 660; bulkCopy.DestinationTableName = "#TmpTable"; bulkCopy.WriteToServer(dt); bulkCopy.Close(); } command.CommandTimeout = 300; command.CommandText = "UPDATE T SET T.Name =Temp.Name FROM " + destinationTableName + " T INNER JOIN #TmpTable Temp ON T.Id=Temp.Id; DROP TABLE #TmpTable;"; command.ExecuteNonQuery(); transaction.Commit(); } catch (Exception) { transaction.Rollback(); } } } } }
批量更新測試代碼
public static List<string> GetList() { List<string> list = new List<string>(); using (SqlConnection conn = new SqlConnection(SqLiteHelper.SqlServerConnection)) { using (SqlCommand command = new SqlCommand("SELECT TOP 5000 Id FROM Product", conn)) { conn.Open(); var data = command.ExecuteReader(); while (data.Read()) { list.Add(data["Id"].ToString()); } } } return list; } public static void Update() { var list = GetList(); List<Product> products = new List<Product>(); for (int i = 0; i < list.Count; i++) { Product product = new Product { Id = list[i], Name = $"默認{i}", Price = (decimal)i * 5 }; products.Add(product); } Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); Update(SqLiteHelper.SqlServerConnection, products, "Product"); stopwatch.Stop(); Console.WriteLine("耗時:" + stopwatch.ElapsedMilliseconds); }
List轉DataTable
public static DataTable ConvertToDataTable<T>(IList<T> data) { PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T)); DataTable table = new DataTable(); foreach (PropertyDescriptor prop in properties) { table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType); } foreach (T item in data) { DataRow row = table.NewRow(); foreach (PropertyDescriptor prop in properties) { row[prop.Name] = prop.GetValue(item) ?? DBNull.Value; } table.Rows.Add(row); } return table; }
實體類
public class Product { public string Id { get; set; } public string Name { get; set; } public decimal Price { get; set; } }
鏈接字符串配置
public class SqLiteHelper { public const string SqlServerConnection = "Data Source=IP;Initial Catalog=庫名;uid=帳號;pwd=密碼;MultipleActiveResultSets=True"; }
測試了一下 添加10W 差不多 10S左右
補充一個 多表操作
public static void Inserts(string connectionString, Dictionary<string, DataTable> dataTables, int batchSize = 0) { using (SqlConnection connection = new SqlConnection(connectionString)) { if (connection.State != ConnectionState.Open) { connection.Open(); } using (SqlTransaction transaction = connection.BeginTransaction()) { try { foreach (var item in dataTables) { using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)) { bulkCopy.BatchSize = batchSize; bulkCopy.DestinationTableName = item.Key; bulkCopy.WriteToServer(item.Value); } } transaction.Commit(); } catch (Exception ex) { Console.WriteLine(ex.Message); transaction.Rollback(); } } } }
測試代碼
public static void Inserts() { const int count = 10000; List<Order> orders = new List<Order>(); List<Product> products = new List<Product>(); for (var i = 0; i < count; i++) { Product product = new Product { Id = Guid.NewGuid().ToString(), Name = $"商品{i}", Price = i * 0.8M }; products.Add(product); Order order = new Order { Id = Guid.NewGuid().ToString(), ProductId = product.Id, Remake = "suggestions", Status = 1 }; orders.Add(order); } var productsDataTable = Batch.ConvertToDataTable(products); var ordersDataTable = Batch.ConvertToDataTable(orders); Dictionary<string, DataTable> dataTables = new Dictionary<string, DataTable> { { "Product", productsDataTable}, { "Orders",ordersDataTable} }; Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); Inserts(SqLiteHelper.SqlServerConnection, dataTables); stopwatch.Stop(); Console.WriteLine("耗時:" + stopwatch.ElapsedMilliseconds); }
新增訂單實體對象
public class Order { public string Id { get; set; } public string ProductId { get; set; } public int Status { get; set; } public string Remake { get; set; } }
Order表結構
DROP TABLE [dbo].[Orders] GO CREATE TABLE [dbo].[Orders] ( [Id] varchar(36) NOT NULL , [ProductId] varchar(36) NOT NULL , [Status] int NOT NULL , [Remake] varchar(255) NOT NULL ) GO ALTER TABLE [dbo].[Orders] ADD PRIMARY KEY ([Id]) GO
批量刪除也貼一個吧
public void BatchDelete<T>(List<T> idList) { var type = typeof(T); var id = GetProperties(type); var innerJoin = $"a.{id}=b.{id}"; var tempTableName = $"#TmpTable{type.Name}"; var dataTableName = BulkCopyRepositoryExtension.GetTableName(type); var sqlConnection = (SqlConnection)_unit.Connection; var sqlTransaction = (SqlTransaction)_unit.Transaction; var sqlCommand = (SqlCommand)_unit.Command; sqlCommand.CommandText = $"SELECT * INTO {tempTableName} FROM {dataTableName} WHERE 1 = 2"; sqlCommand.ExecuteNonQuery(); using (SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.Default, sqlTransaction)) { bulkCopy.DestinationTableName = tempTableName; using (var reader = new ObjectReader(type, idList, BulkCopyRepositoryExtension.GetFields(type))) { bulkCopy.WriteToServer(reader); } } sqlCommand.CommandText = $"DELETE a FROM {dataTableName} AS a INNER JOIN {tempTableName} AS b ON {innerJoin}; DROP TABLE {tempTableName};"; sqlCommand.ExecuteNonQuery(); }
批量刪除關鍵代碼已貼上 如需全部代碼 QQ群 4816230869
為了滿足關注粉絲的要求貼上說明
1、SqlBulkCopy類的構造方法
其中: conn表示一個SqlConnection對象
connStr表示數據庫連接字符串
- SqlBulkCopy(conn)
- SqlBulkCopy(connStr)
- SqlBulkCopy(connStr, SqlBulkCopyOptions copyOptions)
- SqlBulkCopy(conn, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction)
其中還有幾個陌生的對象:SqlBulkCopyOptions 和 SqlTransaction
1.1、SqlBulkCopyOptions類
這個類是一個枚舉類型:
對象 | 值 | 備注 |
Default | 0 | |
KeepIdentity | 1 | 保留源標識值。 如果未指定,則由目標分配標識值。 |
CheckConstraints | 2 | 在插入數據的同時檢查約束。 默認情況下,不檢查約束。 |
TableLock | 4 | 在批量復制操作期間獲取批量更新鎖。 如果未指定,則使用行鎖。 |
KeepNulls | 8 | 保留目標表中的空值,而不管默認值的設置如何。 如果未指定,則空值將由默認值替換(如果適用) |
FireTriggers | 16 | 指定后,會導致服務器為插入到數據庫中的行激發插入觸發器。 |
UseInternalTransaction | 32 | 如果已指定,則每一批批量復制操作將在事務中進行。 如果指示了此選項,並且為構造函數提供了 System.Data.SqlClient.SqlTransaction對象,則發生 System.ArgumentException(參數異常)。因為兩個事務沖突了。 |
1.2、SqlTransaction類
這個類是事務類,是個密封類,實現了DbTransaction抽象類
2、SqlBulkCopy類的常用屬性
屬性名 | 功能 | 備注 |
BatchSize | 設置或獲取每達到多少行就更新到服務器(也就是目標表) | 值為int, |
BulkCopyTimeout | 設置或獲取超時時間 | 默認30秒,如果設置成0,將無限制等待, 值為int,單位為秒 |
DestinationTableName | 設置或獲取服務器上的目標表的名稱 | 也就是批量更新的目標表, 值為String類型 |
EnableStreaming | 設置或獲取是否支持傳輸 IDataReader 對象的數據 | true為支持, 值為bool類型 |
NotifyAfter | 設置或獲取在生成通知事件之前要處理的行數 | 默認為0, 值為int類型, |
ColumnMappings | 獲取列映射定義數據源中的列和目標表中的列之間的映射關系 | 返回值為SqlBulkCopyColumnMappingCollection |
2.1、表中的SqlBulkCopyColumnMappingCollection類型是一個映射集合類,是目標表的列和源表的列的映射關系的集合。
這個類是一個密封類,不能被繼承,實現了一個CollectionBase抽象類。
SqlBulkCopyColumnMappingCollection沒有提供構造方法,我們也不需要去newat的對象,主要是使用它的幾個重載的Add()方法
Add()有五個重載的方法:
- SqlBulkCopyColumnMapping Add(SqlBulkCopyColumnMapping bulkCopyColumnMapping);
- SqlBulkCopyColumnMapping Add(string sourceColumn, string destinationColumn);
- SqlBulkCopyColumnMapping Add(int sourceColumnIndex, string destinationColumn);
- SqlBulkCopyColumnMapping Add(string sourceColumn, int destinationColumnIndex);
- SqlBulkCopyColumnMapping Add(int sourceColumnIndex, int destinationColumnIndex);
其中四個方法是類似的,都是對應的列名或者列的位置。
第一個方法是添加一個已經構建好的SqlBulkCopyColumnMapping對象,
他也有集合常用的方法:
方法名 | 功能 | 備注 |
Clear(); | 清除集合中的映射關系 | |
Contains(SqlBulkCopyColumnMapping value); | 判斷是否包含指定映射關系 | |
IndexOf(SqlBulkCopyColumnMapping value); | 返回指定映射關系的位置 | |
Remove(SqlBulkCopyColumnMapping value); | 移除指定映射關系 | |
RemoveAt(int index); | 移除指定位置的映射關系 | |
Insert(int index, SqlBulkCopyColumnMapping value); | 在指定位置插入映射關系 | |
CopyTo(SqlBulkCopyColumnMapping[] array, int index); | 從指定位置開始將映射關系復制到指定數組中 | index指定的集合中的位置, 而不是數組中的角標 |
3、SqlBulkCopy類的常用方法
- WriteToServer,這個方法重載了四次,功能是將數據寫到目的表中。
WriteToServer(DataRow[] rows); | 將 DataRow 數組所有元素寫到目標表中 |
WriteToServer(DataTable table); | 將 DataTable 所有行寫到目標表中 |
WriteToServer(IDataReader reader); | 將指定的 IDataReader 對象中的數據寫到目標表中 |
WriteToServer(DataTable table, DataRowState rowState); | 將 DataTable 中指定狀態的所有行寫到目標表中 |
【上表中的 DataRowState 狀態行可以參考這篇博客DataTable的AcceptChanges()方法和DataRow的RowState屬性】