批量數據入庫


下面主要介紹數據庫批量操作數據(主要是 Insert)的方法,涉及 SQL Server、DB2、MySQL 等。

SQL Server

首先,准備工作,新建一個數據庫實例

create database Stu_Sqh

在數據庫實例中新建一張數據表:學生信息表

1 CREATE TABLE [dbo].[StudentInfo](
2     [NAME] [varchar](20) NOT NULL,
3     [sex] [varchar](5) NULL,
4     [BIRTH] [date] NOT NULL,
5     [AGE]  AS (datediff(year,[BIRTH],getdate())),
6     [ID] [int] IDENTITY(101,1) NOT NULL,
7     [SchoolID] [varchar](5) NULL
8 )
StudentInfo

為聯表查詢做准備,另新建一張數據表:學校信息表

1 CREATE TABLE [dbo].[SchoolInfo](
2     [ID] [varchar](5) NOT NULL,
3     [NAME] [varchar](10) NOT NULL,
4     [City] [varchar](10) NOT NULL
5 )
SchoolInfo

同時分別為兩張表添加主外鍵約束

alter table [SchoolInfo]
 add constraint PK_SchoolInfo primary key(ID)

alter table [StudentInfo]
 add constraint PK_StudentInfo primary key(ID), 
     constraint FK_StudentInfo_SchoolInfo foreign key(SchoolID) references SchoolInfo(ID)

定義一個學習信息類:Schoolnfo

1 /// <summary>
2     /// 學校信息
3     /// </summary>
4     public class SchoolInfo
5     {
6         public string ID { get; set; }
7         public string Name { get; set; }
8         public string City { get; set; }
9     }
SchoolInfo

注意,務必帶着 { get; set; },用於后面通過反射獲取參數的屬性相關信息。

在進行插入之前,先封裝一個類,實現數據庫插入操作

 1 public class DataOperation
 2 {
 3     // 數據庫連接串
 4     private const string DBConnectionString = Const.DBConnectionString;
 5     // 數據庫連接
 6     private static SqlConnection db_Connection = null;
 7     // 日志記錄組件
 8     public static ILog Logger = null;
 9 
10     /// <summary>
11     /// 創建數據庫連接
12     /// </summary>
13     public static void CreatDBConnection()
14     {
15         if (db_Connection != null)
16         {
17             Logger.Warn("DB connection already exits");
18             return;
19         }
20         else
21         {
22             db_Connection = new SqlConnection(DBConnectionString);
23             db_Connection.Open();
24             Logger.Warn("DB connection open success");
25         }
26     }
27 
28     /// <summary>
29     /// 釋放數據庫連接
30     /// </summary>
31     public static void DestroyDBConnection()
32     {
33         if (db_Connection == null)
34         {
35             Logger.Warn("DB connection already close");
36             return;
37         }
38         else
39         {
40             if (db_Connection.State != ConnectionState.Closed)
41             {
42                 db_Connection.Close();
43             }
44             db_Connection.Dispose();
45             Logger.Warn("DB connection close success");
46         }
47     }
48 
49     /// <summary>
50     /// 普通insert方法
51     /// </summary>
52     public static bool Insert(string sql)
53     {
54         try
55         {
56             CreatDBConnection();
57             SqlCommand command = new SqlCommand(sql, db_Connection);
58             command.ExecuteNonQuery();
59         }
60         catch(Exception ex)
61         {
62             string msg = string.Format("Insert() 執行異常: {0} {1}", ex.Message, ex.StackTrace);
63             Logger.Error(msg);
64             return false;
65         }
66 
67         return true;
68     }    
69 }
DataOperation - Insert

其中,數據庫連接串設置如下

/// <summary>
/// 數據庫連接串:本地
/// </summary>
public const string DBConnectionString = "server=.;database=Stu_Sqh;Trusted_Connection=True";

自然而然,最常用的是普通的 insert 方法,可以插入一條或多條數據

1 // 單條插入
2 insert into SchoolInfo values('10286', 'seu', 'NanJing')
3 // 多條插入
4 insert into SchoolInfo
5 values('108', 'pku', 'BeiJing'),
6       ('109', 'hdu', 'HangZhou'),
7       ... ...
8       ('110', 'dltg', 'DaLian')
Insert - 數據庫SQL實現

或者直接通過代碼實現

string sqlInsert = string.Format(@"insert into SchoolInfo values('xxx','xxx', 'xxx')");
bool insertFlag = DataOperation.Insert(sqlInsert);

