一、背景
1.業務背景
最近公司的項目在處理用戶合並的一些需求,主要涉及到的是三個數據庫,多個賬號合並成一個賬號,以及產生的歷史業務數據的更新和同步
這邊主要負責的是更新和同步這些數據 同時將兩個賬號合並一個賬號也一同處理。
2.代碼背景
使用的是C#
3.3個情況分開處理 涉及到需要合並的數據有10W條 這部分具體的業務數據涉及到30張表 總數據量還是很多的
二、思路
1.一開始的思路 使用循環 一條一條的用戶數據去執行 但是執行效率很低 開始單獨處理的1400多條數 執行時間都長達1個多小時 太費時
foreach (var item in collection) { }
如果是完全把10W條需要合並的數據 涉及到的業務數據可能有幾十萬 甚至上百萬的話 會嚴重的耽誤時間
2.后來的思路 使用異步 多線程來處理 由於單個處理業務數據 不需要同步和返回狀態判斷 所有准備使用線程池直接處理
ThreadPool.QueueUserWorkItem(new WaitCallback(方法名));
由於是一開始使用 不是很熟悉 使用這個ThreadPool之后 沒有設置線程的最大和最小 由於服務是一直在處理數據 后來查閱到
應用程序中,線程把大部分的時間花費在等待狀態,等待某個事件發生,然后才能給予響應
這一般使用ThreadPool(線程池)來解決;
這一般使用ThreadPool(線程池)來解決;
最后就沒有使用這個方法。
3.后來發現短信發送的輪詢服務的方法可以借鑒,於是仔細的看了邏輯最后采用的方式便是使用task來處理。
Task的背后的實現也是使用了線程池線程,但它的性能優於ThreadPool,因為它使用的不是線程池的全局隊列,
而是使用的本地隊列,使線程之間的資源競爭減少。同時Task提供了豐富的API來管理線程、控制。但是相對前面的兩種耗內存,
Task依賴於CPU對於多核的CPU性能遠超前兩者,單核的CPU三者的性能沒什么差別。
由於擔心線程開的過多,導致濫用和浪費,於是在代碼上做了一些處理 下面把代碼貼出來一部分邏輯
while (_go) { //判斷是否有需要處理的數據 var count = ToOne_Merge_Task.Where("UpdateStatus = 1 And IsMoocUpdate = 0 and IsValidType =0").Count(); if (count == 0) { Console.WriteLine("Sleep:1500"); Thread.Sleep(1500); continue; } try { //調用方法 getUpdateUserList(moocUserList); } catch (Exception ex) { log.Error(ExceptionUtil.WriteException(ex)); } finally { Console.WriteLine("Sleep:1500"); Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd.HH:mm:ss.fffffff")); Console.WriteLine(); Thread.Sleep(1500); } }
static void getUpdateUserList(List<MOOC_User> moocUserList) { //每次取10條需要處理的數據 List<ToOne_Merge_Task> mergeTaskList = (from q in ToOne_Merge_Task.CreateContext() where q.UpdateStatus == 1 && q.IsMoocUpdate == 0 && q.IsValidType == 0 select q).Take(10).ToList(); if (mergeTaskList.Count() > 0) { var taskIds = (from q in mergeTaskList select q.Id).ToList(); var formatTaskIds = StringUtil.CollectionToCommaDelimitedString(taskIds, "'"); //查出當前正在處理的數據有多少 var count = (from q in ToOne_Merge_Task.CreateContext() where q.UpdateStatus == 1 && q.IsMoocUpdate == 2 && q.IsValidType == 0 select q).Count(); //設定30個閥值 減少對數據庫的壓力 if (count <= 20) { //將需要處理的數據狀態改為2 加入到更新的隊列里面 ToOne_Merge_Task.Where(string.Format("id in ({0})", formatTaskIds)).Set("IsMoocUpdate", 2).Update(); try { foreach (var mergeTask in mergeTaskList) { //這里創建多個異步任務 var task = new Task(delegate() { updateUserInfo(mergeTask.Id, mergeTask.IcveUserId, mergeTask.ZjyUserId, moocUserList); }); task.Start(); } } catch (Exception ex) { log.Error(ExceptionUtil.WriteException(ex)); } } } }
主要的思路是使用數據庫的某個字段來實時判斷當前執行的任務數,防止線程開啟過多,同時防止對數據庫的壓力過大。
由於使用了多線程異步的邏輯,在代碼的執行速度上面提升了很多。相比之前的提示是很可觀的。
以上就是使用task來同步數據的一些嘗試。希望看到這篇文章的朋友可以多提意見,不對的地方歡迎指出。