本文是基於http://www.cnblogs.com/cheng-lei/articles/7274513.html的項目結構進行搭建的,了解之前請先閱讀http://www.cnblogs.com/cheng-lei/category/1047427.html中的前四篇文章。
工具 — Nuget包管理器 —程序包管理器控制台
PM> Install-Package RabbitMQ.Client -Version 5.1.0
PM> Install-Package EasyNetQ -Version 3.2.0
一、項目搭建
1. Weiz.MQ 項目,消息隊列的通用處理類庫,用於正在的訂閱和發布消息。
1、在BusBuilder.cs中添加了對CreateAdvancedBus函數的實現。

1 public static IAdvancedBus CreateAdvancedBus() 2 { 3 // 消息服務器連接字符串 4 string connString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456"; 5 if (connString == null || connString == string.Empty) 6 { 7 throw new Exception("messageserver connection string is missing or empty"); 8 } 9 10 return RabbitHutch.CreateBus(connString).Advanced; 11 }
2、在MQHelper.cs中添加了對Send、Receive函數的實現。

1 public static void Send(MyMessage msg) 2 { 3 // 創建消息bus 4 IBus bus = BusBuilder.CreateMessageBus(); 5 6 try 7 { 8 bus.Send(msg.MessageRouter, msg); 9 } 10 catch (EasyNetQException ex) 11 { 12 //處理連接消息服務器異常 13 Console.WriteLine("Send Error!!!"); 14 } 15 16 bus.Dispose();//與數據庫connection類似,使用后記得銷毀bus對象 17 } 18 19 public static void Receive(MyMessage msg, IProcessMessage ipro) 20 { 21 // 創建消息bus 22 IBus bus = BusBuilder.CreateMessageBus(); 23 24 try 25 { 26 bus.Receive<MyMessage>(msg.MessageRouter, message => ipro.ProcessMsg(message)); 27 } 28 catch (EasyNetQException ex) 29 { 30 //處理連接消息服務器異常 31 Console.WriteLine("Receive Error!!!"); 32 } 33 }
3、在MQHelper.cs中添加了對采用Fanout、Direct、Topic交換機類型進行消息收發功能的實現。

1 public static void ProducerFanoutMessage(MyMessage msg, string exchangeName = "chending.fanout") 2 { 3 var advancedBus = BusBuilder.CreateAdvancedBus(); 4 5 if (advancedBus.IsConnected) 6 { 7 var exchange = advancedBus.ExchangeDeclare(exchangeName, ExchangeType.Fanout); 8 9 advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg)); 10 } 11 else 12 { 13 Console.WriteLine("Can't connect"); 14 } 15 16 } 17 18 public static void ConsumeFanoutMessage(string exchageName = "chending.fanout", string queueName = "chending.fanout.queue") 19 { 20 var advancedBus = BusBuilder.CreateAdvancedBus(); 21 var exchange = advancedBus.ExchangeDeclare(exchageName, ExchangeType.Fanout); 22 23 var queue = advancedBus.QueueDeclare(queueName); 24 advancedBus.Bind(exchange, queue, queueName); 25 advancedBus.Consume(queue, registration => 26 { 27 registration.Add<MyMessage>((message, info) => { Console.WriteLine("Fanout Content: {0}", message.Body.MessageBody); }); 28 }); 29 } 30 31 public static void ProducerDirectMessage(MyMessage msg, string queueName = "chending.direct.queue") 32 { 33 var advancedBus = BusBuilder.CreateAdvancedBus(); 34 35 if (advancedBus.IsConnected) 36 { 37 var queue = advancedBus.QueueDeclare(queueName); 38 39 advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg)); 40 } 41 else 42 { 43 Console.WriteLine("Can't connect"); 44 } 45 46 } 47 48 public static void ConsumeDirectMessage(string exchageName = "chending.direct", string queueName = "chending.direct.queue") 49 { 50 var advancedBus = BusBuilder.CreateAdvancedBus(); 51 var exchange = advancedBus.ExchangeDeclare(exchageName, ExchangeType.Direct); 52 53 var queue = advancedBus.QueueDeclare(queueName); 54 advancedBus.Bind(exchange, queue, queueName); 55 advancedBus.Consume(queue, registration => 56 { 57 registration.Add<MyMessage>((message, info) => { Console.WriteLine("Direct Content: {0}", message.Body.MessageBody); }); 58 }); 59 } 60 61 public static void ProducerTopicMessage(MyMessage msg) 62 { 63 //// 創建消息bus 64 IBus bus = BusBuilder.CreateMessageBus(); 65 66 try 67 { 68 bus.Publish(msg, x => x.WithTopic(msg.MessageRouter)); 69 } 70 catch (EasyNetQException ex) 71 { 72 //處理連接消息服務器異常 73 } 74 75 bus.Dispose();//與數據庫connection類似,使用后記得銷毀bus對象 76 } 77 78 public static void ConsumeTopicMessage(MyMessage msg) 79 { 80 //// 創建消息bus 81 IBus bus = BusBuilder.CreateMessageBus(); 82 83 try 84 { 85 bus.Subscribe<MyMessage>(msg.MessageRouter, message => Console.WriteLine("Topic Content: {0}", message.MessageBody), x => x.WithTopic(msg.MessageRouter)); 86 } 87 catch (EasyNetQException ex) 88 { 89 //處理連接消息服務器異常 90 } 91 }
4、在ProduceThread.cs中添加了消息發布線程對前面實現的功能進行測試(也可以不作為線程直接調用)。

1 public class ProduceThread 2 { 3 public static void ProduceMessage() { 4 MyMessage msg1 = new MyMessage(); 5 msg1.MessageID = "0-1"; 6 msg1.MessageBody = DateTime.Now.ToString(); 7 msg1.MessageRouter = "chending.fanout"; 8 msg1.MessageTitle = "0-1"; 9 MyMessage msg2 = new MyMessage(); 10 msg2.MessageID = "0-2"; 11 msg2.MessageBody = DateTime.Now.ToString(); 12 msg2.MessageRouter = "chending.direct"; 13 msg2.MessageTitle = "0-2"; 14 MyMessage msg3 = new MyMessage(); 15 msg3.MessageID = "0-3"; 16 msg3.MessageBody = DateTime.Now.ToString(); 17 msg3.MessageRouter = "chending.topic.a.b"; 18 msg3.MessageTitle = "0-3"; 19 20 //MQHelper.Send(msg1); 21 MQHelper.ProducerFanoutMessage(msg1); 22 MQHelper.ProducerDirectMessage(msg2); 23 MQHelper.ProducerTopicMessage(msg3); 24 25 for (int i = 0; i < 10; i++) { 26 MyMessage msg = new MyMessage(); 27 msg.MessageID = (i+1).ToString(); 28 msg.MessageBody = DateTime.Now.ToString(); 29 if (i % 2 == 0) 30 msg.MessageRouter = "cd.test.demo.a.b"; 31 else 32 msg.MessageRouter = "cd.test.demo.a"; 33 msg.MessageTitle = (i+1).ToString(); 34 35 MQHelper.Publish(msg); 36 //Console.WriteLine("Message{0} is published!!!", i + 1); 37 Thread.Sleep(200); 38 } 39 }
2. Weiz.Producer(生成者)已棄用(改用ProduceThread.cs)
3. Weiz.Consumer 就是Consumer(消費者)
1、修改OrderProcessMessage.cs,實現不同的消息處理方式。

1 public class OrderProcessMessage : MQ.IProcessMessage 2 { 3 public void ProcessMsg(MQ.MyMessage msg) 4 { 5 Console.WriteLine("ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody); 6 } 7 } 8 public class OrderProcessMessage1:MQ.IProcessMessage 9 { 10 public void ProcessMsg(MQ.MyMessage msg) 11 { 12 Console.WriteLine("Process1 ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody); 13 } 14 } 15 16 public class OrderProcessMessage2 : MQ.IProcessMessage 17 { 18 public void ProcessMsg(MQ.MyMessage msg) 19 { 20 Console.WriteLine("Process2 ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody); 21 } 22 }
2、對Program.cs中的Main調用進行了修改。

1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //OrderProcessMessage order = new OrderProcessMessage(); 6 OrderProcessMessage1 order1 = new OrderProcessMessage1(); 7 OrderProcessMessage2 order2 = new OrderProcessMessage2(); 8 9 //MyMessage msg = new MyMessage(); 10 MyMessage msg1 = new MyMessage(); 11 MyMessage msg2 = new MyMessage(); 12 MyMessage msg3 = new MyMessage(); 13 14 //msg.MessageRouter = "cd.test.demo"; 15 msg1.MessageRouter = "cd.test.demo.*"; 16 msg2.MessageRouter = "cd.test.demo.#"; 17 msg3.MessageRouter = "chending.topic.#"; 18 19 //MQHelper.Receive(msg, order); 20 MQHelper.ConsumeFanoutMessage(); 21 MQHelper.ConsumeDirectMessage(); 22 MQHelper.ConsumeTopicMessage(msg3); 23 MQHelper.Subscribe(msg1, order1); 24 //MQHelper.Subscribe(msg1, order2); 25 MQHelper.Subscribe(msg2, order2); 26 27 Console.WriteLine("Listening for messages."); 28 29 ProduceThread.ProduceMessage(); 30 31 //ThreadStart threadStart = ProduceThread.ProduceMessage; 32 //Thread thread = new Thread(threadStart); 33 //thread.Start(); 34 } 35 }
二、項目運行
啟動 Weiz.Consumer (消費者),啟動消費者,會自動在RabbitMQ 服務器上創建相關的exchange 和 queue ,同時調用的ProduceThread.ProduceMessage函數會發送消息,接收到的信息會在Console命令行中進行顯示。
項目源碼:百度雲鏈接:https://pan.baidu.com/s/1sCJqY2fKphXV0ntMIytcVw 密碼:hfz5