C# 海量數據瞬間插入到數據庫的方法
當我們在數據庫中進行大量的數據追加時,是不是經常因為數據量過大而苦惱呢?
而所謂的海量數據,一般也是上萬級的數據,比如我們要添加一百萬條數據,應該如何提高它的效率呢?
Oracle數據庫:
普通肉墊式
什么叫批量插入呢,就是一次性插入一批數據,我們可以把這批數據理解為一個大的數組,而這些全部只通過一個SQL來實現,而在傳統方式下,需要調用很多次的SQL才可以完成,這就是著名的“數組綁定”的功能。我們先來看一下傳統方式下,插入多行記錄的操作方式:
//設置一個數據庫的連接串, string connectStr = "User Id=scott;Password=tiger;Data Source="; OracleConnection conn = new OracleConnection(connectStr); OracleCommand command = new OracleCommand(); command.Connection = conn; conn.Open(); //通過循環寫入大量的數據,這種方法顯然是肉墊 for (int i = 0; i < recc; i++) { string sql = "insert into dept values(" + i.ToString() + "," + i.ToString() + "," + i.ToString() + ")"; command.CommandText = sql; command.ExecuteNonQuery(); }
帶事務的栗子:
string result; //創建連接 var conn = new OracleConnection(_connectStr); conn.Open(); var tran = conn.BeginTransaction(); //事務 try { CreateTable(jiFenZhuiSuGuanXis.Select(info => info.TableGroup).Distinct().ToList()); //創建數據庫表 帶有分表要求 //創建Command 並循環插入數據 var command = conn.CreateCommand(); foreach (var guanXi in jiFenZhuiSuGuanXis) { //插入 var insertStr = string.Format( "insert into {5} values('{0}','{1}','{2}','{3}','{4}')", "","","","",""); command.CommandText = insertStr; command.ExecuteNonQuery(); } tran.Commit(); result = "成功"; } catch (OracleException ex) { tran.Rollback(); result = "出現錯誤。\n"+ex.Message; LogHelper.WriteLog("UpLoad.OracleException捕獲異常。\n", ex); } catch (Exception ex) { result = "出現錯誤。\n" + ex.Message; LogHelper.WriteLog("UpLoad.Exception捕獲異常。\n", ex); } finally { if (conn.State == ConnectionState.Open) conn.Close(); } return result;
使用ODP特性
//設置一個數據庫的連接串 string connectStr = "User Id=scott;Password=tiger;Data Source="; OracleConnection conn = new OracleConnection(connectStr); OracleCommand command = new OracleCommand(); command.Connection = conn; //到此為止,還都是我們熟悉的代碼,下面就要開始嘍 //這個參數需要指定每次批插入的記錄數 command.ArrayBindCount = recc; //在這個命令行中,用到了參數,參數我們很熟悉,但是這個參數在傳值的時候 //用到的是數組,而不是單個的值,這就是它獨特的地方 command.CommandText = "insert into dept values(:deptno, :deptname, :loc)"; conn.Open(); //下面定義幾個數組,分別表示三個字段,數組的長度由參數直接給出 int[] deptNo = new int[recc]; string[] dname = new string[recc]; string[] loc = new string[recc]; // 為了傳遞參數,不可避免的要使用參數,下面會連續定義三個 // 從名稱可以直接看出每個參數的含義,不在每個解釋了 OracleParameter deptNoParam = new OracleParameter("deptno", OracleDbType.Int32); deptNoParam.Direction = ParameterDirection.Input; deptNoParam.Value = deptNo; command.Parameters.Add(deptNoParam); OracleParameter deptNameParam = new OracleParameter("deptname", OracleDbType.Varchar2); deptNameParam.Direction = ParameterDirection.Input; deptNameParam.Value = dname; command.Parameters.Add(deptNameParam); OracleParameter deptLocParam = new OracleParameter("loc", OracleDbType.Varchar2); deptLocParam.Direction = ParameterDirection.Input; deptLocParam.Value = loc; command.Parameters.Add(deptLocParam); //在下面的循環中,先把數組定義好,而不是像上面那樣直接生成SQL for (int i = 0; i < recc; i++) { deptNo[i] = i; dname[i] = i.ToString(); loc[i] = i.ToString(); } //這個調用將把參數數組傳進SQL,同時寫入數據庫 command.ExecuteNonQuery();
如果插入多張表格的數據 Command 需要重新new
string result; //創建連接 var conn = new OracleConnection(_connectStr); conn.Open(); var tran = conn.BeginTransaction(); //事務 try { //創建數據庫表 分表要求 CreateTable(jiFenZhuiSuGuanXis.Select(info => info.TableGroup).Distinct().ToList()); //根據分表名稱 將數據分組 var dataTableGroup = jiFenZhuiSuGuanXis.GroupBy(j => j.TableGroup); foreach (var group in dataTableGroup) { var command = conn.CreateCommand(); //創建Command command.ArrayBindCount = group.Count(); //插入數量 //插入語句 command.CommandText = string.Format( "insert into {0} values(:jiid,:productcode,:productspec,:zhuisucode,:jifencode)", group.Key.ToUpper()); #region 定義傳遞參數 var idParam = new string[group.Count()]; var productCodeParam = new string[group.Count()]; var productSpecParam = new string[group.Count()]; var zhuisucodeParam = new string[group.Count()]; var jifencodeParam = new string[group.Count()]; //定義傳遞參數 command.Parameters.AddRange(new[] { new OracleParameter("jiid", OracleDbType.NVarchar2) { Direction = ParameterDirection.Input, Value = idParam }, new OracleParameter("productcode", OracleDbType.NVarchar2) { Direction = ParameterDirection.Input, Value = productCodeParam }, new OracleParameter("productspec", OracleDbType.NVarchar2) { Direction = ParameterDirection.Input, Value = productSpecParam }, new OracleParameter("zhuisucode", OracleDbType.NVarchar2) { Direction = ParameterDirection.Input, Value = zhuisucodeParam }, new OracleParameter("jifencode", OracleDbType.NVarchar2) { Direction = ParameterDirection.Input, Value = jifencodeParam } } ); #endregion #region 參數賦值 var i = 0; foreach (var xi in group) { idParam[i] = xi.Id; //ID參數 productCodeParam[i] = xi.ProductCode; //productcode參數 productSpecParam[i] = xi.ProductSpec; //productspec參數 zhuisucodeParam[i] = xi.ZhuiSuCode; //zhuisucode參數 jifencodeParam[i] = xi.JiFenCode; //JiFenCode參數 i++; } #endregion command.ExecuteNonQuery(); //執行 } tran.Commit(); result = "成功"; } catch (OracleException ex) { tran.Rollback(); result = "出現錯誤。\n" + ex.Message; LogHelper.WriteLog("UpLoadOdp.OracleException捕獲異常。\n", ex); } catch (Exception ex) { result = "出現錯誤。\n" + ex.Message; LogHelper.WriteLog("UpLoadOdp.Exception捕獲異常。\n", ex); } finally { if (conn.State == ConnectionState.Open) conn.Close(); } return result;