what
MSMQ(Microsoft Message Queue),微軟消息隊列,用於應用程序之間相互通信的一種異步傳輸模式。應用程序可以分布在同台機器上,也可以分布於互聯的網絡中的任意位置。
基本原理:消息發送者把要發送的消息放入容器,也就是Message(消息),然后保存到系統公用空間的消息隊列中(Message Queue)中,本地或互聯位置上的消息接收程序再從隊列中取出發給它的消息進行處理。
消息類型可以是文本,圖像,自定義對象等。
消息隊列分為公共隊列和私有隊列。
why
一、用於進程間的通信。A進程把消息放進消息隊列,B進程去消息隊列獲取,屬於異步通信。
二、用於WEB並發請求時作緩沖排隊寫入。當多個請求並發插入數據庫時可能會造成服務器不可用,用戶等待時間過長 ,這時可以把所有請求全部放到消息隊列中,然后監視進程按先進先出順序依次寫入數據庫。
how
下面以一個精簡版的petshop4.0訂單提交代碼為例講解用消息隊列實現異步提交的方法步驟。
一、安裝MSMQ,在添加刪除組件界面勾選安裝。
二、新建私有隊列OrderQueue,可以安裝一個QueueExplorer隨時查看消息隊列信息。
三、提交訂單時如果選擇異步提交,則先把訂單信息對象插入到消息隊列,立即返回信息給用戶。
四、服務器運行一個多線程的監視進程不斷從消息隊列提取訂單對象,然后插入到數據庫。注意入隊列和出隊列的對象Formatter一定要相同不然會報異常。
五、部分代碼實現如下:
訂單實體類:
[Serializable]
public class Order
{
public string OrderID { get; set; }
public int MemberID { get; set; }
public string GoodsID { get; set; }
public DateTime SubmitTime { get; set; }
}
基本隊列類:
public class BaseQueue: IDisposable {
protected MessageQueue queue;
public BaseQueue(string queuePath)
{
queue = new MessageQueue(queuePath);
}
/// <summary>
/// 出隊列
/// </summary>
public virtual object Receive()
{
try
{
using (Message message = queue.Receive())
{
message.Formatter = queue.Formatter;
return message;
}
}
catch (MessageQueueException mqex)
{
throw;
}
}
/// <summary>
/// 入隊列
/// </summary>
public virtual void Send(object msg)
{
queue.Send(msg);
}
public void Dispose()
{
queue.Dispose();
}
}
訂單隊列類:
public class OrderQueue : BaseQueue
{
// Path example - FormatName:DIRECT=OS:MyMachineName\Private$\OrderQueueName
private static readonly string queuePath = ".\\private$\\orderqueue";
public OrderQueue()
: base(queuePath)
{
// 可以設置序列化格式
queue.Formatter = new BinaryMessageFormatter();
}
/// <summary>
/// 從消息隊列出一個訂單對象,先入先出
/// </summary>
public Order Receive()
{
object obj = base.Receive();
return (Order)((Message)obj).Body;
}
/// <summary>
/// 把一個訂單對象壓入消息隊列
/// </summary>
public void Send(Order orderMessage)
{
base.Send(orderMessage);
}
}
提交訂單代碼:
Order or = new Order();
or.OrderID = DateTime.Now.ToString("yyyyMMddHHmmss");
or.MemberID = int.Parse(TextBox2.Text.Trim());
or.GoodsID = DropDownList1.SelectedItem.Text.ToString();
or.SubmitTime = DateTime.Now;
try
{
OrderQueue orQ = new OrderQueue();
orQ.Send(or);
Label1.Text = "提交成功!";
}
catch (Exception ex)
{
Label1.Text = "提交失敗!詳細原因:" + ex.Message;
}
多線程讀取代碼:
private static int batchSize = 4;//int.Parse(ConfigurationManager.AppSettings["BatchSize"]);
private static int threadCount = 1;//int.Parse(ConfigurationManager.AppSettings["ThreadCount"]);
Thread[] workerThreads = new Thread[threadCount];
private static int totalOrdersProcessed = 0;
public Form1()
{
InitializeComponent();
}
//啟動線程
private void button1_Click(object sender, EventArgs e)
{
Thread workTicketThread;
for (int i = 0; i < threadCount; i++)
{
workTicketThread = new Thread(new ThreadStart(ProcessOrders));
// Make this a background thread, so it will terminate when the main thread/process is de-activated
workTicketThread.IsBackground = true;
workTicketThread.SetApartmentState(ApartmentState.STA);
// Start the Work
workTicketThread.Start();
workerThreads[i] = workTicketThread;
}
MessageBox.Show("starting...");
}
//結束線程
private void button2_Click(object sender, EventArgs e)
{
//abort all threads
for (int i = 0; i < workerThreads.Length; i++)
{
workerThreads[i].Abort();
}
MessageBox.Show(totalOrdersProcessed + " Orders processed.");
}
/// <summary>
/// Process a batch of asynchronous orders from the queue and submit them to the database within a transaction
/// </summary>
private static void ProcessOrders()
{
OrderQueue orQueue = new OrderQueue();
while (true)
{
int processedItems = 0;
ArrayList queueOrders = new ArrayList();
for (int j = 0; j < batchSize; j++)
{
try
{
queueOrders.Add(orQueue.Receive());
}
catch (Exception)
{
j = batchSize;
}
}
//process the queued orders
for (int k = 0; k < queueOrders.Count; k++)
{
//orQueue.Insert((Order)queueOrders[k]); PETSHOP是插入到數據庫方法,以下改為寫到日志文件
Order o = (Order)queueOrders[k];
MyLog.WriteLog("訂單號:" + o.OrderID + " 商品ID:" + o.GoodsID);
processedItems++;
totalOrdersProcessed++;
}
MessageBox.Show("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items");
}
}
