C#:幾種數據庫的大數據批量插入 - faib
在之前只知道SqlServer支持數據批量插入,殊不知道Oracle、SQLite和MySql也是支持的,不過Oracle需要使用Orace.DataAccess驅動,今天就貼出幾種數據庫的批量插入解決方法。
首先說一下,IProvider里有一個用於實現批量插入的插件服務接口IBatcherProvider,此接口在前一篇文章中已經提到過了。
/// <summary>
/// 提供數據批量處理的方法。
/// </summary>
public interface IBatcherProvider : IProviderService
{
/// <summary>
/// 將 <see cref="DataTable"/> 的數據批量插入到數據庫中。
/// </summary>
/// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>
/// <param name="batchSize">每批次寫入的數據量。</param>
void Insert(DataTable dataTable, int batchSize = 10000);
}
一、SqlServer數據批量插入
SqlServer的批量插入很簡單,使用SqlBulkCopy就可以,以下是該類的實現:
/// <summary>
/// 為 System.Data.SqlClient 提供的用於批量操作的方法。
/// </summary>
public sealed class MsSqlBatcher : IBatcherProvider
{
/// <summary>
/// 獲取或設置提供者服務的上下文。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// 將 <see cref="DataTable"/> 的數據批量插入到數據庫中。
/// </summary>
/// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>
/// <param name="batchSize">每批次寫入的數據量。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
//給表名加上前后導符
var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName);
using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null)
{
DestinationTableName = tableName,
BatchSize = batchSize
})
{
//循環所有列,為bulk添加映射
dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement);
bulk.WriteToServer(dataTable);
bulk.Close();
}
}
catch (Exception exp)
{
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
}
SqlBulkCopy的ColumnMappings中列的名稱受大小寫敏感限制,因此在構造DataTable的時候應請注意列名要與表一致。
以上沒有使用事務,使用事務在性能上會有一定的影響,如果要使用事務,可以設置SqlBulkCopyOptions.UseInternalTransaction。
二、Oracle數據批量插入
System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess組件來作為提供者。
/// <summary>
/// Oracle.Data.Access 組件提供的用於批量操作的方法。
/// </summary>
public sealed class OracleAccessBatcher : IBatcherProvider
{
/// <summary>
/// 獲取或設置提供者服務的上下文。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// 將 <see cref="DataTable"/> 的數據批量插入到數據庫中。
/// </summary>
/// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>
/// <param name="batchSize">每批次寫入的數據量。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if (command == null)
{
throw new BatcherException(new ArgumentException("command"));
}
command.Connection = connection;
command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);
command.ExecuteNonQuery();
}
}
catch (Exception exp)
{
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
/// <summary>
/// 生成插入數據的sql語句。
/// </summary>
/// <param name="database"></param>
/// <param name="command"></param>
/// <param name="table"></param>
/// <returns></returns>
private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
{
var names = new StringBuilder();
var values = new StringBuilder();
//將一個DataTable的數據轉換為數組的數組
var data = table.ToArray();
//設置ArrayBindCount屬性
command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null);
var syntax = database.Provider.GetService<ISyntaxProvider>();
for (var i = 0; i < table.Columns.Count; i++)
{
var column = table.Columns[i];
var parameter = database.Provider.DbProviderFactory.CreateParameter();
if (parameter == null)
{
continue;
}
parameter.ParameterName = column.ColumnName;
parameter.Direction = ParameterDirection.Input;
parameter.DbType = column.DataType.GetDbType();
parameter.Value = data[i];
if (names.Length > 0)
{
names.Append(",");
values.Append(",");
}
names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName));
values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);
command.Parameters.Add(parameter);
}
return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
}
以上最重要的一步,就是將DataTable轉為數組的數組表示,即object[][],前數組的上標是列的個數,后數組是行的個數,因此循環Columns將后數組作為Parameter的值,也就是說,參數的值是一個數組。而insert語句與一般的插入語句沒有什么不一樣。
三、SQLite數據批量插入
SQLite的批量插入只需開啟事務就可以了,這個具體的原理不得而知。
public sealed class SQLiteBatcher : IBatcherProvider
{
/// <summary>
/// 獲取或設置提供者服務的上下文。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// 將 <see cref="DataTable"/> 的數據批量插入到數據庫中。
/// </summary>
/// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>
/// <param name="batchSize">每批次寫入的數據量。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = ServiceContext.Database.CreateConnection())
{
DbTransaction transcation = null;
try
{
connection.TryOpen();
transcation = connection.BeginTransaction();
using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if (command == null)
{
throw new BatcherException(new ArgumentException("command"));
}
command.Connection = connection;
command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable);
if (command.CommandText == string.Empty)
{
return;
}
var flag = new AssertFlag();
dataTable.EachRow(row =>
{
var first = flag.AssertTrue();
ProcessCommandParameters(dataTable, command, row, first);
command.ExecuteNonQuery();
});
}
transcation.Commit();
}
catch (Exception exp)
{
if (transcation != null)
{
transcation.Rollback();
}
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first)
{
for (var c = 0; c < dataTable.Columns.Count; c++)
{
DbParameter parameter;
//首次創建參數,是為了使用緩存
if (first)
{
parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();
parameter.ParameterName = dataTable.Columns[c].ColumnName;
command.Parameters.Add(parameter);
}
else
{
parameter = command.Parameters[c];
}
parameter.Value = row[c];
}
}
/// <summary>
/// 生成插入數據的sql語句。
/// </summary>
/// <param name="database"></param>
/// <param name="table"></param>
/// <returns></returns>
private string GenerateInserSql(IDatabase database, DataTable table)
{
var syntax = database.Provider.GetService<ISyntaxProvider>();
var names = new StringBuilder();
var values = new StringBuilder();
var flag = new AssertFlag();
table.EachColumn(column =>
{
if (!flag.AssertTrue())
{
names.Append(",");
values.Append(",");
}
names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));
values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);
});
return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
}
四、MySql數據批量插入
/// <summary>
/// 為 MySql.Data 組件提供的用於批量操作的方法。
/// </summary>
public sealed class MySqlBatcher : IBatcherProvider
{
/// <summary>
/// 獲取或設置提供者服務的上下文。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// 將 <see cref="DataTable"/> 的數據批量插入到數據庫中。
/// </summary>
/// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>
/// <param name="batchSize">每批次寫入的數據量。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if (command == null)
{
throw new BatcherException(new ArgumentException("command"));
}
command.Connection = connection;
command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);
if (command.CommandText == string.Empty)
{
return;
}
command.ExecuteNonQuery();
}
}
catch (Exception exp)
{
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
/// <summary>
/// 生成插入數據的sql語句。
/// </summary>
/// <param name="database"></param>
/// <param name="command"></param>
/// <param name="table"></param>
/// <returns></returns>
private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
{
var names = new StringBuilder();
var values = new StringBuilder();
var types = new List<DbType>();
var count = table.Columns.Count;
var syntax = database.Provider.GetService<ISyntaxProvider>();
table.EachColumn(c =>
{
if (names.Length > 0)
{
names.Append(",");
}
names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName));
types.Add(c.DataType.GetDbType());
});
var i = 0;
foreach (DataRow row in table.Rows)
{
if (i > 0)
{
values.Append(",");
}
values.Append("(");
for (var j = 0; j < count; j++)
{
if (j > 0)
{
values.Append(", ");
}
var isStrType = IsStringType(types[j]);
var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j);
if (parameter != null)
{
values.Append(parameter.ParameterName);
command.Parameters.Add(parameter);
}
else if (isStrType)
{
values.AppendFormat("'{0}'", row[j]);
}
else
{
values.Append(row[j]);
}
}
values.Append(")");
i++;
}
return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
/// <summary>
/// 判斷是否為字符串類別。
/// </summary>
/// <param name="dbType"></param>
/// <returns></returns>
private bool IsStringType(DbType dbType)
{
return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;
}
/// <summary>
/// 創建參數。
/// </summary>
/// <param name="provider"></param>
/// <param name="isStrType"></param>
/// <param name="dbType"></param>
/// <param name="value"></param>
/// <param name="parPrefix"></param>
/// <param name="row"></param>
/// <param name="col"></param>
/// <returns></returns>
private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col)
{
//如果生成全部的參數,則速度會很慢,因此,只有數據類型為字符串(包含'號)和日期型時才添加參數
if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime)
{
var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col);
var parameter = provider.DbProviderFactory.CreateParameter();
parameter.ParameterName = name;
parameter.Direction = ParameterDirection.Input;
parameter.DbType = dbType;
parameter.Value = value;
return parameter;
}
return null;
}
}
MySql的批量插入,是將值全部寫在語句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。
五、測試
接下來寫一個測試用例來看一下使用批量插入的效果。
[Test]
public void TestBatchInsert()
{
Console.WriteLine(TimeWatcher.Watch(() =>
InvokeTest(database =>
{
var table = new DataTable("Batcher");
table.Columns.Add("Id", typeof(int));
table.Columns.Add("Name1", typeof(string));
table.Columns.Add("Name2", typeof(string));
table.Columns.Add("Name3", typeof(string));
table.Columns.Add("Name4", typeof(string));
//構造100000條數據
for (var i = 0; i < 100000; i++)
{
table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());
}
//獲取 IBatcherProvider
var batcher = database.Provider.GetService<IBatcherProvider>();
if (batcher == null)
{
Console.WriteLine("不支持批量插入。");
}
else
{
batcher.Insert(table);
}
//輸出batcher表的數據量
var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");
Console.WriteLine("當前共有 {0} 條數據", database.ExecuteScalar(sql));
})));
}
以下表中列出了四種數據庫生成10萬條數據各耗用的時間
數據庫 |
耗用時間 |
MsSql | 00:00:02.9376300 |
Oracle | 00:00:01.5155959 |
SQLite | 00:00:01.6275634 |
MySql | 00:00:05.4166891 |