或者通過在代碼中以 拼接SQL 的形式的組裝要執行的SQL語句

 1 StringBuilder sb = new StringBuilder();
 2 sb.Append("INSERT INTO SchoolInfo(Id,Name,City) VALUES");
 3 using( SqlCommand command = new SqlCommand() )
 4 {
 5     // 拼接SQL
 6     for(int i=0; i<list.count; ++i) {
 7         sb.AppendFormat("('{0}','seu{1}','{2}'),", 
 8              i.ToString(), i.ToString(), NJing);
 9     }
10     
11     CreatDBConnection();
12     command.Connection = db_Connection;
13     command.CommandText = sb.ToString().TrimEnd(',');
14     command.ExecuteNonQuery();
15 }
拼接SQL

但是,該方法在 C# 中有限制,一次性只能批量插入1000條,大數據量只能分批次插入。

綜上所述,在大數據量的情況下,普通的 insert 方法插入批量數據執行太慢。

參見.NET批量大數據插入性能分析及比較C#批量插入數據到Sqlserver中的幾種方式

 

下面提供幾種方法,提高批量數據插入效率:

1iBatista/MyBatis

// 待更新 ...

參考MyBatis 批量插入數據

2SqlBulkCopy

從 .Net 2.0 提供 SqlBulkCopy 對象,支持一次性批量插入數據,效率高。

首先,封裝一個獲取 DataTable 對象的方法,采用泛型參數

 1 public class DBHelper
 2 {
 3     public static DataTable GetDataTable<T>(List<T> list) 
 4     {
 5         DataTable dt = new DataTable();
 6 
 7         // 反射獲取泛型參數類型、泛型參數的屬性名稱和屬性類型
 8         Type type = typeof(T);
 9         PropertyInfo[] propInfos = type.GetProperties();
10         foreach (var pi in propInfos) {
11             dt.Columns.Add(pi.Name, 
12                 System.Type.GetType(pi.PropertyType.FullName));
13         }
14 
15         // 組裝 DataTable
16         foreach (T t in list)
17         {
18             DataRow dr = dt.NewRow();
19             foreach (var pi in propInfos)
20             {
21                 string key = pi.Name;
22                 object value = pi.GetValue(t, null);
23                 dr[key] = value;
24             }
25             dt.Rows.Add(dr);
26         }
27 
28         return dt;
29     }
30 }
GetDataTable

注意,通過反射 Type.GetProperties() 方法,要保證參數 T 中的屬性定義包括 { get; set; } 。

然后在 DataOperation 中新增一個 BulkInsert() 方法,利用 SqlBulkCopy 插入批量數據

 1 /// <summary>
 2 /// SqlBulkCopy Insert 方法
 3 /// </summary>
 4 public static bool BulkInsert(DataTable dt)
 5 {
 6     SqlCommand command = null;
 7     SqlTransaction sqlBulkTrans = null;
 8     SqlBulkCopy sqlBulkCopy = null;
 9     try
10     {
11         CreatDBConnection();
12 
13         command = db_Connection.CreateCommand();
14         sqlBulkTrans = db_Connection.BeginTransaction();
15         command.Transaction = sqlBulkTrans;
16 
17         // 插入數據的同時檢查約束,如果發生錯誤則調用sqlbulkTransaction事務
18         sqlBulkCopy = new SqlBulkCopy(db_Connection,
19                             SqlBulkCopyOptions.CheckConstraints, sqlBulkTrans);
20         sqlBulkCopy.DestinationTableName = dt.TableName;
21         foreach (DataColumn dc in dt.Columns) {
22             sqlBulkCopy.ColumnMappings.Add(dc.ColumnName, dc.ColumnName);
23         }
24 
25         sqlBulkCopy.WriteToServer(dt);
26         sqlBulkTrans.Commit();
27     }
28     catch (Exception ex)
29     {
30         string msg = string.Format("BulkInsert() 執行異常: {0} {1}", ex.Message, ex.StackTrace);
31         Logger.Error(msg);
32         if (sqlBulkTrans != null) {
33             sqlBulkTrans.Rollback();
34         }
35 
36         return false;
37     }
38     finally {
39         sqlBulkCopy.Close();
40     }
41     return true;
42 }
BulkInsert

直接按照如下代碼調用即可

List<SchoolInfo> list = new List<SchoolInfo>();
list.Add(xxx);  // 組裝 list

DataTable dt = DBHelper.GetDataTable<SchoolInfo>(list);
dt.TableName = "SchoolInfo";
bool bulkInsertFlag = DataOperation.BulkInsert(dt);

在使用用遇到以下問題

給定的 ColumnMapping 與源或目標中的任意列均不匹配

原因及解決方法:表的列名是區分大小寫的。務必保證數據庫表的列名和 DataTable 的列名完全一致,最好能保證順序也一致。

此外, SqlBulkCopyOptions.KeepIdentity 這個參數允許自增型主鍵導入數據。

局限性

  • 不支持將多條數據傳給存儲過程
  • 數據庫表列名務必與 dt 的列名(大小寫敏感)、順序高度一致

3表值參數

如(1)所述,SqlBulkCopy 對象能夠將多條數據一次性傳送給SQL Server,但是多條數據仍然無法一次性傳給存儲過程。

SQL Server 2008 增加的新特性:表值參數(Table-Valued Parameter,TVP),可以將 DataTable 作為參數傳遞給函數或存儲過程。

不必在存儲過程中創建臨時表或許多參數,即可向存儲過程發送多行數據,節省自定義SQL語句。

表值參數通過自定義的表類型來聲明,首先,需要自定義一個表類型

IF NOT EXISTS (SELECT * FROM sys.types AS t WHERE t.name='LocalSchoolTVP')  
	BEGIN  
		CREATE TYPE dbo.LocalSchoolTVP AS TABLE(   
			ID varchar(5)  primary key NOT NULL,
			Name varchar(10) NOT NULL,
			City varchar(10) NOT NULL  
		)  
	END;  
GO  

在數據庫路徑 DbSqh --> 可編程性 --> 類型 --> 用戶定義表類型 中可以查看新增的表類型 LocalSchoolTVP。

然后定義一個存儲過程,接受一個 LocalSchoolTVP 的表類型參數

 1 USE [DbSqh]
 2 GO
 3 SET ANSI_NULLS ON
 4 GO
 5 SET QUOTED_IDENTIFIER ON
 6 GO
 7 
 8 /*
 9    此處用戶自定義表類型  
10 */
11 
12 if exists(select 1 from sysobjects where id=object_id('AddSchoolInfoBatchByTVP') and xtype='P')
13     drop proc AddSchoolInfoBatchByTVP
14 GO
15 
16 create proc AddSchoolInfoBatchByTVP(
17     @TVP LocalSchoolTVP READONLY
18 )
19 as  
20 begin
21     insert into SchoolInfo
22     select ID, Name, City from @TVP
23 end;
24 
25 RETURN 0;
Proc AddSchoolInfoBatchByTVP

注意,存儲過程中表值參數聲明為 readonly,只讀屬性,不能對表值參數執行 UPDATE、DELETE 或 INSERT等 DML 操作。

在 SQL Server 中創建表值參數和存儲過程的相關信息參考:http://www.cnblogs.com/isfish/p/4337488.html

首先在 SQL Server 中對創建的表類型和存儲過程進行簡單的測試

 1 declare @TVP as LocalSchoolTVP
 2 
 3 insert into @TVP
 4 values
 5 ('z01','tinghua','BeiJing'),
 6 ('z02','bjlg','BeiJing'),
 7 ('z03','seu','NanJing'),
 8 ('z04','nju','NanJing'),
 9 ('z05','whut','WuHan');
10 
11 EXEC AddSchoolInfoBatchByTVP @TVP
12 GO
TestTvpToProc

在調用之前,首先封裝一個方法執行存儲過程

 1 /// <summary>
 2 /// TVP表值參數  Insert 方法(方法體其實同 SqlXmlInsertByProc 完全一樣)
 3 /// </summary>
 4 public static bool TVPInsertByProc(string ProcName, SqlParameter[] sqlParameters)
 5 {
 6     SqlTransaction sqlTrans = null;
 7     try
 8     {
 9         CreatDBConnection();
10         sqlTrans = db_Connection.BeginTransaction();
11         using (SqlCommand command = new SqlCommand())
12         {
13             command.Connection = db_Connection;
14             command.Transaction = sqlTrans;
15             command.CommandType = CommandType.StoredProcedure;
16             command.CommandText = ProcName;
17             command.Parameters.AddRange(sqlParameters);
18 
19             command.ExecuteNonQuery();
20             sqlTrans.Commit();
21         }
22     }
23     catch (Exception ex)
24     {
25         string msg = string.Format("TVPInsertByProc() 執行異常: {0} {1}", ex.Message, ex.StackTrace);
26         Logger.Error(msg);
27         if (sqlTrans != null)
28         {
29             sqlTrans.Rollback();
30         }
31         return false;
32     }
33     return true;
34 }
TVPInsertByProc

下面在代碼中,調用存儲過程執行數據批量入庫

List<SchoolInfo> list = new List<SchoolInfo>();
list.Add(xxx);  // 組裝 list

DataTable dt = DBHelper.GetDataTable<SchoolInfo>(list);
//dt.TableName = "SchoolInfo"; // 用不到
SqlParameter[] sqlParams = new SqlParameter[1];
sqlParams[0] = new SqlParameter("@TVP", SqlDbType.Structured);
sqlParams[0].Value = dt;
sqlParams[0].TypeName = "LocalSchoolTVP";
string procName = "AddSchoolInfoBatchByTVP";
DataOperation.SqlXmlInsertByProc(procName, sqlParams);

其中,參數 sqlParams[0] 可以簡寫為如下形式

sqlParams[0] = new SqlParameter("@TVP", SqlDbType.Structured) { 
                Value = dt, TypeName = "LocalSchoolTVP"
            };

局限性

  • 表值參數僅僅在插入數目少於 1000 行時具有很好的執行性能
  • SQL Server 不維護表值參數列的統計信息
  • 表值參數需要預先定義與數據表相對應的表類型,略顯麻煩

參考Using SQL Server’s Table Valued Parameters

4SQL XML

將 XML 字符串形式的數據,批量插入到數據庫。通過將 XML 字符串作為參數,傳給存儲過程,存儲過程解析 XML 字符串

先來了解下從 SQL Server 2005 新增的 SqlDbType.xml 數據庫類型,最大可支持 2GB 大小,新建一張數據表

1 CREATE TABLE [dbo].[CityInfo](
2     ID varchar(5) NOT NULL,
3     CityXml xml NOT NULL
4 )
5 
6 alter table [CityInfo]
7  add constraint PK_CityInfo primary key(ID)
CityInfo with SqlDbType.xml

其中,可以用 varchar(max) 或 nvarchar(max) 代替 xml 類型,效果是一樣的。

采用普通的數據庫SQL語句,直接插入一條信息

insert into CityInfo
values('1','<City><ID>1</ID><Name>北京</Name></City>')

或者采用代碼的形式實現,首先封裝一個支持插入 xml 數據庫類型的插入方法

 1 /// <summary>
 2 /// 普通insert方法, 數據表包含SqlDbType.Xml數據庫類型
 3 /// </summary>
 4 public static bool InsertWithXml(string sql, SqlParameter[] sqlParameters)
 5 {
 6     try 
 7     {
 8         CreatDBConnection();
 9         using (SqlCommand command = new SqlCommand(sql, db_Connection))
10         {
11             command.Parameters.AddRange(sqlParameters);
12             command.ExecuteNonQuery();          
13         }
14     }
15     catch (Exception ex) 
16     {
17         string msg = string.Format("InsertWithXml() 執行異常: {0} {1}", ex.Message, ex.StackTrace);
18         Logger.Error(msg);
19         return false;
20     }
21     return true;
22 }
InsertWithXml

調用該方法插入一條數據(目前只允許一條)

string sql = "insert into CityInfo(ID,CityXml) VALUES(@id,@xml)";
StringBuilder sb = new StringBuilder();
sb.Append("<City><ID>9</ID><Name>廣州</Name></City>");

XmlTextReader xtr = new XmlTextReader(sb.ToString(), XmlNodeType.Document, null);
SqlXml sqlXml = new SqlXml(xtr);
SqlParameter[] sqlParams = new SqlParameter[2];
sqlParams[0] = new SqlParameter("@id", id.ToString());
sqlParams[1] = new SqlParameter("@xml", SqlDbType.Xml, sqlXml.Value.Length);
sqlParams[1].Value = sqlXml;
bool insertWithXmlFlag = DataOperation.InsertWithXml(sql, sqlParams);

再或者,直接拼接 insert into TableName values(),() ... () 語句,調用普通的 DataOperation.Insert 方法亦可。

相關信息可參見:在C#中使用SqlDbType.Xml類型參數使用 SQL XML 數據類型 - Java(推薦)

下面開始研究傳 XML字符串 存儲過程,實現批量數據入庫

首先了解 SQLServer 數據庫對 XML 的各種操作

  • OpenXml
  • SQL For XML

具體的相關信息可以參見另一篇博客:SQL Server For XML - sqh

封裝一個方法,調用存儲過程

 1 /// <summary>
 2 /// SQL XML Insert 方法
 3 /// </summary>
 4 public static bool SqlXmlInsertByProc(string ProcName, SqlParameter[] sqlParameters)
 5 {   
 6     SqlTransaction sqlTrans = null;
 7     try
 8     {
 9         CreatDBConnection();
10         sqlTrans = db_Connection.BeginTransaction();
11         using (SqlCommand command = new SqlCommand())
12         {                   
13             command.Connection = db_Connection;
14             command.Transaction = sqlTrans;
15             command.CommandType = CommandType.StoredProcedure;
16             command.CommandText = ProcName;
17             command.Parameters.AddRange(sqlParameters);
18 
19             command.ExecuteNonQuery();
20             sqlTrans.Commit();
21         }
22     }
23     catch(Exception ex)
24     {
25         string msg = string.Format("SqlXmlInsertByProc() 執行異常: {0} {1}", ex.Message, ex.StackTrace);
26         Logger.Error(msg);
27         if (sqlTrans != null) {
28             sqlTrans.Rollback();
29         }
30         return false;
31     }
32     return true;
33 }
SqlXmlInsertByProc

相應的存儲過程,接受一個 xml 數據庫類型的參數

 1 USE [DbSqh]
 2 GO
 3 SET ANSI_NULLS ON
 4 GO
 5 SET QUOTED_IDENTIFIER ON
 6 GO
 7 
 8 if exists(select 1 from sysobjects where id=object_id('AddSchoolInfoBatch') and xtype='P')
 9     drop proc AddSchoolInfoBatch
10 GO
11 
12 create proc AddSchoolInfoBatch(
13     @SchoolXml xml
14 )
15 as 
16  
17 begin
18     declare @ErrMsg nvarchar(100);
19     declare @XmlHandle int;  
20     EXEC sp_xml_preparedocument @XmlHandle OUTPUT, @SchoolXml 
21 
22     insert into SchoolInfo(ID, Name, City)
23     select ID, Name, City
24     from OPENXML(@XmlHandle, '/Schools/School', 2)
25     with(
26         ID varchar(5),
27         Name varchar(10),
28         City varchar(10)
29     );
30 
31     EXEC sp_xml_removedocument @XmlHandle
32 end;
33 
34 RETURN 0;
PROC AddSchoolInfoBatch

其中,sp_xml_preparedocumentsp_xml_removedocument 用於獲取和刪除一個指向內存中XML文檔的句柄。

關於在存儲過程中解析 XML 字符串,參見:http://www.cnblogs.com/JuneZhang/p/5142772.htmlhttp://www.xuebuyuan.com/1210868.html

首先在 SQL Server 中對該存儲過程的正確性作簡單測試

 1 DECLARE @idoc int
 2 DECLARE @doc xml
 3 
 4 SET @doc ='<Schools>
 5 <School><ID>121</ID><Name>nju</Name><City>NJ</City></School>
 6 </Schools>'
 7 
 8 EXEC sp_xml_preparedocument @idoc OUTPUT, @doc
 9 
10 
11 SELECT *
12 FROM      
13 OPENXML (@idoc, '/Schools/School',2)
14 WITH (        
15     ID varchar(5),
16     Name varchar(10),
17     City varchar(10)
18 )
19 
20 EXEC sp_xml_removedocument @idoc
TestXmlToProc

下面給出在代碼中的調用方法

List<SchoolInfo> list = new List<SchoolInfo>();
list.Add(xxx);  // 組裝 list

// xml字符串拼接, 以結點形式, 不包括屬性
StringBuilder sb = new StringBuilder();
sb.Append("<Schools>");
foreach (var si in list) {
	sb.AppendFormat("<School><ID>{0}</ID><Name>{1}</Name><City>{2}</City></School>", 
	//sb.AppendFormat("<School ID=\"{0}\" Name=\"{1}\" City=\"{2}\"></School>",
		si.ID, si.Name, si.City);
}
sb.Append("</Schools>");

XmlTextReader xtr = new XmlTextReader(sb.ToString(), XmlNodeType.Document, null);
SqlXml sqlXml = new SqlXml(xtr);
SqlParameter[] sqlParams = new SqlParameter[1];
sqlParams[0] = new SqlParameter("@SchoolXml", SqlDbType.Xml, sqlXml.Value.Length);
sqlParams[0].Value = sqlXml;
string procName = "AddSchoolInfoBatch";
DataOperation.SqlXmlInsertByProc(procName, sqlParams);

上述代碼中,入參類型是 SqlDbType.Xml 與存儲過程入參類型完全匹配,其實此處設置為 SqlDbType.Varchar,直接傳參 sb.ToString() 也可以。

務必注意,傳遞的XML字符串不要包含 <?xml version=\"1.0\"?> ,否則報錯。再一個問題,XML 字符串長度最好不要超過 8000 個字符。

參考Sql Server參數化查詢之where in和like實現之xml和DataTable傳參(TVP + SQL XML)

DB2

// 待更新 ...

MySQL

// 待更新 ...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM