關於使用多線程處理同步數據服務的一些嘗試


一、背景

1.業務背景

最近公司的項目在處理用戶合並的一些需求,主要涉及到的是三個數據庫,多個賬號合並成一個賬號,以及產生的歷史業務數據的更新和同步
這邊主要負責的是更新和同步這些數據 同時將兩個賬號合並一個賬號也一同處理。
2.代碼背景
使用的是C#
3.3個情況分開處理 涉及到需要合並的數據有10W條  這部分具體的業務數據涉及到30張表 總數據量還是很多的 
 

二、思路

1.一開始的思路 使用循環 一條一條的用戶數據去執行 但是執行效率很低 開始單獨處理的1400多條數 執行時間都長達1個多小時 太費時
foreach (var item in collection)
{
 
}

  

如果是完全把10W條需要合並的數據 涉及到的業務數據可能有幾十萬 甚至上百萬的話 會嚴重的耽誤時間
2.后來的思路 使用異步  多線程來處理 由於單個處理業務數據 不需要同步和返回狀態判斷 所有准備使用線程池直接處理
ThreadPool.QueueUserWorkItem(new WaitCallback(方法名));

  

由於是一開始使用 不是很熟悉 使用這個ThreadPool之后 沒有設置線程的最大和最小  由於服務是一直在處理數據 后來查閱到
應用程序中,線程把大部分的時間花費在等待狀態,等待某個事件發生,然后才能給予響應 
這一般使用ThreadPool(線程池)來解決;
最后就沒有使用這個方法。
 
3.后來發現短信發送的輪詢服務的方法可以借鑒,於是仔細的看了邏輯最后采用的方式便是使用task來處理。
Task的背后的實現也是使用了線程池線程,但它的性能優於ThreadPool,因為它使用的不是線程池的全局隊列,
而是使用的本地隊列,使線程之間的資源競爭減少。同時Task提供了豐富的API來管理線程、控制。但是相對前面的兩種耗內存,
Task依賴於CPU對於多核的CPU性能遠超前兩者,單核的CPU三者的性能沒什么差別。
由於擔心線程開的過多,導致濫用和浪費,於是在代碼上做了一些處理 下面把代碼貼出來一部分邏輯
 while (_go)
 
 
            {
                //判斷是否有需要處理的數據
                var count = ToOne_Merge_Task.Where("UpdateStatus = 1 And IsMoocUpdate = 0 and IsValidType =0").Count();
                if (count == 0)
                {
                    Console.WriteLine("Sleep:1500");
                    Thread.Sleep(1500);
                    continue;
                }
 
                try
                {
                    //調用方法
                    getUpdateUserList(moocUserList);
 
                }
                catch (Exception ex)
                {
 
                    log.Error(ExceptionUtil.WriteException(ex));
 
                }
                finally
                {
                    Console.WriteLine("Sleep:1500");
                    Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd.HH:mm:ss.fffffff"));
                    Console.WriteLine();
 
                    Thread.Sleep(1500);
                }
 
            }
 

 

      
 static void getUpdateUserList(List<MOOC_User> moocUserList)
        {
            //每次取10條需要處理的數據
            List<ToOne_Merge_Task> mergeTaskList = (from q in ToOne_Merge_Task.CreateContext()
                                                    where q.UpdateStatus == 1 && q.IsMoocUpdate == 0 && q.IsValidType == 0
                                                    select q).Take(10).ToList();
 
            if (mergeTaskList.Count() > 0)
            {
                var taskIds = (from q in mergeTaskList select q.Id).ToList();
                var formatTaskIds = StringUtil.CollectionToCommaDelimitedString(taskIds, "'");
 
                //查出當前正在處理的數據有多少
                var count = (from q in ToOne_Merge_Task.CreateContext()
                             where q.UpdateStatus == 1 && q.IsMoocUpdate == 2 && q.IsValidType == 0
                             select q).Count();
 
                //設定30個閥值 減少對數據庫的壓力
                if (count <= 20)
                {
                    //將需要處理的數據狀態改為2 加入到更新的隊列里面
                    ToOne_Merge_Task.Where(string.Format("id in ({0})", formatTaskIds)).Set("IsMoocUpdate", 2).Update();
 
                    try
                    {
 
                        foreach (var mergeTask in mergeTaskList)
                        {
                            //這里創建多個異步任務
                            var task = new Task(delegate()
                            {
                                updateUserInfo(mergeTask.Id, mergeTask.IcveUserId, mergeTask.ZjyUserId, moocUserList);
 
                            });
                            task.Start();
                        }
                    }
                    catch (Exception ex)
                    {
 
                        log.Error(ExceptionUtil.WriteException(ex));
                    }
                }
 
            }
        }

 

主要的思路是使用數據庫的某個字段來實時判斷當前執行的任務數,防止線程開啟過多,同時防止對數據庫的壓力過大。
由於使用了多線程異步的邏輯,在代碼的執行速度上面提升了很多。相比之前的提示是很可觀的。
以上就是使用task來同步數據的一些嘗試。希望看到這篇文章的朋友可以多提意見,不對的地方歡迎指出。

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM