作為開發人員,能學會簡單地對歷史數據遷移是日常基本功。在上篇文章中,我們初步地解釋了歷史數據遷移的基本概念,以及如何使用SQL SERVER存儲過程實現對歷史數據遷移。
一般來說直接在數據庫中寫SQL語句(insert into ... select from)的方式進行遷移,僅適用於“停服狀態”下的數據遷移場景,也就是數據庫處於無用戶使用的情況下,而且遷移場景有限。
當數據庫在生產環境中實時在用,而且數據量較大的前提下,很顯然會影響性能,上述方法並不可取。
今天我們來介紹一個新的思路,可將歷史數據遷移對現有實時在用數據庫的性能影響降到較低水平。
1、大體遷移思路
簡單描述下思路:
1、先利用程序根據創建時間升序排序,直接查詢TopN條數據,記錄到程序緩存中。
【查詢后就和正式數據庫沒關系了,只要TopN不大,這個簡單的查詢幾乎對數據庫沒有任何影響。】
2、然后程序將緩存中的TopN條數據寫入到歷史庫。
【這個階段和正式數據庫沒有半點關系,哪怕寫的再慢,也不影響正式數據庫】
3、核對下歷史數據庫中的數據,確保沒有問題,就可以刪除正式環境中的TopN條數據了。
【這里刪除操作稍微比查詢影響大些,但是僅僅是小批量的數據刪除同樣影響不大。】
4、如果需要大批量數據遷移怎么辦? 非常簡單,分批執行,比如循環執行CountX次,那么遷移的總數據量就為TopN×CountX ,所以根本不怕數據量大,開啟程序自動執行即可,也就是用耗時間來減少對數據庫性能的影響。【執行時間換性能】
這個思路和insert into ... select from的方式,最大的區別就在用寫入歷史數據庫的過程不影響正式在用的數據庫,只剩下TopN條數據的查詢和刪除操作,影響很小。
特別說明:TopN不能太大,這個越小越好,根據表字段的多少,數據庫的性能,以及用戶連接數情況綜合考慮,建議TopN取值在1000到5000條之間,這樣對數據庫影響幾乎可以忽略。
2、例子中使用到的主要技術
為了實現這個思路,我這里也隨便寫個簡單程序來試試效果,做個例子,僅供大家參考。
由於是最基礎的例子,我就不用通用的底層框架了,免得大家看起來吃力。同時為了運行演示方便,直接寫個exe可執行程序好了。
這個講解的例子,用到的技術主要包括:
數據庫:SQL SERVER,
數據庫訪問組件:Entity Framework Core
日志記錄:Serialog.AspNetCore
開發語言:C#
技術框架:.NET 5
項目模版:控制台應用程序
3、創建個Model實體類
按照EntityFramework Core的思路,先建個Model吧。
1 using System; 2 using System.Collections.Generic; 3 using System.ComponentModel.DataAnnotations; 4 using System.ComponentModel.DataAnnotations.Schema; 5 using System.Text; 6 7 namespace Tyingtech_glps.Entities.HDM 8 { 9 /// <summary> 10 /// 接口請求記錄 11 /// </summary> 12 [Table("GLPS_APIREQUEST")] 13 public class GLPS_APIREQUEST 14 { 15 [Key] 16 public string FID { get; set; } 17 18 /// <summary> 19 /// 接口編號【固定】 20 /// </summary> 21 public string FAPICODE { get; set; } 22 23 /// <summary> 24 /// 請求方身份ID 25 /// </summary> 26 public string FAPPID { get; set; } 27 28 /// <summary> 29 /// 接口請求方URL地址 30 /// </summary> 31 public string FFORMURL { get; set; } 32 33 /// <summary> 34 /// 接口請求方IP 35 /// </summary> 36 public string FIP { get; set; } 37 38 /// <summary> 39 /// 請求參數(JSON字符串) 40 /// </summary> 41 public string FREQUESTDATA { get; set; } 42 43 /// <summary> 44 /// 請求時間點 45 /// </summary> 46 public DateTime FREQTIME { get; set; } 47 48 /// <summary> 49 ///響應參數(JSON字符串) 50 /// </summary> 51 public string FRESPONSE { get; set; } 52 53 /// <summary> 54 /// 響應時間點 55 /// </summary> 56 public DateTime FRESTIME { get; set; } 57 58 /// <summary> 59 /// 總毫秒數 60 /// </summary> 61 public int FMILLISECOND { get; set; } 62 63 /// <summary> 64 /// 接口請求結果(1:成功;0:失敗;-1:接口內部異常) 65 /// </summary> 66 public int FISSUCCESS { get; set; } 67 68 /// <summary> 69 /// 失敗詳情,僅內部使用(如:內部報錯異常信息;AppId錯誤;非法請求;參數不全;...) 70 /// </summary> 71 public string FRESULT { get; set; } 72 73 /// <summary> 74 /// 約定格式數據1 (如:車牌號) 75 /// </summary> 76 public string FDATA1 { get; set; } 77 78 /// <summary> 79 /// 約定格式數據2 (如:進場、離場) 80 /// </summary> 81 public string FDATA2 { get; set; } 82 83 /// <summary> 84 /// 約定格式數據3 (如:...) 85 /// </summary> 86 public string FDATA3 { get; set; } 87 88 /// <summary> 89 /// 約定格式數據4(如:...) 90 /// </summary> 91 public string FDATA4 { get; set; } 92 93 /// <summary> 94 /// 約定格式數據5(如:...) 95 /// </summary> 96 public string FDATA5 { get; set; } 97 98 /// <summary> 99 /// 創建時間 100 /// </summary> 101 public DateTime FCREATETIME { get; set; } 102 } 103 }
4、執行TopN條數據的遷移
1 /// <summary> 2 /// 歷史數據遷移【一次性遷移】 3 /// </summary> 4 /// <param name="n">數據遷移量(條數) top N</param> 5 /// <returns>實際遷移成功數量</returns> 6 public static int DataToHisExecOne(int n) 7 { 8 n = n > 10000 ? 10000 : n; //一次最多1萬(再多可能會對性能有影響【遷移第一條:盡量不能影響現有在用數據庫的業務】) 9 10 //測試環境 11 EnumWhichDB whichDB = EnumWhichDB.DevGlps; 12 EnumWhichDB whichDBHis = EnumWhichDB.DevGlpsHis; 13 14 string connStr = DBConnectionString.GetConnStr(whichDB); //當前需要遷移的數據庫 15 string connStrHis = DBConnectionString.GetConnStr(whichDBHis); //歷史數據庫 16 17 //Serialog 記錄日志 18 var logFileName = string.Format("ToHisOne_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH")); 19 var log = new LoggerConfiguration() 20 .WriteTo.Console() 21 .WriteTo.File(logFileName) 22 .CreateLogger(); 23 24 log.Information($"開始遷移數據,計划遷移條數為:{n}"); 25 26 int beginCount = GetRowCount(whichDB); //當前數據庫數據遷移前的記錄行數 27 int beginCountHis = GetRowCount(whichDBHis); //歷史數據庫數據遷移前的記錄行數 28 29 Stopwatch sw = new Stopwatch(); //檢測運行時間(對每個階段) 30 Stopwatch swAll = new Stopwatch(); //總耗時 31 swAll.Start(); 32 33 sw.Start(); 34 DateTime maxCreateTime = GetAscMaxCreateTimeTopN(whichDB,n); 35 sw.Stop(); 36 log.Information("對應遷移數據FCREATETIME為:" + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fffff") + $",查詢maxCreateTime耗時:{sw.ElapsedMilliseconds} 毫秒。"); 37 38 sw.Restart(); //重新開始計時 39 List<GLPS_APIREQUEST> records = new List<GLPS_APIREQUEST>(); 40 //從數據庫查最早的數據 41 using (var db = new GlpsDbContext(connStr)) 42 { 43 records = db.DS_ApiRequest 44 .Where(t => t.FCREATETIME <= maxCreateTime) 45 .OrderBy(t => t.FCREATETIME) 46 .AsNoTracking() //非跟蹤查詢(只讀,提升效率) 47 .ToListAsync().Result; 48 } 49 sw.Stop(); //計時結束 50 log.Information($"按時間點實際查詢到 {records.Count} 條數據,耗時:{sw.ElapsedMilliseconds} 毫秒。"); 51 52 sw.Restart(); //重新開始計時 53 //寫數據到歷史數據庫 54 int newCount = 0; 55 using (var db = new GlpsDbContext(connStrHis)) 56 { 57 db.DS_ApiRequest.AddRange(records); 58 newCount = db.SaveChanges(); //最后保存數據 59 log.Information($"實際成功寫入到歷史數據庫條數: {newCount}"); 60 } 61 sw.Stop(); //計時結束 62 log.Information($"實際成功寫入到歷史數據庫條數:{newCount} , 寫入耗時:{sw.ElapsedMilliseconds} 毫秒。"); 63 64 65 //最后刪除數據 66 DateTime maxCreateTimeHis = GetMaxCreateTime(whichDBHis); //歷史數據庫最大的FCreateTime 67 68 //兩個時間相同,則可以刪除數據,否則不刪除,直接預警(中間數據可能出錯,需要人工干預) 69 if (maxCreateTime == maxCreateTimeHis) 70 { 71 sw.Restart(); 72 var rowCount = DeleteByFCreateTime(whichDB, maxCreateTime); 73 sw.Stop(); 74 log.Information($"遷移后刪除數據條數:{rowCount} , 刪除耗時:{sw.ElapsedMilliseconds} 毫秒。"); 75 } 76 else if (newCount < n && newCount == records.Count) //就是實際小於n,那么是:C#的datetime和數據庫的datetime精度不同 77 { 78 sw.Restart(); 79 var rowCount = DeleteByFCreateTime(whichDB,maxCreateTimeHis); //需按歷史數據庫的日期來刪除 80 sw.Stop(); 81 log.Information($"遷移后刪除數據條數:{rowCount} , 刪除耗時:{sw.ElapsedMilliseconds} 毫秒。"); 82 } 83 else 84 { 85 log.Error("數據對比出錯:maxCreateTime != maxCreateTimeHis。 未執行最后的刪除數據!!!請開發人員核對數據。"); 86 log.Information("maxCreateTime = " + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fff")); 87 log.Information("maxCreateTimeHis = " + maxCreateTimeHis.ToString("yyyy-MM-dd HH:mm:ss.fff")); 88 } 89 90 int endCount = GetRowCount(whichDB); //當前數據庫數據遷移后的記錄行數 91 int endCountHis = GetRowCount(whichDBHis); //歷史數據庫數據遷移后的記錄行數 92 log.Information($"遷移前GLPS_APIREQUEST的數據條數:{beginCount} , 遷移后數據條數:{endCount}"); 93 log.Information($"遷移前GLPS_APIREQUESTHis的數據條數:{beginCountHis} , 遷移后數據條數:{endCountHis}"); 94 95 swAll.Stop(); 96 log.Information($"swAll:遷移總耗時:{swAll.ElapsedMilliseconds} 毫秒。"); 97 log.Information("------------------------------------------------"); 98 99 return newCount; 100 }
中間用到的幾個單獨邏輯的方法
1 /// <summary> 2 /// 獲取總記錄行數 3 /// </summary> 4 /// <param name="whichDB"></param> 5 /// <returns></returns> 6 private static int GetRowCount(EnumWhichDB whichDB) 7 { 8 string connStr = DBConnectionString.GetConnStr(whichDB); 9 int totalCount = 0; 10 using (var db = new GlpsDbContext(connStr)) 11 { 12 totalCount = db.DS_ApiRequest.Count(); 13 } 14 return totalCount; 15 } 16 17 18 /// <summary> 19 /// 查詢數據庫中最大的FCreateTime 20 /// </summary> 21 /// <param name="whichDB"></param> 22 /// <returns></returns> 23 private static DateTime GetMaxCreateTime(EnumWhichDB whichDB) 24 { 25 string connStr = DBConnectionString.GetConnStr(whichDB); 26 DateTime maxCreateTime = DateTime.MinValue; 27 using (var db = new GlpsDbContext(connStr)) 28 { 29 //查詢歷史數據庫,最大的FCREATETIME 30 var record = db.DS_ApiRequest.OrderByDescending(t => t.FCREATETIME).Take(1).SingleOrDefault(); 31 if (record != null) 32 { 33 maxCreateTime = record.FCREATETIME; 34 } 35 } 36 return maxCreateTime; 37 } 38 39 /// <summary> 40 /// 按創建時間從小到大排序(FCreateTime Asc),取前N條數據的最大FCreateTime 41 /// 【即:取最早N條數據中,最大的創建時間】 42 /// </summary> 43 /// <param name="whichDB"></param> 44 /// <param name="topN"></param> 45 /// <returns></returns> 46 private static DateTime GetAscMaxCreateTimeTopN(EnumWhichDB whichDB,int topN) 47 { 48 string connStr = DBConnectionString.GetConnStr(whichDB); 49 DateTime maxCreateTime = DateTime.MinValue; 50 using (var db = new GlpsDbContext(connStr)) 51 { 52 //查詢totalCount對應最大的FCREATETIME 53 var record = db.DS_ApiRequest.OrderBy(t => t.FCREATETIME) 54 .Skip(topN - 1).Take(1).SingleOrDefault(); 55 if (record != null) 56 { 57 maxCreateTime = record.FCREATETIME; 58 } 59 } 60 return maxCreateTime; 61 } 62 63 /// <summary> 64 /// 刪除小於等於【某個創建時間】的數據 65 /// </summary> 66 /// <param name="whichDB"></param> 67 /// <param name="maxCreateTime"></param> 68 /// <returns>刪除記錄數</returns> 69 private static int DeleteByFCreateTime(EnumWhichDB whichDB, DateTime maxCreateTime) 70 { 71 int rowCount = 0; 72 string connStr = DBConnectionString.GetConnStr(whichDB); 73 using (var db = new GlpsDbContext(connStr)) 74 { 75 List<SqlParameter> listParams = new List<SqlParameter>{ 76 new SqlParameter("FCREATETIME", maxCreateTime) 77 }; 78 rowCount = db.Database.ExecuteSqlRaw(@"delete from GLPS_APIREQUEST where FCREATETIME<=@FCREATETIME", listParams); 79 } 80 return rowCount; 81 }
5、大數據量循環執行的邏輯
1 /// <summary> 2 /// 歷史數據遷移【分批遷移】 3 /// </summary> 4 /// <param name="totalCount">任務總遷移條數</param> 5 /// <param name="prePageCount">分批遷移,單次查詢數量</param> 6 public static void DataToHis(int totalCount=100,int prePageCount=10) 7 { 8 //臨界值設定 9 totalCount = totalCount > 1000000 ? 1000000 : totalCount; //單次任務100萬 10 prePageCount = prePageCount > 10000 ? 10000 : prePageCount; //每次1萬 11 12 //Serialog 記錄日志 13 var logFileName = string.Format("ToHis_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH")); 14 var log = new LoggerConfiguration() 15 .WriteTo.Console() 16 .WriteTo.File(logFileName) 17 .CreateLogger(); 18 19 Stopwatch swAll = new Stopwatch(); //總耗時 20 swAll.Start(); 21 22 log.Information($"本次計划總遷移數據條數:{totalCount},分批單次執行條數:{prePageCount}"); 23 int okCount = 0; //遷移成功的條數 24 int runTimes = 0; 25 while (okCount < totalCount) 26 { 27 runTimes++; 28 if (runTimes % 20 == 0) 29 { 30 Console.Clear(); //每執行20次的時候,清除控制台 31 Console.WriteLine("控制台已被清理。"); 32 } 33 if (totalCount - okCount < prePageCount) 34 { 35 prePageCount = totalCount - okCount; //最后一次如果沒有一頁數據,只遷移部分 36 if (prePageCount == 1) break; //如果是1條的話,日期精度容易出現問題,特意控制不執行 37 } 38 okCount += DataToHisExecOne(prePageCount); 39 log.Information(@"已執行累計條數{0},累計耗時:{1}分{2}秒{3},累計執行次數{4}", okCount, swAll.Elapsed.TotalMinutes, swAll.Elapsed.Seconds, swAll.Elapsed.Milliseconds, runTimes); 40 } 41 42 swAll.Stop(); 43 log.Information($"本次實際總遷移數據條數:{okCount},共分批執行次數:{runTimes}"); 44 log.Information("================================================"); 45 46 }
6、在appsettings.json 做一些基礎配置
配置的目的是為了方便執行,免得改程序。
1 "TyUseEnv": "0", //使用環境(0:測試環境;1:正式環境(沙箱)) 2 "TyTopN": "10", //每次遷移的數據條數 3 "TyTotalCount": "25", //分批總遷移數據量 4 "TyForTable": "GLPS_APIREQUEST", //需要遷移數據的表名(測試已支持:GLPS_APIREQUEST、GLPS_GATEENTRYREC)
其他的表以此類推,可以進行多表切換。
在控制台Main方法中通過依賴注入的方式,動態實例化要遷移的表結構。
歷史數據遷移,只要支持此接口(IExecDataToHis)即可,說白就2個遷移方法而已:單次執行和分批循環執行。
其他的表以此類推,可以進行多表切換。
在控制台Main方法中通過依賴注入的方式,動態實例化要遷移的表結構。
歷史數據遷移,只要支持此接口(IExecDataToHis)即可,說白就2個遷移方法而已:單次執行和分批循環執行。
1 using Twi.NET5.Core; 2 3 namespace Tyingtech_glps.Interface.HDM 4 { 5 /// <summary> 6 /// 可執行的數據遷移接口(單個和分批) 7 /// </summary> 8 public interface IExecDataToHis : IWhoAmI 9 { 10 /// <summary> 11 /// 歷史數據遷移【分批遷移】 12 /// </summary> 13 /// <param name="totalCount">任務總遷移條數</param> 14 /// <param name="perPageCount">分批遷移,單次查詢每頁數量</param> 15 public TwiReturnBase DataToHis(int totalCount = 10000, int perPageCount = 1000); 16 17 /// <summary> 18 /// 歷史數據遷移【一次性遷移】 19 /// </summary> 20 /// <param name="n">數據遷移量(條數) top N</param> 21 public TwiReturnBase DataToHisExecOne(int n); 22 } 23 }
TwiReturnBase 就是一個統一封裝的返回類型,不用管它。
7、控制台運行界面參考
最后exe程序的界面效果如下。
然后就是執行命令了,單次遷移輸入1,分批執行遷移輸入2。
遷移哪張表,每次單次遷移多少條,總共遷移多少數據量,都可在appsettings.json中配置。
8、單次執行效果
直接輸入命令1,開始執行。
(哈哈,看來我直接本地電腦還是非常卡的,不過不影響思路的效果,等最后我們換台測試服務器看看1000萬數據的效果。)
9、分配執行效果
自動執行多次的效果,只要配置好,就會自動執行,分配執行不影響性能。
如果中間報錯,會自動停止執行。
10、大數據量的測試效果
我們換台測試服務器,用此方法測試下有數據在跑的沙箱環境一千萬數據量的效果,每天源源不斷執行,測試遷移了快939萬的數據問題都不大。
由於沙箱環境中也有數據在跑,所以業務數據庫是實時增加數據量的。
11、此方法的優缺點
這個思路能解決哪怕是生產環境都可以用,有事沒事遷移點數據量,對數據庫性能沒有多大影響,而且支持不同數據庫的遷移,比如SQL SERVER 到MySQL等等,但是缺點就是遷移起來比較慢些。適用於小企業小項目的常見場景,這個思路基本就夠了。
【以后有空我還會再開篇文章講別的大數據遷移思路】
當然最后要說的是,這個例子不是很靠譜,為什么這么說呢?首先這個是硬編碼方式,局限性太大,如果要通用的話,程序應該再抽下,直接配置表名相關的即可。
實現通用工具模式,任何時候無需修改代碼,直接簡單改配置即可。這個我們下期教程繼續改進思路。