C# 線程手冊 第五章 多線程應用程序 一個多線程微軟消息隊列(MSMQ)監聽器


  在這部分,我們將使用ThreadPool 和MSMQ 進行消息收發。MSMQ 是一個分布式隊列,通過MSMQ 一個應用程序可以異步地與另外一個應用程序通信。

  在一個典型的場景中,我們要向維護一個隊列的MSMQ 服務器發送消息,MSMQ 發送方與MSMQ 服務器(特定隊列)之間創建一個連接並向那個隊列發送消息。一個MSMQ 接收器接收由MSMQ發送方發送的消息。MSMQ 接收方需要監聽一個特定的隊列以接收發送到這個隊列上的消息。MSMQ服務器在MSMQ發送方和接收方之間起到了一個中轉的作用,但MSMQ發送方不知道還有一個MSMQ接收方,反之亦然。

  在我們的程序中,我們將開發一個MSMQ發送程序(Windows 窗體應用)和一個MSMQ接收程序(控制台應用)。在我們的MSMQ發送方應用程序中(MSMQUI.cs), 我們使用ThreadPool 向隊列中發送一條消息:

private void SendButton_Click(object sender, EventArgs e)
{
    int count = Convert.ToInt32(countTxt.Text);
    while (count > 0)
    {
        ThreadPool.QueueUserWorkItem(new WaitCallback(SendMessage), countTxt.Text);
        count--;
    }
}

  在上面的代碼片段中(MSMQUI.cs), 我們創建了一個計數器來向MSMQ發送消息。我們通過一個靜態對象將消息傳遞給ThreadPool 並傳遞給WaitCallback 委托一個SendMessage() 方法的引用。換句話說,SendMessage() 方法將會給MSMQ 發送消息。

private void SendMessage(object state)
{
    if (mq != null)
    {
        try
        {
            msg.Body = state.ToString();
            mq.Send(msg);
        }
        catch (InvalidCastException ex)
        {
        }
    }
}

  SendMessage() 方法把靜態對象轉成字符串並發送給MSMQ。

  MSMQUI 應用程序運行起來與如下截圖類似:

2012-4-15 15-10-36

  在MSMQ 監聽器(MSMQListener.cs)中, 當一條消息到達隊列中以后我們從MSMQ服務端異步地接收消息通知。由於這個原因,我們創建了MSMQ 事件處理器並把MessageReceived() 方法名傳遞給ReceiveCompletedEventHandler 委托。

mq1 = new MessageQueue(@".\private$\myfirstq3");
mq1.ReceiveCompleted += new ReceiveCompletedEventHandler(MessageReceived);

  為了開始從MSMQ 接收消息,我們需要在消息隊列對象上調用BeginReceive() 方法。

mq1.BeginReceive(new TimeSpan(0,0,2));

  BeginReceive() 方法有一個TimeSpan 參數,它用來指示監聽器在消息到達隊列之前應該等待多長時間。如果沒有使用TimeSpan參數,應用程序將會阻塞在BeginReceive() 方發處直到消息到達MSMQ。

  當一條消息到達隊列后,MessageReceived() 方法由MSMQ事件處理器調用:

private static void MessageReceived(object source, ReceiveCompletedEventArgs asyncResult)
{
    bool isReceivedSucceed = true;
    MessageQueue mq = null;

    try
    {
        //Connect to the queue
        mq = (MessageQueue)source;
        Message m = mq.EndReceive(asyncResult.AsyncResult);
        m.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(string) });
        //Process the message here.
        Console.WriteLine(m.Body.ToString());
        ThreadPool.QueueUserWorkItem(new WaitCallback(InvokeMDAO), m.Body);
    }
    catch (MessageQueueException)
    {
        isReceivedSucceed = false;
    }
    catch (Exception ex)
    {
        isReceivedSucceed = false;
        Console.WriteLine(ex.Message);
    }
    finally
    {
        if (isReceivedSucceed)
        {
            mq1.BeginReceive(new TimeSpan(0, 0, 2));
        }
    }
}

  為了獲取接收到的消息,我們需要在消息隊列對象上調用EndReceive() 方法。在我們獲取到System.Messaging.Message 對象后,如何處理這個對象就完全取決於我們了。在一個現實世界應用程序中,我們通常會調用System.EnterpriseServices(COM + )對象來進行一些數據庫更新操作或者將消息傳遞給其他對象處理。這里我們可以使用ThreadPool 來保證對象調用可控。我們把消息體放到ThreadPool 中並把InvokeMDAO() 方法名傳遞給WaitCallback 委托。InvokeMDAO() 方法調用對象上的一個方法並在控制台上打印消息。如之前討論的那樣,我們可以在InvokeMDAO() 方法中調用一個System.EnterpriseServices(COM+)組件。這將直接控制創建的COM+對象數量。

下一篇介紹.NET 中的擴展性…


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM