C#讀取Txt大數據並更新到數據庫


環境

   Sqlserver 2016

   .net 4.5.2

 

目前測試數據1300萬 大約3-4分鍾.(限制一次讀取條數 和 線程數是 要節省服務器資源,如果調太大服務器其它應用可能就跑不了了), SqlServerDBHelper為數據庫幫助類.沒有什么特別的處理. 配置連接串時記錄把連接池開起來

另外.以下代碼中每次寫都創建了連接 .之前試過一個連接反復用. 130次大約有20多次 數據庫會出問題.並且需要的時間是7-8分鍾 左右. 

配置文件: xxx.json

 [ {
    /*連接字符串 */
    "ConnStr": "",
    "FilePath": "讀取的文件地址",
    /*數據庫表名稱 */
    "TableName": "寫入的數據庫表名",
    /*導入前執行的語句 */
    "ExecBeforeSql": "",
    /*導入后執行的語句 */
    "ExecAfterSql": "",
    /*映射關系 */
    "Mapping": [
      {
        "DBName": "XXX",
        "TxtName": "DDD"
      }      
    ],
    /*過濾數據的正則 當前只實現了小數據一次性讀完的檢查*/
    "FilterRegex": [],
    /*檢查數據合法性(從數據庫獲取字段屬性進行驗證) */
    "CheckData": false,
    /*列分隔符*/
    "Separator": "\t",
    /*表頭的行數*/
    "HeaderRowsNum": 1
  }
]

 

讀取代碼 : 注意 ConfigurationManager.AppSettings["frpage"] 和 ConfigurationManager.AppSettings["fr"] 需要自己配置好

 

 //讀取配置文件信息
            List<dynamic> dt = JsonConvert.DeserializeObject<List<dynamic>>(File.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "config\\ImportTxt.json")));
            LogUtil.Info("開始讀取txt數據,讀取配置:" + dt.Count + "");
            if (dt.Count == 0)
            {
                return;
            }


            List<Task> li = new List<Task>();
            foreach (dynamic row in dt)
            {
                LogUtil.Info("開始處理數據:" + JsonConvert.SerializeObject(row));
                li.Add(ProcessRow(row));

            }
            Task.WaitAll(li.ToArray());
            LogUtil.Info("數據讀取完畢");
        public async Task ProcessRow(dynamic row)
        {
            await Task.Run(() =>
             {
                 AutoResetEvent AE = new AutoResetEvent(false);
                 DataTable Data = null;
                 string error = "", ConnStr, TableName, ExecBeforeSql, ExecAfterSql;
                 Boolean IsCheck = Convert.ToBoolean(row["CheckData"]);
                 TableName = Convert.ToString(row.TableName);
                 ConnStr = Convert.ToString(row.ConnStr);
                 ExecBeforeSql = Convert.ToString(row.ExecBeforeSql);
                 ExecAfterSql = Convert.ToString(row.ExecAfterSql);
                 int HeaderRowsNum = Convert.ToInt32(row.HeaderRowsNum);
                 string Separator = Convert.ToString(row.Separator);

                 Dictionary<string, string> dic = new Dictionary<string, string>();

                 //文件達到多大時就分行讀取
                 int fr = 0;
                 if (!int.TryParse(ConfigurationManager.AppSettings["fr"], out fr))
                 {
                     fr = 100;
                 }
                 fr = fr * 1024 * 1024;

                 //分行讀取一次讀取多少
                 int page = 0;
                 if (!int.TryParse(ConfigurationManager.AppSettings["frpage"], out page))
                 {
                     page = 50000;
                 }

                 foreach (var dyn in row.Mapping)
                 {
                     dic.Add(Convert.ToString(dyn.TxtName), Convert.ToString(dyn.DBName));
                 }


                 List<string> regex = new List<string>();
                 foreach (string item in row["FilterRegex"])
                 {
                     regex.Add(item);
                 }
                 string fpath = "", cpath = "";




                 cpath = Convert.ToString(row["FilePath"]);
                 string rootPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "tmp");
                 if (!Directory.Exists(rootPath))
                 {
                     Directory.CreateDirectory(rootPath);
                 }

                 fpath = Path.Combine(rootPath, Path.GetFileName(cpath));
                 File.Copy(cpath, fpath, true);
                 LogUtil.Info("拷文件到本地已經完成.從本地讀取數據操作");
                 int threadCount = Environment.ProcessorCount * 3;

                 FileInfo fi = new FileInfo(fpath);
                 //如果文件大於100M就需要分批讀取.一次50萬條
                 if (fi.Length > fr)
                 {

                     long sumCount = 0;
                     StreamReader sr = new StreamReader(fi.OpenRead());                    
                     int headRow = 0;
                     string rowstr = "";

                     List<Thread> li_th = new List<Thread>();
                     bool last = false;
                     int ij = 0;
                     LogUtil.Info("生成StreamReader成功  ");
                     #region 逐行讀取
                     
                     
                     while (sr.Peek() > -1)
                     {
                         rowstr = sr.ReadLine();
                         #region 將行數據寫入DataTable
                         if (headRow < HeaderRowsNum)
                         {
                             Data = new DataTable();
                             foreach (string scol in rowstr.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries))
                             {
                                 Data.Columns.Add(scol.Trim(), typeof(string));
                             }
                             headRow++;
                             continue;
                         }
                         else
                         {  //行數據
                             if (headRow > 1)
                             {
                                 for (int i = 1; i < headRow && sr.Peek() > -1; i++)
                                 {
                                     rowstr += " " + sr.ReadLine();
                                 }
                             }
                             Data.Rows.Add(rowstr.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries));
                             if (Data.Rows.Count < page && sr.Peek() > -1)
                             {
                                 continue;
                             }
                         }
                         last = (sr.Peek() == -1);
                         #endregion

                         sumCount += Data.Rows.Count;

                         ProcessPath(Data, page, sr, ref ij, TableName, ExecBeforeSql, ExecAfterSql, dic, IsCheck, li_th);
                                                

                         #region 檢查線程等待
                         if ((ij > 0 && (ij % threadCount) == 0) || last)
                         {
                             LogUtil.Info("完成一批次當前共寫數據: " + sumCount);
                             while (true)
                             {
                                 bool isok = true;
                                 foreach (var item in li_th)
                                 {
                                     if (item.IsAlive)
                                     {
                                         isok = false;
                                         Application.DoEvents();
                                         Thread.Sleep(1000);
                                     }
                                 }
                                 if (isok)
                                 {
                                     li_th.Clear();
                                     break;
                                 }
                             }

                             //最后一頁要等所有的執行完才能執行
                             if (sr.Peek() == -1)
                             {
                                 WriteTODB(TableName, Data, ExecBeforeSql, ExecAfterSql, dic, false, true);
                                 LogUtil.Info("最后一次寫入完成");
                             }
                             LogUtil.Info(" 線程退出開始新的循環...");
                         }
                         Data.Clear();
                         #endregion
                     }
                     sr.Dispose();
                     #endregion
                 }
                 else
                 {
                     using (SQLServerDBHelper sdb = new SQLServerDBHelper())
                     {
                         sdb.OpenConnection();
                         #region 一次性讀取處理
                         Data = LoadDataTableFromTxt(fpath, ref error, Separator, HeaderRowsNum, regex, IsCheck, dic, TableName);
                         if (IsCheck)
                         {
                             DataRow[] rows = Data.Select("ErrorMsg is not null");
                             if (rows.Length > 0)
                             {
                                 LogUtil.Info($"讀取{TableName} 數據出錯 : {JsonConvert.SerializeObject(rows)}");
                                 return;
                             }
                         }

                         LogUtil.Info($"讀取{TableName} 的txt數據完成.共讀取數據:{Data.Rows.Count}條");
                         if (Data.Rows.Count == 0 || !string.IsNullOrWhiteSpace(error))
                         {
                             if (!string.IsNullOrWhiteSpace(error))
                             {
                                 LogUtil.Info("讀取數據出錯,地址:" + Convert.ToString(row["FilePath"]) + "  \r\n 錯誤:" + error);
                             }
                             return;
                         }
                         sdb.BgeinTransaction();
                         try
                         {
                             WriteTODB(TableName, Data, ExecBeforeSql, ExecAfterSql, dic, sdb: sdb);
                             sdb.CommitTransaction();
                             LogUtil.Info(TableName + "數據更新完畢 !!");
                         }
                         catch (Exception ex)
                         {

                             LogUtil.Info(TableName + " 更新數據出錯,錯誤:" + ex.Message + "  \r\n 堆棧:" + ex.StackTrace);
                             sdb.RollbackTransaction();
                         }
                         #endregion

                     }



                 }

                 GC.Collect();
             });

        }

        private void ProcessPath(DataTable Data, int page, StreamReader sr, ref int ij, string TableName, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool IsCheck, List<Thread> li_th)
        {
            int threadCount = Environment.ProcessorCount * 4;

            string error = "";
            PoolModel p = new PoolModel { TableName = TableName, ExecBeforeSql = ExecBeforeSql, ExecAfterSql = ExecAfterSql, dic = dic };
            p.Data = Data.Copy();
            if (IsCheck)
            {
                using (SQLServerDBHelper sdb = new SQLServerDBHelper())
                {
                    error = CheckData(Data, TableName, dic, sdb);
                }
                DataRow[] rows = Data.Select("ErrorMsg is not null");
                if (rows.Length > 0 || !string.IsNullOrWhiteSpace(error))
                {
                    LogUtil.Info($"讀取{TableName} 數據出錯 : {JsonConvert.SerializeObject(rows)}\r\n錯誤: " + error);
                    return;
                }
            }

            ij++;
            if (ij == 1)
            {

                WriteTODB(p.TableName, p.Data, p.ExecBeforeSql, p.ExecAfterSql, p.dic, true, false);
                LogUtil.Info("首次寫入完成");
            }

            else if (sr.Peek() > -1)
            {

                Thread t = new Thread(d =>
                {

                    PoolModel c = d as PoolModel;
                    try
                    {
                        WriteTODB(c.TableName, c.Data, c.ExecBeforeSql, c.ExecAfterSql, c.dic, false, false);                      
                    }
                    catch (ThreadAbortException)
                    {
                        LogUtil.Error("線程退出.................");
                    }
                    catch (Exception ex)
                    {

                        LogUtil.Error(c.TableName + "寫入數據失敗:" + ex.Message + "\r\n堆棧:" + ex.StackTrace + "\r\n 數據:   " + JsonConvert.SerializeObject(c.Data));
                        ExitApp();
                        return;
                    }

                });
                t.IsBackground = true;
                t.Start(p);
                li_th.Add(t);
            }

        }

        public void ExitApp()
        {
            Application.Exit();
        }

        public void WriteTODB(string TableName, DataTable Data, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool first = true, bool last = true, SQLServerDBHelper sdb = null)
        {
            bool have = false;
            if (sdb == null)
            {
                sdb = new SQLServerDBHelper();
                have = true;
            }

            if (first && !string.IsNullOrWhiteSpace(ExecBeforeSql))
            {
                LogUtil.Info(TableName + "執行前Sql :" + ExecBeforeSql);
                sdb.ExecuteNonQuery(ExecBeforeSql);
            }
            sdb.BulkCopy(Data, TableName, dic);
            if (last && !string.IsNullOrWhiteSpace(ExecAfterSql))
            {
                LogUtil.Info(TableName + "執行后Sql :" + ExecAfterSql);
                sdb.ExecuteNonQuery(ExecAfterSql);
            }
            LogUtil.Info(TableName + "本次執行完成 ");
            if (have)
            {
                sdb.Dispose();
            }
        }


        public string CheckData(DataTable dt, string dbTableName, Dictionary<string, string> dic, SQLServerDBHelper sdb)
        {
            if (string.IsNullOrWhiteSpace(dbTableName))
            {
                return "表名不能為空!";
            }
            if (dic.Count == 0)
            {
                return "映射關系數據不存在!";

            }

            List<string> errorMsg = new List<string>();
            List<string> Cols = new List<string>();
            dic.Foreach(c =>
            {
                if (!dt.Columns.Contains(c.Key))
                {
                    errorMsg.Add(c.Key);
                }
                Cols.Add(c.Key);
            });

            if (errorMsg.Count > 0)
            {
                return "數據列不完整,請與映射表的數據列數量保持一致!列:" + string.Join(",", errorMsg);
            }


            //如果行數據有錯誤信息則添加到這一列的值里
            dt.Columns.Add(new DataColumn("ErrorMsg", typeof(string)) { DefaultValue = "" });
            string sql = @"--獲取SqlServer中表結構
                SELECT syscolumns.name as ColName,systypes.name as DBType,syscolumns.isnullable,
                syscolumns.length
                FROM syscolumns, systypes
                WHERE syscolumns.xusertype = systypes.xusertype
                AND syscolumns.id = object_id(@tb) ; ";
            DataSet ds = sdb.GetDataSet(sql, new SqlParameter[] { new SqlParameter("@tb", dbTableName) });
            EnumerableRowCollection<DataRow> TableDef = ds.Tables[0].AsEnumerable();

            // string colName="";
            Object obj_val;

            //將表結構數據重組成字典.
            var dic_Def = TableDef.ToDictionary(c => Convert.ToString(c["ColName"]), d =>
            {
                string DBType = "";
                string old = Convert.ToString(d["DBType"]).ToUpper();
                DBType = GetCSharpType(old);
                return new { ColName = Convert.ToString(d["ColName"]), DBType = DBType, SqlType = old, IsNullble = Convert.ToBoolean(d["isnullable"]), Length = Convert.ToInt32(d["length"]) };
            });

            DateTime now = DateTime.Now;
            foreach (DataRow row in dt.Rows)
            {
                errorMsg.Clear();
                foreach (string colName in Cols)
                {
                    if (dic.ContainsKey(colName))
                    {
                        if (!dic_Def.ContainsKey(dic[colName]))
                        {
                            return "Excel列名:" + colName + " 映射數據表字段:" + dic[colName] + "在當前數據表中不存在!";
                        }
                        //去掉數據兩邊的空格
                        row[colName] = obj_val = Convert.ToString(row[colName]).Trim();
                        var info = dic_Def[dic[colName]];
                        //是否是DBNULL
                        if (obj_val.Equals(DBNull.Value))
                        {
                            if (!info.IsNullble)
                            {
                                errorMsg.Add("" + colName + "不能為空!");

                            }
                        }
                        else
                        {
                            if (info.DBType == "String")
                            {
                                //time類型不用驗證長度(日期的 時間部分如 17:12:30.0000)
                                if (info.SqlType == "TIME")
                                {
                                    if (!DateTime.TryParse(now.ToString("yyyy-MM-dd") + " " + obj_val.ToString(), out now))
                                    {
                                        errorMsg.Add("" + colName + "填寫的數據無效應為日期的時間部分如:17:30:12");

                                    }
                                }
                                else if (Convert.ToString(obj_val).Length > info.Length)
                                {
                                    errorMsg.Add("" + colName + "長度超過配置長度:" + info.Length);
                                }
                            }
                            else
                            {
                                Type t = Type.GetType("System." + info.DBType);
                                try
                                {   //如果數字中有千分位在這一步可以處理掉重新給這個列賦上正確的數值                        
                                    row[colName] = Convert.ChangeType(obj_val, t); ;
                                }
                                catch (Exception ex)
                                {
                                    errorMsg.Add("" + colName + "填寫的數據" + obj_val + "無效應為" + info.SqlType + "類型.");
                                }

                            }

                        }
                    }

                }
                row["ErrorMsg"] = string.Join(" || ", errorMsg);
            }

            return "";
        }

        /// <summary>
        /// wm 2018年11月28日13:37
        ///  將數據庫常用類型轉為C# 中的類名(.Net的類型名)
        /// </summary>
        /// <param name="old"></param>
        /// <returns></returns>
        private string GetCSharpType(string old)
        {
            string DBType = "";
            switch (old)
            {
                case "INT":
                case "BIGINT":
                case "SMALLINT":
                    DBType = "Int32";
                    break;
                case "DECIMAL":
                case "FLOAT":
                case "NUMERIC":
                    DBType = "Decimal";
                    break;
                case "BIT":
                    DBType = "Boolean";
                    break;
                case "TEXT":
                case "CHAR":
                case "NCHAR":
                case "VARCHAR":
                case "NVARCHAR":
                case "TIME":
                    DBType = "String";
                    break;
                case "DATE":
                case "DATETIME":
                    DBType = "DateTime";
                    break;
                default:
                    throw new Exception("GetCSharpType數據類型" + DBType + "無法識別!");

            }

            return DBType;
        }




    public class PoolModel
    {
        public string TableName { get; set; }
        public DataTable Data { get; set; }
        public string ExecBeforeSql { get; set; }
        public string ExecAfterSql { get; set; }
        public Dictionary<string, string> dic { get; set; }

    }

 

    /// <summary>
        /// wm 2018年11月28日13:32
        /// 獲取Txt數據並對數據進行校驗返回一個帶有ErrorMsg列的DataTable,如果數據校驗失敗則該字段存放失敗的原因
        /// 注意:在使用該方法前需要數據表應該已經存在
        /// </summary>
        /// <param name="isCheck">是否校驗數據合法性(數據需要校驗則會按傳入的dbTableName獲取數據庫表的結構出來驗證)</param>
        /// <param name="map">如果需要驗證數據則此處需要傳映射關系   key Excel列名,Value 數據庫列名</param>
        /// <param name="dbTableName">驗證數據合法性的表(即數據會插入到的表)</param>
        /// <param name="error">非數據驗證上的異常返回</param>
        /// <param name="Regexs">用來過濾數據的正則</param>
        /// <param name="path">讀取文件的路徑</param>
        /// <param name="Separator">列分隔符</param>
        /// <param name="HeaderRowsNum">表頭的行數</param>
        /// <returns>如果需求驗證則返回一個帶有ErrorMsg列的DataTable,如果數據校驗失敗則該字段存放失敗的原因, 不需要驗證則數據讀取后直接返回DataTable</returns>
        public DataTable LoadDataTableFromTxt(string path, ref string error, string Separator, int HeaderRowsNum, List<string> Regexs = null, bool isCheck = false, Dictionary<string, string> map = null, string dbTableName = "", SQLServerDBHelper sdb = null)
        {
            DataTable dt = new DataTable();
            error = "";
            if (isCheck && (map == null || map.Count == 0 || string.IsNullOrWhiteSpace(dbTableName)))
            {
                error = "參數標明需要對表格數據進行校驗,但沒有指定映射表集合或數據表名.";
                return dt;
            }
            string txts = File.ReadAllText(path);
            #region 把讀出來的方便數據轉成DataTable

            Regexs?.ForEach(c =>
            {
                txts = new Regex(c).Replace(txts, "");
            });
            ////替換掉多表的正則
            //Regex mu_re = new Regex(@"\+[-+]{4,}\s+\+[-+\s|\w./]{4,}\+"); //FTP new Regex(@"\+[-+]{4,}\s+\+[-+\s|\w./]{4,}\+"); //原來以-分隔的 new Regex(@"-{5,}(\s)+-{5,}\s+\|.+(\s)?\|.+(\s)?\|-{5,}");
            ////去掉所有橫線
            //Regex mu_r = new Regex(@"[+-]{4,}"); //FTP new Regex(@"[+-]{4,}"); //原  new Regex(@"(\|-{5,})|(-{5,})"); 
            //string s1 = mu_re.Replace(txts, "");
            //string s2 = mu_r.Replace(s1, "");
            // string[] tts = s2.Split(new string[] { "\r\n" }, StringSplitOptions.None);
            string[] tts = txts.Split(new string[] { "\r\n" }, StringSplitOptions.None);
            string[] vals;
            string s1;
            //生成表頭默認第一行時表頭直到遇到第一個只有一個|的內容為止(有幾行表頭,下面的內容就會有幾行)
            int headerNum = -1;//記錄表頭有幾列

            DataRow dr;
            //處理col重復的問題,如果有重復按第幾個來命名 比如  A1 A2 
            Dictionary<string, int> col_Rep = new Dictionary<string, int>();
            string colName = "";
            bool isre = false;//記錄當前是否有重復列
            int empty_HeaderRow = 0;
            for (int i = 0; i < tts.Length; i++)
            {
                s1 = tts[i];

                //還未獲取出表頭
                if (headerNum < HeaderRowsNum)
                {
                    vals = s1.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries);
                    foreach (string col in vals)
                    {
                        colName = col.Trim();

                        if (col_Rep.Keys.Contains(colName))
                        {
                            col_Rep[colName]++;
                            isre = true;
                            //重復列處理
                            //colName += col_Rep[colName];
                            continue;
                        }
                        else
                        {
                            col_Rep.Add(colName, 1);
                        }
                        dt.Columns.Add(colName, typeof(string));
                    }
                    headerNum = (i == (HeaderRowsNum - 1)) ? HeaderRowsNum : 0;
                }
                else
                {
                    if (string.IsNullOrWhiteSpace(s1.Trim()) || string.IsNullOrWhiteSpace(s1.Replace(Separator, "")))
                    {
                        continue;
                    }
                    if (isre)
                    {
                        error = "列:" + string.Join(",", col_Rep.Where(c => c.Value > 1).Select(c => c.Key)) + "存在重復";
                        return dt;
                    }


                    //多行時把多行的數據加在一起處理
                    if (headerNum > 1)
                    {
                        for (int j = 1; j < headerNum && (i + j) < tts.Length; j++)
                        {
                            //數據第一行最后沒有| 如果沒數據則直接換行了所以這里補一個空格防止數據被當空數據移除了
                            s1 += " " + tts[i + j];
                        }
                    }
                    vals = s1.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries);
                    dr = dt.NewRow();
                    dr.ItemArray = vals;
                    dt.Rows.Add(dr);
                    //因為本次循環結束上面會去++ 所以這里只加headerNum-1次
                    i += (headerNum - 1);
                }

            }
            #endregion

            if (isCheck)
            {
                //dt.Columns.Remove("Item");
                //dt.Columns["Item1"].ColumnName = "Item";
                //dt.Columns.RemoveAt(dt.Columns.Count - 2);
                error = CheckData(dt, dbTableName, map, sdb);
            }

            return dt;

        }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM