使用SqlBulkCopy批量插入/更新數據


    在開發中遇到了一張表的數據因為只做了同步插入而沒有做同步更新的操作,導致了百萬數據不准確。面對大量數據需要更新,傳統的循環逐條插入以及拼接1000條數據插入都比較耗時,網上有博主做出了相關測試。

       根據以上場景,新建控制台程序。config添加數據庫連接配置,sqlHelper連接更新數據源,sqlBulkCopyHelper連接更新目標庫。

    創建sqlHelper類

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SqlBulkCopyHelper
{
    public class sqlHelper
    {

        /// <summary>
        /// 數據庫操作幫助類
        /// 此段基礎代碼為SQLServer數據庫幫助類
        ///     如需操作MySQL
        ///     1.將代碼中Sql改為MySql
        ///     2.添加MySql.Data.dll引用(可通過官網或NuGet)
        ///     3.using System.Data.SqlClient;替換為using MySql.Data.MySqlClient;
        /// </summary>
        /// <summary>
        /// 數據庫連接字符串
        /// </summary>
        private static string connectionStr =
       System.Configuration.ConfigurationManager.ConnectionStrings["ConnectionName"].ConnectionString;
        public sqlHelper() { }
        public sqlHelper(string connectionStr)
        {
            sqlHelper.connectionStr = connectionStr;
        }
        /// <summary>
        /// 得到連接對象
        /// </summary>
        /// <returns></returns>
        public static SqlConnection GetConn()
        {
            SqlConnection sqlconn = null;
            sqlconn = new SqlConnection(connectionStr);
            return sqlconn;
        }

        /// <summary>
        /// 查詢操作
        /// </summary>
        /// <param name="sql"></param>
        /// <returns></returns>
        public static DataTable GetDataTable(string sql, params SqlParameter[] sp)
        {
            using (SqlConnection conn = GetConn())
            {
                conn.Open();
                using (SqlDataAdapter sda = new SqlDataAdapter(sql, conn))
                {
                    sda.SelectCommand.Parameters.AddRange(sp);
                    DataTable dt = new DataTable();
                    sda.Fill(dt);
                    return dt;
                }
            }
        }
        /// <summary>
        /// 增刪改操作
        /// </summary>
        /// <param name="sql">sql語句</param>
        /// <returns>執行后的條數</returns>
        public static int ExecuteNonQuery(string sql, params SqlParameter[] sp)
        {
            using (SqlConnection conn = GetConn())
            {
                conn.Open();
                using (SqlCommand cmd = new SqlCommand(sql, conn))
                {
                    cmd.Parameters.AddRange(sp);
                    int i = cmd.ExecuteNonQuery();
                    return i;
                }
            }

        }

        /// <summary>
        /// 執行一條SQL語句,返回首行首列
        /// </summary>
        /// <param name="sql">sql語句</param>
        /// <returns>首行首列</returns>
        public static object ExecuteScalar(string sql, params SqlParameter[] sp)
        {
            using (SqlConnection conn = GetConn())
            {
                conn.Open();
                using (SqlCommand cmd = new SqlCommand(sql, conn))
                {
                    cmd.Parameters.AddRange(sp);
                    return cmd.ExecuteScalar();
                }
            }
        }
    }
}

    創建sqlBulkCopyHelper

/// <summary>
        /// SqlBulkCopy 批量更新數據
        /// </summary>
        /// <param name="dataTable">數據集</param>
        /// <param name="crateTemplateSql">臨時表創建字段</param>
        /// <param name="updateSql">更新語句</param>
        public static void BulkUpdateData(DataTable dataTable, string crateTemplateSql, string updateSql)
        {
            ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.PerUserRoamingAndLocal);
            using (var conn = new SqlConnection(ConfigurationManager.ConnectionStrings["DefaultConnection"].ConnectionString))
            {
                using (var command = new SqlCommand("", conn))
                {
                    try
                    {
                        conn.Open();
                        //數據庫並創建一個臨時表來保存數據表的數據
                        command.CommandText = String.Format("  CREATE TABLE #TmpTable ({0})", crateTemplateSql);
                        command.ExecuteNonQuery();

                        //使用SqlBulkCopy 加載數據到臨時表中
                        using (var bulkCopy = new SqlBulkCopy(conn))
                        {
                            foreach (DataColumn dcPrepped in dataTable.Columns)
                            {
                                bulkCopy.ColumnMappings.Add(dcPrepped.ColumnName, dcPrepped.ColumnName);
                            }

                            bulkCopy.BulkCopyTimeout = 660;
                            bulkCopy.DestinationTableName = "#TmpTable";
                            bulkCopy.WriteToServer(dataTable);
                            bulkCopy.Close();
                        }

                        // 執行Command命令 使用臨時表的數據去更新目標表中的數據  然后刪除臨時表
                        command.CommandTimeout = 300;
                        command.CommandText = updateSql;
                        command.ExecuteNonQuery();
                    }
                    finally
                    {
                        conn.Close();
                    }
                }
            }
        }

    Program代碼

/// <summary>
        /// 更新表數據
        /// </summary>
        public static void update()
        {
            String sqlstring = @"";

            System.Diagnostics.Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start(); //  開始監視代碼運行時間
            DataTable dt = sqlHelper.GetDataTable(sqlstring);
            stopwatch.Stop(); //  停止監視
            Console.WriteLine("執行查詢sql用時:" + stopwatch.Elapsed.TotalSeconds + "秒,共查詢到:" + dt.Rows.Count + "");

            String updateSql = @"Merge into Table AS T 
Using #TmpTable AS S 
ON (T.order_no = S.order_no and T.item_code = S.item_code )
WHEN MATCHED 
THEN UPDATE SET T.[qty]=S.[qty],T.[total_amount]=S.[total_amount];";
            String crateTemplateSql = @"
    [order_no] [varchar](32) NULL,
    [qty] [int] NULL,
    [total_amount] [decimal](18, 2) NULL,
    [item_code] [varchar](32) NULL,";

            for (int i = 0; i < (dt.Rows.Count + 10000 - 1) / 10000; i++)
            {
                System.Diagnostics.Stopwatch stopwatch2 = new Stopwatch();
                stopwatch2.Start();
                sqlBulkCopyHelper.BulkUpdateData(dt.AsEnumerable().Skip(i * 10000).Take(10000).CopyToDataTable(), crateTemplateSql, updateSql);
                stopwatch2.Stop();
                Console.WriteLine("更新第" + (i + 1) + "次耗時:" + stopwatch2.Elapsed.TotalSeconds + "秒,剩余" + ((dt.Rows.Count + 10000 - 1) / 10000 - i - 1) + "");
            }
            Console.ReadLine();
        }

    1.更新的時候,datatable數據量過大內存不夠用,這里是分了一下頁。

    2.還需要注意的就是sqlBulkCopy在使用的時候,視圖或者是源表的字段大小寫、類型必須與目標表一致。

    3. Merge,使用merger語句可以將插入、更新、刪除合並成一句,完成存在就更新不存在就插入的需求。

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM