半同步半異步模式的實現 - MSMQ實現


所謂半同步半異步是指,在某個方法調用中,有些代碼行是同步執行方式,有些代碼行是異步執行方式,下面我們來舉個例子,還是以經典的PlaceOrder來說,哈哈。

PlaceOrder的主要邏輯:

public bool PlaceOrder(OrderInfo order)
{
            //驗證Order合法性

            //OrderInfo增加到倉儲

            //生成order的pdf            

            //通知客戶,email方式
}

我們假設做出如下決定:

public bool PlaceOrder(OrderInfo order)
{
            //驗證Order合法性           (同步執行)

            //OrderInfo增加到倉儲       (異步執行,考慮到鎖,要放到消息隊列中,讓后台Worker來執行具體的sql操作)

            //生成order的pdf           (同步執行,其實也可以異步,此處demo意圖,因此做成同步執行)

            //通知客戶,email方式        (同步執行,同上)
}

如上面所示,如果我們只是在"OrderInfo增加到倉儲"這里通過Async方式(無論是多線程,或者是msmq、rabbitq),如果只是觸發這個異步執行,那么到函數返回時,很可能這個異步操作還沒有執行完成,但是UI層(又或者其他函數)卻需要某些信息,比如OrderID。因此,在這個函數中,除了發起異步調用外,還需要wait,直到異步調用執行完成,其實這個函數相當於完成了2個線程並行執行,但是最終返回的條件必須是2個線程都執行完成。

我們來看下

        EventWaitHandle signal = new EventWaitHandle(true, EventResetMode.ManualReset);   //多線程之間的阻塞機制,需要這個變量
        OrderInfo returnedOrderInfo = null;                    //從多線程那邊返回的變量會保存在這里 public bool PlaceOrder(OrderInfo order)
        {
            //驗證Order合法性           (同步執行)

            //OrderInfo增加到倉儲       (異步執行,考慮到鎖,要放到消息隊列中,讓后台Worker來執行具體的sql操作)
            string msgId=SendOrder2Queue(order);     //發送order到msmq,並且獲取相應的消息ID,因為后面會用到這個消息ID //生成order的pdf

            //會在這里阻塞,直到應答隊列出現相應msgId的消息為止
            signal.Reset();                   //初始化變量
            returnedOrderInfo = null;         //初始化變量
            ThreadPool.QueueUserWorkItem(new WaitCallback(CheckResponseQueue), msgId);     //調用ms的ThreadPool進行多線程
            signal.WaitOne();                 //阻塞住,直到子線程發出Set命令 //通知客戶,email方式

            //收尾邏輯
            if (this.returnedOrderInfo != null && !this.returnedOrderInfo.OrderID.Equals(Guid.Empty))
            {
                CloneOrder(order, this.returnedOrderInfo);
                return true;
            }
            return false;
        }

 需要注意的地方是,發送email那里,必須放在最后面,想象下這種情況:插入數據庫失敗了,此時msmq的應答隊列就會返回相應的失敗消息,此時就需要根據結果來判斷,是否需要發送email了(上面代碼沒有考慮到這點)。

 考慮到斷電等情況,需要將msmq創建為Transaction類型的queue,如下:

public static class Config
    {
        public static readonly string OrderQueueConnectionString = ".\\private$\\Order";
        public static readonly string OrderResponseQueueConnectionString = ".\\private$\\OrderResponse";
        public static void PrepareMSMQ()
        {
            if (MessageQueue.Exists(OrderQueueConnectionString))
                MessageQueue.Delete(OrderQueueConnectionString);
            if (MessageQueue.Exists(OrderResponseQueueConnectionString))
                MessageQueue.Delete(OrderResponseQueueConnectionString);

            MessageQueue.Create(OrderQueueConnectionString, true);   //true,代表Transaction的隊列
            MessageQueue.Create(OrderResponseQueueConnectionString, true); //同上
        }
    }

 

大家已經看到了,其實有2個隊列需要建立,一個是發送隊列,另外一個是應答隊列;ThreadPool中的子線程就是用來Monitor這個應答隊列中是否有相應消息的,我們來看看代碼:

private void CheckResponseQueue(object state)
        {
            string msgId = (string)state;
            string sMessageConnectionString_ResponseQueue = Config.OrderResponseQueueConnectionString;
            MessageQueue mq_response = new MessageQueue(sMessageConnectionString_ResponseQueue);
            mq_response.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) });  //這個很重要,要傳輸什么樣的消息,就要寫相應的格式化程序 while (true)
            {
                System.Threading.Thread.Sleep(200);
                Message[] msgs = mq_response.GetAllMessages();      //這句會獲取所有的消息,但是不會從隊列中去掉,類似於Peek的效果 string foundMsgId = string.Empty;
                foreach(Message msg in msgs)
                {
                    if (msg.Label == msgId)
                    {
                        foundMsgId = msg.Id;
                        break;
                    }
                }
                if (foundMsgId != string.Empty)
                {
                    Message msg=mq_response.ReceiveById(foundMsgId);  //找到msgId后,這句才會真正從隊列中移除消息
                    OrderInfo replyOrderInfo=(OrderInfo)msg.Body;

                    returnedOrderInfo = new OrderInfo();              //賦值給返回變量
                    returnedOrderInfo.FirstName = replyOrderInfo.FirstName;
                    returnedOrderInfo.LastName = replyOrderInfo.LastName;
                    returnedOrderInfo.BuyWhat = replyOrderInfo.BuyWhat;
                    returnedOrderInfo.OrderID = replyOrderInfo.OrderID;
                    break;
                }
            }
            signal.Set();             //發出信號,代表完成,讓主線程繼續往下執行
        }

 

 下面再來看看消息發送后,真正的后台處理程序(是個Console程序):

class Program
    {
        static void Main(string[] args)
        {
            System.Threading.Thread.Sleep(2000);
            string msmq = Core.Config.OrderQueueConnectionString;
            if (!MessageQueue.Exists(msmq))
            {
                Console.WriteLine("msmq not exist.");
                return;
            }

            MessageQueue mq = new MessageQueue(msmq);
            MessageQueueTransaction tx = new MessageQueueTransaction();                    //事務性隊列必須用這個才能正確插入消息
            mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) });
            while (true)
            {
                Message msg = mq.Receive();
                Console.WriteLine("processing {0}", msg.Id);
                OrderInfo order = (OrderInfo)msg.Body;

                order.FirstName +=", processed on "+DateTime.Now.ToString();
                order.OrderID = Guid.NewGuid();

                if (msg.ResponseQueue != null)           //由於在發送消息的時候已經指定了應答隊列,因此此處只是簡單的判斷這個屬性就可以了
                {
                    Message msg_reply = new Message();
                    msg_reply.Body = order;
                    msg_reply.Label = msg.Id;
                    msg_reply.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(OrderInfo) });

                    tx.Begin();   //很重要
                    msg.ResponseQueue.Send(msg_reply, tx);
                    tx.Commit();  //同上
                }
                Console.WriteLine("done");
            }
        }
    }

 

我們再來看下主程序:

class Program
    {
        static void Main(string[] args)
        {
            Config.PrepareMSMQ();  //准備msmq資源,比如create等

            OrderInfo order = new OrderInfo();
            order.FirstName = "aaron";
            order.LastName = "dai";
            order.BuyWhat = "Car";

            Console.WriteLine("Old OrderID--->" + order.OrderID);
            Console.WriteLine("Old FirstName--->" + order.FirstName);

            OrderService srv = new OrderService();
            bool success=srv.PlaceOrder(order);

            Console.WriteLine("*******************************");
            Console.WriteLine("Processed OrderID--->" + order.OrderID);
            Console.WriteLine("Processed FirstName--->"+order.FirstName);
            Console.WriteLine("*******************************");
            Console.ReadLine();
        }
    }

 

就要好了,來看看效果圖:

 

 Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM