一直對多線程不感冒,常見的場景下用不到這種技術,所以一直不願去了解,但是遇到一些耗時的任務時就要考慮了。下面的例子是項目中遇到的,不想說這個如何的高深,只想說我也不是很了解到底如何工作的,到底帶來了多少的效率提升。書上的理論要多枯燥有多枯燥,枯燥的我都不想去動手寫里面的例子,下面的例子是實際用到的,還有些意思,不管怎么說開個頭吧。
1.ManualResetEvent[] doEvents = new ManualResetEvent[threadCount];
通知一個或多個正在等待的線程已發生事件。 誰通知誰,發生的事情是指?一頭霧水
2.ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] });
線程池中創建一個線程池線程來執行指定方法(用委托WaitCallBack表示),並將該線程排入線程池的隊列等待執行。
3.doEvent.Set();
將事件狀態設置為終止狀態,允許一個或多個等待線程繼續。
4.WaitHandle.WaitAll(doEvents);
等待指定數組中的所有元素都收到信號。
來看下面的例子,
SendDealSucessEmailFromQueue()方法:threadCount是一個配置項,指定一次要開多少個線程,曾有人建議為了安全起見,機器是有好個核就開多少個線程這里假設是5,不知道有沒有道理有待研究。dataNum也是一個配置項,指定一個線程要處理多少訂單數據,這里假設是50。ManualResetEvent[] doEvents = new ManualResetEvent[threadCount],指定每次啟動5個線程。然后循環doEvents[i] = new ManualResetEvent(false);使用false作為參數通知當前線程等待此線程完成之后再完成當前線程的工作。ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] }); 在線程池中添加線程來執行DealData方法,給這個方法傳遞一個參數,這個參數是一個對象數組,數組的第一項是當前第幾個線程,就是線程號,第二個是線程的事件(請原諒我這么翻譯,貌似沒有看到ManualResetEvent這個類的中文名字是什么,看樣子像手動重置事件,我這里簡稱線程事件)。注意for循環外面的一句話WaitHandle.WaitAll(doEvents);,這里先不解釋待說完循環中調用的DealData方法之后再說。
DealData(object eventParams)方法:上面說到給他傳遞的參數是一個對象數組,所以int threadId = (int)(((object[])eventParams)[0]); ManualResetEvent doEvent = (ManualResetEvent)(((object[])eventParams)[1]);這兩句很好理解,就是從這個對象數組中把實參值轉換過來。int tmpCount = orderList.Count / threadCount;計算每個線程要處理的訂單數量,要說的是因為要判斷存在有要處理的訂單才會開線程來處理,所以使用靜態變量orderList。下面的這個for循環需要耐心琢磨,其實在SendDealSucessEmailFromQueue()這個方法執行完之后就已經開了5個線程,這時每個線程在做些什么事情我們是不能控制的,只知道有5個線程像脫韁野馬一樣地跑,在執行DealData(object eventParams)的時候是不知道當前是的線程是這5匹馬中的那一匹,所以在這個for循環中我們就需要給他們分配工作,具體來說就是第0個線程處理0-49個訂單,第1個線程處理50-99個,第2個線程處理100-149個,第3個線程處理200-249個,注意我這里都是用數組下標來解釋。分配完即執行,執行完之后調用doEvent.Set();將當前線程置為終止。
再回到SendDealSucessEmailFromQueue()方法的最后一句WaitHandle.WaitAll(doEvents);等待線程事件中的所有任務執行完成之后繼續當前線程,就是繼續往下跑了,就是主程序了,可以理解為回到Main()方法中了。
/// <summary> /// 處理訂單郵件 /// </summary> public class OrderEmailProcess { /// <summary> /// 日志 /// </summary> private static readonly ILog logger = LogManager.GetLogger(typeof(OrderEmailProcess)); static List<Corp.CorpFlightADTKJob.Entity.FltOrderInfoEntity.OrderInfoEntity> orderList = new List<Entity.FltOrderInfoEntity.OrderInfoEntity>(); static int threadCount = 0; static int dataNum = 0; /// <summary> /// 發送成交確認郵件 /// 因為發郵件接口中占用了emailtype:8,所以使用4096,這里需要處理emailtype /// </summary> public static void SendDealSucessEmailFromQueue() { threadCount = int.Parse(ConfigurationManager.AppSettings["ThreadCount"]); dataNum = int.Parse(ConfigurationManager.AppSettings["DataNum"]); string logMsg = ""; orderList = CorpProcessDB.GetDealConfirmEmailQueue(ConnStrFactory.CorpProcessDB_Select, threadCount * dataNum); if (orderList != null && orderList.Count > 0) { //記錄日志 for (int i = 0; i < orderList.Count; i++) { logMsg += string.Format("OrderID:{0} Uid:{1} EmailType:{2}", orderList[i].OrderID, orderList[i].Uid, orderList[i].EmailType) + "\n"; } logger.Info("自動發郵件", logMsg); ManualResetEvent[] doEvents = new ManualResetEvent[threadCount]; for (int i = 0; i < threadCount; i++) { doEvents[i] = new ManualResetEvent(false); ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] }); } WaitHandle.WaitAll(doEvents); } } /// <summary> /// 處理線程方法 /// </summary> /// <param name="eventParams"></param> private static void DealData(object eventParams) { int threadId = (int)(((object[])eventParams)[0]); //當前線程ID ManualResetEvent doEvent = (ManualResetEvent)(((object[])eventParams)[1]); int tmpCount = orderList.Count / threadCount; //平均每個線程處理數據 for (int i = 0; i < orderList.Count; i++) { if (i < (threadId) * tmpCount) continue; if ((i >= (threadId + 1) * tmpCount) && threadId != (threadCount - 1)) break; doDetailEvent(orderList[i]); } doEvent.Set(); } /// <summary> /// 發郵件 /// </summary> /// <param name="item"></param> /// <param name="threadId"></param> private static void doDetailEvent(Corp.CorpFlightADTKJob.Entity.FltOrderInfoEntity.OrderInfoEntity order) { int emailsendtime = order.EmailType; if ((emailsendtime & 8) == 8) { emailsendtime = emailsendtime + 4088; } CorpEmaiWSlHelper.SendCorpConfirmMail(order.OrderID, order.Uid, "N", emailsendtime, ""); } }
如果你覺得上面的例子跑不起來,那么下面的例子可以讓你在自己的機器上跑起來。這是一個web頁面,OrderInfoEntity這個類寫在另外一個文件中,這里和在一起展示。
public partial class About : System.Web.UI.Page { int threadCount = 5; int dataNum = 50; static string msg = ""; protected void Page_Load(object sender, EventArgs e) { msg = ""; SendDealSucessEmailFromQueue(); Response.Write(msg); } public List<OrderInfoEntity> orderList; public void SendDealSucessEmailFromQueue() { threadCount = 5; dataNum = 50; orderList = GetDealConfirmEmailQueue(threadCount * dataNum); if (orderList != null && orderList.Count > 0) { ManualResetEvent[] doEvents = new ManualResetEvent[threadCount]; for (int i = 0; i < threadCount; i++) { doEvents[i] = new ManualResetEvent(false); ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] }); } WaitHandle.WaitAll(doEvents); } } public void DealData(object eventParams) { int threadId = (int)(((object[])eventParams)[0]); //當前線程ID ManualResetEvent doEvent = (ManualResetEvent)(((object[])eventParams)[1]); int tmpCount = orderList.Count / threadCount; //平均每個線程處理數據 for (int i = 0; i < orderList.Count; i++) { if (i < (threadId) * tmpCount) continue; if ((i >= (threadId + 1) * tmpCount) && threadId != (threadCount - 1)) break; doDetailEvent(orderList[i]); } doEvent.Set(); } static void doDetailEvent(OrderInfoEntity order) { msg += order.OrderID.ToString() + "</br>"; } /// <summary> /// 添加訂單列表 /// </summary> /// <param name="p"></param> /// <returns></returns> private static List<OrderInfoEntity> GetDealConfirmEmailQueue(int p) { List<OrderInfoEntity> result = new List<OrderInfoEntity>(); for (int i = 0; i < p; i++) { OrderInfoEntity entity = new OrderInfoEntity(i + 1); result.Add(entity); } return result; } } public class OrderInfoEntity { private int _orderID; public int OrderID { get { return _orderID; } set { _orderID = value; } } public OrderInfoEntity(int orderid) { this._orderID = orderid; } }
這個方法也是異步讀取的方法,不過簡單一些。
#region 異步讀取數據 long logID1 = RequestManager.newSubRequest(), logID2 = RequestManager.newSubRequest(), logID3 = RequestManager.newSubRequest(); Task<List<CorpFlightSearchFlightsEntity>> taskNormalFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchNormalFlights(searchRequest, logID1)); Task<List<CorpFlightSearchFlightsEntity>> taskCorpFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchCorpFlightsOnly(searchRequest, logID2)); Task<List<CorpFlightSearchFlightsEntity>> taskMultiSpecialFlts = null; if (searchRequest.FlightWay == "D") { taskMultiSpecialFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchMultiSpecialFlights(searchRequest, logID3)); Task.WaitAll(taskNormalFlts, taskCorpFlts, taskMultiSpecialFlts); } else { Task.WaitAll(taskNormalFlts, taskCorpFlts); } normalFlts = taskNormalFlts.Result; corpFlts = taskCorpFlts.Result; multiSpecialFlts = taskMultiSpecialFlts != null ? taskMultiSpecialFlts.Result : null; #endregion
據新來的高手說下面這樣寫才能保證速度,上面的方法只能提高吞吐量,並不能防止阻塞。
Task<List<CorpFlightSearchFlightsEntity>> taskNormalFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchNormalFlights(searchRequest, logID1)); Task<List<CorpFlightSearchFlightsEntity>> taskCorpFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchCorpFlightsOnly(searchRequest, logID2)); Task<List<CorpFlightSearchFlightsEntity>> taskMultiSpecialFlts = null; if (searchRequest.FlightWay == "D") { taskMultiSpecialFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchMultiSpecialFlights(searchRequest, logID3)); //Task.WaitAll(taskNormalFlts, taskCorpFlts, taskMultiSpecialFlts); normalFlts = taskNormalFlts.ContinueWith(t => t.Result).Result; corpFlts = taskCorpFlts.ContinueWith(t => t.Result).Result; multiSpecialFlts = taskMultiSpecialFlts.ContinueWith(t => t.Result).Result; } else { //Task.WaitAll(taskNormalFlts, taskCorpFlts); normalFlts = taskNormalFlts.ContinueWith(t => t.Result).Result; corpFlts = taskCorpFlts.ContinueWith(t => t.Result).Result; } normalFlts = taskNormalFlts.Result; corpFlts = taskCorpFlts.Result; multiSpecialFlts = taskMultiSpecialFlts != null ? taskMultiSpecialFlts.Result : null;
還有一種簡寫方法,在沒有前后依賴關系,沒有返回值的情況下還可以使用Parallel.Invoke方法,如下:
if (this._appFlightsList.Count > 0) { Parallel.Invoke(() => { Parallel.ForEach(_appFlightsList, ShowFlightsUICommon.SetAppFlight); this._appFlightsList.Sort(AppFlightSorter.CorpSort); }, () => { if (this._passengerTypeString.Trim().ToUpper() == "ADU") { if (this._lowestPrices != null && this._lowestPrices.Count > 0) { this._lowestPrices.Sort(LowestPriceSoter.CompareLowestPriceByDate); } ShowFlightsUICommon.SetCorpSingleTripAppSpecialSummary4OnlineCNOnly( this._specialSummary, corpFlightsResponse.SingleTripSummaryList); if (this._corpSearchMultipleSpecialResponse != null && this._corpSearchMultipleSpecialResponse.Groups.Count > 0) { ShowFlightsUICommon.SetCorpMultipleAppSpecialSummary( this._specialSummary, this._corpSearchMultipleSpecialResponse.Groups); } if (this._corpSearchTransitSpecialResponse != null && this._corpSearchTransitSpecialResponse.LowGroups.Count > 0) { ShowFlightsUICommon.SetCorpTransitAppSpecialSummary( this._specialSummary, this._corpSearchTransitSpecialResponse.LowGroups); } ShowFlightsUICommon.SetCorpAppSpecialPrice(this._specialSummary, this._IsCalcNetPrice, this._SaleRate); FlightProcess.FlightsFilterWithAccount(this._specialSummary, this._AccountID); } }); }
