Asp.Net Core中使用FTP讀取大文件並使用SqlBulkCopy實現大批量插入SQL SERVER數據庫


  背景

  在介紹整個原理和代碼之前,我們來交代一下整個過程的背景,我們的系統作為一個下游的DMS系統,需要每天定期讀取第三方接口傳入數據,由於第三方接口每天傳入的數據有上百萬條主機廠備件庫存數據,之前通過Kafka消息接口進行傳輸的話效率低、速度慢而且容易出錯,所以后面和第三方統一意見采用FTP方式進行發送和接收,這樣我們只需每天去他們的FTP服務器上面去讀取他們的文件並實時插入到我們的SQL SERVER數據庫中即可,整個過程的重點就是兩個部分:1 從第三方FTP服務器讀取大文件 2 大量數據插入SQL SERVER數據庫,后面的部分重點也是這兩個部分,一是要能夠快速讀取這些數據,二是要能夠快速插入數據庫而不能夠拖慢數據庫性能。

  一 FTP服務器讀取大文件

  1.1 安裝Nuget包

  第一個部分就是怎樣從第三方的FTP服務器上面讀取到對應的文件信息呢?Asp.Net Core中有哪些比較成熟的方案呢?通過調研我們發現FluentFTP這個包能夠完全適應我們的要求,所以第一步就是項目中安裝FluentFTP這個包,具體的一些介紹也可以參考README.md上面的介紹,安裝了這個包之后就是具體的配置和代碼讀取文件的方式了。

  1.2 配置FTP相關地址

  和所有其它遠程登錄形式相同,這個包也是通過FTPClient的形式實現的,所以第一步我們就需要配置具體的FTP服務的地址,這個在appsettings.json中進行配置(這個在實際的生產環境也可以配置在響應的k8s的pod上面),這個配置包括服務器地址、用戶名和密碼等相關信息,如下面所示:

 "Eai": {
    "Addresses": {
      "PartInfo": "ftp://testuser:testpassword@192.168.17.42:21/CheryDcsParts",
      "BranchPartStockInfo": "ftp://testuser:testpassword@192.168.17.42:21/CheryDcsParts"
    }
  }

  1.3 讀取FTP服務器上面的文件

  這個部分我們主要是通過具體的代碼來說明,這里特別需要說明,下面的代碼中我們會將FTP文件讀取到內容並插入到數據庫中,這個子函數的參數由兩部分構成,filePath和instanceId兩個,這個是有第三方接口主動調用我們的WebApi接口時傳輸的,由於讀取的文件可能存在各種錯誤,所以我們進行一行行讀取的時候會判斷當前行是否有錯誤,我們會將整個結果分成兩個部分,一個是正確的數據,另外一個部分是錯誤的數據,這兩個部分最終會插入到數據庫中不同的表中進行保存,便於日后排查數據。

 public async Task<FtpResultModel> GetResultFromFtpAsync(string filePath, string instanceId) {
            var inputs = new List<BranchPartStockTempModel>();
            var storagePath = _configuration.GetValue<string>("Local:BranchPartStockInfo");
            var ftpBasePath = new UriBuilder(_configuration.GetValue<string>("EAI:Addresses:BranchPartStockInfo"));
            Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
            //可以對中文進行編碼
            var gb2312Encoding = Encoding.GetEncoding("GB2312");
            // 受限於 Kubernetes Network,無法使用 Active Mode 訪問 FTP 服務端
            // 完整的文件路徑 = FTP 用戶的當前目錄 + filePath
            IFtpClient ftpClient = new FtpClient(ftpBasePath.Host, ftpBasePath.Port, ftpBasePath.UserName, ftpBasePath.Password) {
                DataConnectionType = FtpDataConnectionType.PASV
            };

            //使用瀏覽器,查看ftp路徑為ftp://10.2.14.237/interface/erp/zsdeai2016/
            //通過GetWorkingDirectory(),會獲取到 /oracle/eaiftp/ 導致路徑錯誤
            //var remoteFilePath = Path.Combine(ftpClient.GetWorkingDirectory(), filePath);
            var remoteFilePath = Path.Combine(ftpBasePath.Path, filePath);
            _logger.LogInformation("Loading file {0} from FTP.", remoteFilePath);
            using var fileStream = await ftpClient.OpenReadAsync(remoteFilePath);
            using var reader = new StreamReader(fileStream, gb2312Encoding);
            //備份文件
            var localFilePath = CombinePath(storagePath, $"instanceId.{Path.GetRandomFileName()}.txt");
            //判斷目錄是否存在
            if (!Directory.Exists(storagePath)) {
                Directory.CreateDirectory(storagePath);
            }

            using var fileStreamBack = new StreamWriter(localFilePath);
            //錯誤數據日志
            var errorLogs = new List<BranchPartStockSyncErrorLog>();
            //解析數據
            string line;
            var i = 0;
            while ((line = await reader.ReadLineAsync()) != null) {
                var hasError = false;
                await fileStreamBack.WriteLineAsync(line);
                ++i;
                if (string.IsNullOrWhiteSpace(line))
                    continue;
                var value = line.Split('$', StringSplitOptions.RemoveEmptyEntries);

                if (value.Length < 4) {
                    CreatePartSyncLog(filePath, instanceId, errorLogs, i, null, null, SharedLocalizer["當前行,數據結構有誤。缺少列"]);
                } else {
                    //數據結構:第一列是件號,第二列是公司(集團),第三列是倉庫,第四列是庫存數量
                    var partCode = GetValueByIndex(value, 0);
                    var warehouseCode = GetValueByIndex(value, 2);

                    if (string.IsNullOrWhiteSpace(warehouseCode)) {
                        hasError = true;
                        CreatePartSyncLog(filePath, instanceId, errorLogs, i, warehouseCode, partCode, SharedLocalizer["倉庫編號不能為空"]);
                    }

                    if (string.IsNullOrWhiteSpace(partCode)) {
                        hasError = true;
                        CreatePartSyncLog(filePath, instanceId, errorLogs, i, warehouseCode, partCode, SharedLocalizer["配件名稱不能為空"]);
                    }

                    //獲取的庫存數量
                    var stockQuantityStr = GetValueByIndex(value, 3);

                    if (!decimal.TryParse(stockQuantityStr, out var stockQuantity)) {
                        hasError = true;
                        CreatePartSyncLog(filePath, instanceId, errorLogs, i, warehouseCode, partCode, SharedLocalizer["傳入的庫存數量格式不正確"]);
                    }

                    if (hasError) {
                        continue;
                    }

                    var branchPartStockTempModel = new BranchPartStockTempModel {
                        WarehouseCode = warehouseCode,
                        PartCode = partCode,
                        StockQuantity = stockQuantity,
                        LineNumber = i
                    };

                    inputs.Add(branchPartStockTempModel);
                }
            }

            _logger.LogInformation($"收到Sap備件, 數量 {inputs.Count + errorLogs.Count},其中正確:{inputs.Count}條,錯誤:{errorLogs.Count}條");

            return new FtpResultModel {
                ValidData = inputs,
                ErrorLogs = errorLogs
            };
        }

  這個里面還有一點需要注意的地方就是,我們讀取的FileStream以及StreamReader都是.Net 中非托管的對象,需要在使用之后進行手動釋放,否則會造成無法及時釋放內存造成內存使用率增加,這個在使用的時候需要特別注意,所以在我們的代碼中都使用了using前綴從而在使用完畢后及時釋放非托管內存。此外我們的代碼中還是需要使用ILogger及時記錄一些關鍵信息從而方便線上環境進行問題排查。

  二 使用SqlBulkCopy實現大批量插入SQL SERVER數據庫

  在上面的部分我們已經將FTP服務器上面的數據讀取到了內存中,接下來我們就演示如何將我們特定的數據通過SqlBulkCopy插入到數據庫中,在開始之前你也可以點擊這里來閱讀SqlBulkCopy的官方文檔。

  這個部分我們也是通過下面的代碼來一步步講述。

 /// <summary>
        /// 插入分公司備件庫存
        /// </summary>
        /// <param name="filePath">文件路徑</param>
        /// <param name="instanceId">實例Id</param>
        /// <param name="branchPartStockModels">分公司備件庫存實體</param>
        /// <param name="errorBranchPartStockModels">錯誤信息實體</param>
        /// <returns></returns>
        [UnitOfWork(isTransactional: false)]
        public async Task InsertBranchPartStockAsync(string filePath, string instanceId, IList<SyncGetBranchPartStockModel> branchPartStockModels, IList<BranchPartStockSyncErrorLog> errorBranchPartStockModels) {
            var transaction = Connection.BeginTransaction();
            var command = Connection.CreateCommand();
            try {
                const int CommandTimeOut = 5 * 60;
                command.Transaction = transaction;
                command.CommandText = _tableGenerator.GenerateTableScript(typeof(SyncGetBranchPartStockModel), DataBaseType.SqlServer);
                command.CommandTimeout = CommandTimeOut;
                await command.ExecuteNonQueryAsync();
                await new BulkUploadToSql().CommitAsync(branchPartStockModels, transaction);
                command.CommandText = MergeBranchPartStock(filePath, instanceId);
                await command.ExecuteNonQueryAsync();

                await new BulkUploadToSql().CommitAsync(errorBranchPartStockModels, transaction);

                transaction.Commit();
            } finally {
                command?.Dispose();
                transaction?.Dispose();
            }
        }

  這段代碼中Connection是基類中對外公開的一個屬性,指的是當前DbContext對應的唯一的Connection,這里面所有的操作都是通過ExecuteNonQuery這個來實現的這個大家應該非常熟悉了,這段代碼中還使用到了_tableGenerator.GenerateTableScript這個方法,這個方法主要是通過定義一個實體Model用代碼來創建數據庫臨時表方法,如果想了解更多內容,請點擊這里查看,這個里面最核心的部分就是下面的這段代碼了。

new BulkUploadToSql().CommitAsync(branchPartStockModels, transaction);

  這里我們來重點看看我們在BulkUploadToSql的方法中到底實現了些什么?

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Sunlight.DataAnnotations;

namespace Sunlight.Dcs.Parts.Data.EntityFrameworkCore {
    public class BulkUploadToSql : IBulkUploadToSql {
        public string TableName { get; set; }
        public int CommitBatchSize { get; set; } = 1000;

        public async Task CommitAsync<T>(IList<T> sourceData, IDbTransaction transaction) {
            if (sourceData.Count <= 0) {
                return;
            }
            GetTableName(typeof(T));

            // make sure to enable triggers
            // more on triggers in next post
            if (!(transaction.Connection is SqlConnection connection))
                throw new ValidationException("當前只支持SqlServer, 其它數據庫需要開發");

            if (connection.State != ConnectionState.Open)
                connection.Open();
            using var bulkCopy = new SqlBulkCopy(connection,
                SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepNulls, (SqlTransaction)transaction
            ) { DestinationTableName = TableName };

            using var dt = new DataTable();
            var numberOfPages = (sourceData.Count / CommitBatchSize) + (sourceData.Count % CommitBatchSize == 0 ? 0 : 1);
            for (var pageIndex = 0; pageIndex < numberOfPages; pageIndex++) {
                sourceData.Skip(pageIndex * CommitBatchSize).Take(CommitBatchSize).ToDataTable(dt);
                // write the data in the "dataTable"
                await bulkCopy.WriteToServerAsync(dt);
                // reset
                //this.dataTable.Clear();
            }
        }

        /// <summary>
        ///
        /// </summary>
        /// <returns></returns>
        private void GetTableName(MemberInfo entityType) {
            if (TableName != null)
                return;
            var prefix = entityType.GetCustomAttribute<TempTableAttribute>() != null ? "#" : string.Empty;
            TableName = $"{prefix}{entityType.GetCustomAttribute<TableAttribute>()?.Name ?? entityType.Name}";
        }
    }

    public static class BulkUploadToSqlHelper {
        public static void ToDataTable<T>(this IEnumerable<T> data, DataTable table) {
            table.Reset();
            var properties =
                TypeDescriptor.GetProperties(typeof(T));
            foreach (PropertyDescriptor prop in properties)
                table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
            foreach (var item in data) {
                var row = table.NewRow();
                foreach (PropertyDescriptor prop in properties)
                    row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
                table.Rows.Add(row);
            }
        }
    }
}

  這個里面CommitAsync是整個類核心,這個方法里面的sourceData就是我們從FTP服務器上面讀取到的大批量文件,大概有100萬條數據,這么多的數據如果通過Insert進行一條一條的插入那簡直就是一種災難。在我們的代碼中我們也是通過創建一個SqlBulkCopy對象來分頁插入的,每次都是插入1000條,當然每次插入的數量也是可以根據我們的需要來進行調整的,這個里面我們可以看一下微軟官方文檔中最重要的一段對SqlBulkCopy中的描述:

  SqlBulkCopy 類可用於只將數據寫入 SQL Server 表。 但是,數據源不限於 SQL Server;可以使用任何數據源,只要數據可以加載到 DataTable 實例或使用 IDataReader 實例進行讀取即可。

  所以我們的第一步工作就是將我們內存中的數據讀取到DataTable中去,后面我們就能夠將DataTable中的數據實現批量拷貝到數據庫中的目的了。

  通過上面的兩個過程我們就能夠完整地表現這篇博客的主題,主要包括:1 如何使用FluentFTP來讀取文件。2 如何使用SqlBulkCopy一次插入大量數據到數據庫中。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM