所謂半同步半異步是指,在某個方法調用中,有些代碼行是同步執行方式,有些代碼行是異步執行方式,下面我們來舉個例子,還是以經典的PlaceOrder來說,哈哈。
PlaceOrder的主要邏輯:
public bool PlaceOrder(OrderInfo order) { //驗證Order合法性 //OrderInfo增加到倉儲 //生成order的pdf //通知客戶,email方式 }
我們假設做出如下決定:
public bool PlaceOrder(OrderInfo order) { //驗證Order合法性 (同步執行) //OrderInfo增加到倉儲 (異步執行,考慮到鎖,要放到消息隊列中,讓后台Worker來執行具體的sql操作) //生成order的pdf (同步執行,其實也可以異步,此處demo意圖,因此做成同步執行) //通知客戶,email方式 (同步執行,同上) }
如上面所示,如果我們只是在"OrderInfo增加到倉儲"這里通過Async方式(無論是多線程,或者是msmq、rabbitq),如果只是觸發這個異步執行,那么到函數返回時,很可能這個異步操作還沒有執行完成,但是UI層(又或者其他函數)卻需要某些信息,比如OrderID。因此,在這個函數中,除了發起異步調用外,還需要wait,直到異步調用執行完成,其實這個函數相當於完成了2個線程並行執行,但是最終返回的條件必須是2個線程都執行完成。
我們來看下
EventWaitHandle signal = new EventWaitHandle(true, EventResetMode.ManualReset); //多線程之間的阻塞機制,需要這個變量 OrderInfo returnedOrderInfo = null; //從多線程那邊返回的變量會保存在這里 public bool PlaceOrder(OrderInfo order) { //驗證Order合法性 (同步執行) //OrderInfo增加到倉儲 (異步執行,考慮到鎖,要放到消息隊列中,讓后台Worker來執行具體的sql操作) string msgId=SendOrder2Queue(order); //發送order到msmq,並且獲取相應的消息ID,因為后面會用到這個消息ID //生成order的pdf //會在這里阻塞,直到應答隊列出現相應msgId的消息為止 signal.Reset(); //初始化變量 returnedOrderInfo = null; //初始化變量 ThreadPool.QueueUserWorkItem(new WaitCallback(CheckResponseQueue), msgId); //調用ms的ThreadPool進行多線程 signal.WaitOne(); //阻塞住,直到子線程發出Set命令 //通知客戶,email方式 //收尾邏輯 if (this.returnedOrderInfo != null && !this.returnedOrderInfo.OrderID.Equals(Guid.Empty)) { CloneOrder(order, this.returnedOrderInfo); return true; } return false; }
需要注意的地方是,發送email那里,必須放在最后面,想象下這種情況:插入數據庫失敗了,此時msmq的應答隊列就會返回相應的失敗消息,此時就需要根據結果來判斷,是否需要發送email了(上面代碼沒有考慮到這點)。
考慮到斷電等情況,需要將msmq創建為Transaction類型的queue,如下:
public static class Config { public static readonly string OrderQueueConnectionString = ".\\private$\\Order"; public static readonly string OrderResponseQueueConnectionString = ".\\private$\\OrderResponse"; public static void PrepareMSMQ() { if (MessageQueue.Exists(OrderQueueConnectionString)) MessageQueue.Delete(OrderQueueConnectionString); if (MessageQueue.Exists(OrderResponseQueueConnectionString)) MessageQueue.Delete(OrderResponseQueueConnectionString); MessageQueue.Create(OrderQueueConnectionString, true); //true,代表Transaction的隊列 MessageQueue.Create(OrderResponseQueueConnectionString, true); //同上 } }
大家已經看到了,其實有2個隊列需要建立,一個是發送隊列,另外一個是應答隊列;ThreadPool中的子線程就是用來Monitor這個應答隊列中是否有相應消息的,我們來看看代碼:
private void CheckResponseQueue(object state) { string msgId = (string)state; string sMessageConnectionString_ResponseQueue = Config.OrderResponseQueueConnectionString; MessageQueue mq_response = new MessageQueue(sMessageConnectionString_ResponseQueue); mq_response.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) }); //這個很重要,要傳輸什么樣的消息,就要寫相應的格式化程序 while (true) { System.Threading.Thread.Sleep(200); Message[] msgs = mq_response.GetAllMessages(); //這句會獲取所有的消息,但是不會從隊列中去掉,類似於Peek的效果 string foundMsgId = string.Empty; foreach(Message msg in msgs) { if (msg.Label == msgId) { foundMsgId = msg.Id; break; } } if (foundMsgId != string.Empty) { Message msg=mq_response.ReceiveById(foundMsgId); //找到msgId后,這句才會真正從隊列中移除消息 OrderInfo replyOrderInfo=(OrderInfo)msg.Body; returnedOrderInfo = new OrderInfo(); //賦值給返回變量 returnedOrderInfo.FirstName = replyOrderInfo.FirstName; returnedOrderInfo.LastName = replyOrderInfo.LastName; returnedOrderInfo.BuyWhat = replyOrderInfo.BuyWhat; returnedOrderInfo.OrderID = replyOrderInfo.OrderID; break; } } signal.Set(); //發出信號,代表完成,讓主線程繼續往下執行 }
下面再來看看消息發送后,真正的后台處理程序(是個Console程序):
class Program { static void Main(string[] args) { System.Threading.Thread.Sleep(2000); string msmq = Core.Config.OrderQueueConnectionString; if (!MessageQueue.Exists(msmq)) { Console.WriteLine("msmq not exist."); return; } MessageQueue mq = new MessageQueue(msmq); MessageQueueTransaction tx = new MessageQueueTransaction(); //事務性隊列必須用這個才能正確插入消息 mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) }); while (true) { Message msg = mq.Receive(); Console.WriteLine("processing {0}", msg.Id); OrderInfo order = (OrderInfo)msg.Body; order.FirstName +=", processed on "+DateTime.Now.ToString(); order.OrderID = Guid.NewGuid(); if (msg.ResponseQueue != null) //由於在發送消息的時候已經指定了應答隊列,因此此處只是簡單的判斷這個屬性就可以了 { Message msg_reply = new Message(); msg_reply.Body = order; msg_reply.Label = msg.Id; msg_reply.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(OrderInfo) }); tx.Begin(); //很重要 msg.ResponseQueue.Send(msg_reply, tx); tx.Commit(); //同上 } Console.WriteLine("done"); } } }
我們再來看下主程序:
class Program { static void Main(string[] args) { Config.PrepareMSMQ(); //准備msmq資源,比如create等 OrderInfo order = new OrderInfo(); order.FirstName = "aaron"; order.LastName = "dai"; order.BuyWhat = "Car"; Console.WriteLine("Old OrderID--->" + order.OrderID); Console.WriteLine("Old FirstName--->" + order.FirstName); OrderService srv = new OrderService(); bool success=srv.PlaceOrder(order); Console.WriteLine("*******************************"); Console.WriteLine("Processed OrderID--->" + order.OrderID); Console.WriteLine("Processed FirstName--->"+order.FirstName); Console.WriteLine("*******************************"); Console.ReadLine(); } }
就要好了,來看看效果圖: