EF批量插入太慢?那是你的姿勢不對


大概所有的程序員應該都接觸過批量插入的場景,我也相信任何的程序員都能寫出可正常運行的批量插入的代碼。但怎樣實現一個高效、快速插入的批量插入功能呢?

由於每個人的工作履歷,工作年限的不同,在實現這樣的一個需求時,可能技術選型各有不同,有直接生成insert語句的,有用EF的或者其他的orm框架的。其實不管是手寫insert還是使用EF,最終交給數據庫執行的還是insert語句。下面是EF批量插入的示例代碼:

var list = new List<Student>();
for (int i = 0; i < 100; i++)
{
    list.Add(new Student { CreateTime = DateTime.Now, Name = "zjjjjjj" });
}

await _context.Students.AddRangeAsync(list);
await _context.SaveChangesAsync();

生成的腳本截圖如下:
zzz

這種實現方式在數據量100以內時,耗時還算可以。但如果要批量導入的數據達到萬級的時候,那耗時簡直是災難。我測試的數據如下(測試數據庫為mysql,具體配置不詳):

數據量 耗時(s)
10 0.028
1w 3.929
10w 31.280

10w的數據已經耗時超過了30s,我沒有勇氣測試100w數據的耗時,有興趣的可以自行測試下。

下面就應該進入正題了,對於較大數據量(1000以上)場景下的批量插入,各個數據庫應該都提供了相關的解決方案,由於工作所限,目前筆者僅接觸過mysql和mssql。

mysql的實現方案是LOAD DATA命令,此命令接收一個csv文件,然后將文件上傳到數據庫服務器后,解析數據后插入。好在MySqlConnector提供了相關的封裝,不用咱們去熟悉那么復雜的命令參數。

mssql實現的方案是使用SqlBulkCopy類,不過此類僅接收DataTable類型的數據,所以,在批量插入的時候,需要將數據源轉換成DataTable。

綜上所示,不管是mysql,還是mssql,均需要將數據源轉換成指定的格式才可以使用批量導入的功能,所以這一塊的主要核心就是轉換數據源格式。mysql需要轉換成csv,mssql需要轉換成DataTable。下面就來一起看看具體的轉換的方法。

以下代碼是轉換csv和DataTable相關方法:

namespace FL.DbBulk
{
    public static class Extension
    {
        /// <summary>
        /// 獲取實體影射的表名
        /// </summary>
        /// <param name="type"></param>
        /// <returns></returns>
        public static string GetMappingName(this System.Type type)
        {
            var key = $"batch{type.FullName}";

            var tableName = CacheService.Get(key);
            if (string.IsNullOrEmpty(tableName))
            {
                var tableAttr = type.GetCustomAttribute<TableAttribute>();
                if (tableAttr != null)
                {
                    tableName = tableAttr.Name;
                }
                else
                {
                    tableName = type.Name;
                }
                CacheService.Add(key, tableName);
            }
            return tableName;
        }

        public static List<EntityInfo> GetMappingProperties(this System.Type type)
        {
            var key = $"ICH.King.DbBulk{type.Name}";
            var list = CacheService.Get<List<EntityInfo>>(key);
            if (list == null)
            {
                list = new List<EntityInfo>();
                foreach (var propertyInfo in type.GetProperties())
                {
                    if (!propertyInfo.PropertyType.IsValueType &&
                        propertyInfo.PropertyType.Name != "Nullable`1" && propertyInfo.PropertyType != typeof(string)) continue;
                    var temp = new EntityInfo();
                    temp.PropertyInfo = propertyInfo;
                    temp.FieldName = propertyInfo.Name;
                    var attr = propertyInfo.GetCustomAttribute<ColumnAttribute>();
                    if (attr != null)
                    {
                        temp.FieldName = attr.Name;
                    }
                    temp.GetMethod = propertyInfo.CreateGetter();
                    list.Add(temp);
                }
                CacheService.Add(key, list);
            }

            return list;
        }

        /// <summary>
        /// 創建cvs字符串
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="entities"></param>
        /// <param name="primaryKey"></param>
        /// <returns></returns>
        public static string CreateCsv<T>(this IEnumerable<T> entities, string primaryKey = "")
        {
            var sb = new StringBuilder();
            var properties = typeof(T).GetMappingProperties().ToArray();
            foreach (var entity in entities)
            {
                for (int i = 0; i < properties.Length; i++)
                {
                    var ele = properties[i];
                    if (i != 0) sb.Append(",");
                    var value = ele.Get(entity);
                    if (ele.PropertyInfo.PropertyType.Name == "Nullable`1")
                    {
                        if (ele.PropertyInfo.PropertyType.GenericTypeArguments[0] == typeof(DateTime))
                        {
                            if (value == null)
                            {
                                sb.Append("NULL");
                            }
                            else
                            {
                                sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
                            }
                            continue;
                        }
                    }

                    if (ele.PropertyInfo.PropertyType == typeof(DateTime))
                    {
                        sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
                        continue;
                    }
                    //如果是主鍵&&string類型,且值不為空
                    if (ele.FieldName == primaryKey && ele.PropertyInfo.PropertyType == typeof(string))
                    {
                        sb.Append(Guid.NewGuid().ToString());
                        continue;
                    }
                    if (value == null)
                    {
                        continue;
                    }
                    if (ele.PropertyInfo.PropertyType == typeof(string))
                    {
                        var vStr = value.ToString();
                        if (vStr.Contains("\""))
                        {
                            vStr = vStr.Replace("\"", "\"\"");
                        }
                        if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n"))
                        {
                            vStr = $"\"{vStr}\"";
                        }
                        sb.Append(vStr);
                    }
                    else sb.Append(value);
                }
                sb.Append(IsWin() ? "\r\n" : "\n");
                //sb.AppendLine();
            }

            return sb.ToString();
        }

        public static bool IsWin()
        {
            return RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
        }

        public static string CreateCsv(this DataTable table)
        {
            StringBuilder sb = new StringBuilder();
            DataColumn colum;
            foreach (DataRow row in table.Rows)
            {
                for (int i = 0; i < table.Columns.Count; i++)
                {
                    colum = table.Columns[i];
                    if (i != 0) sb.Append(",");
                    if (colum.DataType == typeof(string))
                    {
                        var vStr = row[colum].ToString();
                        if (vStr.Contains("\""))
                        {
                            vStr = vStr.Replace("\"", "\"\"");
                        }
                        if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n"))
                        {
                            vStr = $"\"{vStr}\"";
                        }
                        sb.Append(vStr);
                    }
                    else sb.Append(row[colum]);
                }
                sb.Append(IsWin() ? "\r\n" : "\n");
            }
            return sb.ToString();
        }


        public static DataTable ToDataTable<T>(this IEnumerable<T> list, string primaryKey = "")
        {
            var type = typeof(T);
            //獲取實體映射的表名
            var mappingName = type.GetMappingName();
            var dt = new DataTable(mappingName);
            //獲取實體映射的屬性列表
            var columns = type.GetMappingProperties();
            dt.Columns.AddRange(columns.Select(x => new DataColumn(x.FieldName)).ToArray());
            foreach (var data in list)
            {
                var row = dt.NewRow();
                foreach (var entityInfo in columns)
                {
                    var value = entityInfo.Get(data);
                    if (primaryKey == entityInfo.FieldName && entityInfo.PropertyInfo.PropertyType == typeof(string))
                    {
                        row[entityInfo.FieldName] = value ?? Guid.NewGuid().ToString();
                    }
                    else
                    {
                        row[entityInfo.FieldName] = value;
                    }
                }
                dt.Rows.Add(row);
            }

            return dt;
        }

       
    }
}

轉換成DataTable方法相對簡單,但這里我做了個優化下,當判斷主鍵是string類型,且值為空時,會自動生成一個GUID,並給其賦值,這樣做的目的是為了和EF原生的插入功能兼容。

生成Csv的相對比較麻煩,因為Csv是用逗號以及其他符號來區分每一行、每一列數據,但經常會存在要插入的數據包含了csv的特殊符號,這樣情況下就需要做轉義。另外,還有一個需要考慮的問題,linux和windows默認的換行符是有區別的,windows的換行符為\r\n,而linux默認的是\n,所以在生成csv時,需要根據不同的系統進行處理。

下面來看下具體怎么調用相關的插入方法,首先看下mysql的,主要代碼如下所示:

private async Task InsertCsvAsync(string csv, string tableName, List<string> columns)
{
    var fileName = Path.GetTempFileName();
    await File.WriteAllTextAsync(fileName, csv);
    var conn = _context.Database.GetDbConnection() as MySqlConnection;
    var loader = new MySqlBulkLoader(conn)
    {
        FileName = fileName,
        Local = true,
        LineTerminator = Extension.IsWin() ? "\r\n" : "\n",
        FieldTerminator = ",",
        TableName = tableName,
        FieldQuotationCharacter = '"',
        EscapeCharacter = '"',
        CharacterSet = "UTF8"
    };
    loader.Columns.AddRange(columns);
    await loader.LoadAsync();
}

在上述的代碼中,首先創建一個臨時文件,然后將其他數據源轉換的csv內容寫入到文件中,獲取數據庫連接,再然后創建MySqlBulkLoader類的實例,將相關參數進行復制后,還需要配置字段列表,最后執行LoadAsync命令。

下面是mssql的批量插入的核心代碼:

public async Task InsertAsync(DataTable table)
{
    if (table == null)
    {
        throw new ArgumentNullException();
    }

    if (string.IsNullOrEmpty(table.TableName))
    {
        throw new ArgumentNullException("DataTable的TableName屬性不能為空");
    }
    var conn = (SqlConnection)_context.Database.GetDbConnection();
    await conn.OpenAsync();
    using (var bulk = new SqlBulkCopy(conn))
    {
        bulk.DestinationTableName = table.TableName;
        foreach (DataColumn column in table.Columns)
        {
            bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);
        }
        await bulk.WriteToServerAsync(table);
    }
}

以上方法相對簡單,在此不做更多解釋。

至此,mysql和mssql批量的導入的方案已經介紹完畢,但可能就會有人說了,這跟EF好像也沒什么關系呀。
其實如果你有仔細看的話,或許能發現,我在代碼中使用了一個名為_context字段,此字段其實就是EF的DbContext的實例。但文章內容到此時也沒有完全的和EF結合,下面就來介紹下如何更優雅的將此功能集成到EF中。

在.net core中,接入EF的時候其實已經指定了使用的數據庫類型,實例代碼如下:

 services.AddDbContext<MyDbContext>(opt => opt.UseMySql("server=10.0.0.146;Database=demo;Uid=root;Pwd=123456;Port=3306;AllowLoadLocalInfile=true"))

既然以及指定了數據庫類型,那么在調用批量插入的時候,應該就不需要讓調用者判斷是使用mysql的方法,還是mssql的方法。具體怎么設計呢?且耐心往下看。

首先分別定義接口ISqlBulk,IMysqlBulk,ISqlServerBulk代碼如下:

namespace FL.DbBulk
{
    public interface ISqlBulk
    {
        /// <summary>
        /// 批量導入數據
        /// </summary>
        /// <param name="table">數據源</param>
        void Insert(DataTable table);
        /// <summary>
        /// 批量導入數據
        /// </summary>
        /// <param name="table">數據源</param>
        Task InsertAsync(DataTable table);

        void Insert<T>(IEnumerable<T> enumerable) where T : class;
        Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class;
    }
}

IMysqlBulk,ISqlServerBulk接口繼承ISqlBulk,代碼如下:

namespace FL.DbBulk
{
    public interface IMysqlBulk : ISqlBulk
    {
        Task InsertAsync<T>(string csvPath, string tableName = "") where T : class;
    }
}
namespace FL.DbBulk
{
    public interface ISqlServerBulk:ISqlBulk
    {
        
    }
}

然后創建ISqlBulk實現類:

namespace FL.DbBulk
{
    public class SqlBulk : ISqlBulk
    {
        private ISqlBulk _bulk;
        public SqlBulk(DbContext context, IServiceProvider provider)
        {
            if (context.Database.IsMySql())
            {
                _bulk = provider.GetService<IMysqlBulk>();
            }
            else if (context.Database.IsSqlServer())
            {
                _bulk = provider.GetService<ISqlServerBulk>();
            }
        }

        public void Insert(DataTable table)
        {
            _bulk.Insert(table);
        }

        public async Task InsertAsync(DataTable table)
        {
            await _bulk.InsertAsync(table);
        }

        public void Insert<T>(IEnumerable<T> enumerable) where T : class
        {
            _bulk.Insert(enumerable);
        }

        public async Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class
        {
            await _bulk.InsertAsync(enumerable);
        }

    }
}

在SqlBulk的構造函數中,通過context.Database的擴展方法判斷數據庫的類型,然后再獲取相應的接口的實例。再然后就是實現IMysqlBulk和ISqlServerBulk的實現類。上文已經把核心代碼貼出,再此為了篇幅,就不貼完整代碼了。

再然后,就是提供一個注入services的方法,代碼如下:

namespace Microsoft.Extensions.DependencyInjection
{
    public static class ServiceCollectionExtension
    {
        public static IServiceCollection AddBatchDB<T>(this IServiceCollection services) where  T:DbContext
        {
            services.TryAddScoped<IMysqlBulk, MysqlBulk>();
            services.TryAddScoped<ISqlServerBulk, SqlServerBulk>();
            services.TryAddScoped<ISqlBulk, SqlBulk>();
            services.AddScoped<DbContext, T>();
            return services;
        }
    }
}

有了以上代碼,我們就可以通過在Startup中很方便的啟用批量插入的功能了。

最后,貼出兩種插入方式對比的測試數據:

數據量 EF默認耗時(s) ISqlBulk耗時(s)
10 0.028 0.030
1w 3.929 1.581
10w 31.280 15.408

以上測試數據均是使用同一個mysql數據庫,不同配置以及網絡環境下,測試的數據會有差異,有興趣的可以自己試試。

至此,本人內容已完畢。


最后,貼出git地址,如果思路或代碼可以幫到你,歡迎點贊,點star
https://github.com/fuluteam/FL.DbBulk.git

福祿ICH·架構組 福爾斯


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM