C# 如何使用 RabbitMQ 實現消息收發


本文基於 http://www.cnblogs.com/cheng-lei/articles/7274513.html 的項目結構進行搭建,閱讀本文之前請先閱讀 http://www.cnblogs.com/cheng-lei/category/1047427.html 中的前四篇文章。

 

工具 — NuGet 包管理器 — 程序包管理器控制台

PM> Install-Package RabbitMQ.Client -Version 5.1.0

PM> Install-Package EasyNetQ -Version 3.2.0

 

一、項目搭建

1. Weiz.MQ 項目,消息隊列的通用處理類庫,用於真正的訂閱和發布消息。

  1、在BusBuilder.cs中添加了對CreateAdvancedBus函數的實現。

 /// <summary>
 /// Creates a new EasyNetQ bus via <c>RabbitHutch.CreateBus</c> and returns its advanced API.
 /// </summary>
 /// <remarks>
 /// The connection string is taken from the <c>RABBITMQ_CONNECTION_STRING</c> environment
 /// variable when that variable is set; otherwise the built-in default below is used.
 /// A new bus is created on every call.
 /// </remarks>
 /// <returns>The <c>Advanced</c> property of the newly created bus.</returns>
 /// <exception cref="InvalidOperationException">
 /// The resolved connection string is null, empty or whitespace.
 /// </exception>
 public static IAdvancedBus CreateAdvancedBus()
 {
     // NOTE(review): credentials should not live in source control; this default is kept
     // only so existing setups keep working. Prefer supplying the environment variable.
     const string defaultConnString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456";

     // Message server connection string: the environment override wins over the default.
     string connString = Environment.GetEnvironmentVariable("RABBITMQ_CONNECTION_STRING") ?? defaultConnString;
     if (string.IsNullOrWhiteSpace(connString))
     {
         throw new InvalidOperationException("messageserver connection string is missing or empty");
     }

     return RabbitHutch.CreateBus(connString).Advanced;
 }
View Code

 

  2、在MQHelper.cs中添加了對Send、Receive函數的實現。

 /// <summary>
 /// Sends <paramref name="msg"/> through a freshly created bus, using
 /// <c>msg.MessageRouter</c> as the first argument of <c>IBus.Send</c>.
 /// </summary>
 /// <param name="msg">The message to send; its <c>MessageRouter</c> selects the destination.</param>
 public static void Send(MyMessage msg)
 {
     // Create the message bus. Like a database connection it must be released after use;
     // the using block guarantees that on every exit path, including unexpected exceptions.
     using (IBus bus = BusBuilder.CreateMessageBus())
     {
         try
         {
             bus.Send(msg.MessageRouter, msg);
         }
         catch (EasyNetQException ex)
         {
             // Failure while talking to the message server: report the cause instead of hiding it.
             Console.WriteLine("Send Error!!! {0}", ex.Message);
         }
     }
 }
18 
/// <summary>
/// Registers a receiver for <c>MyMessage</c> on a freshly created bus, using
/// <c>msg.MessageRouter</c> as the first argument of <c>IBus.Receive</c>, and forwards
/// every received message to <paramref name="ipro"/>.
/// </summary>
/// <param name="msg">Carries the <c>MessageRouter</c> value to receive from.</param>
/// <param name="ipro">Handler whose <c>ProcessMsg</c> is invoked for each received message.</param>
public static void Receive(MyMessage msg, IProcessMessage ipro)
{
    // Create the message bus. It is not disposed on success, presumably so that the
    // registered receiver stays active for the lifetime of the process.
    IBus bus = BusBuilder.CreateMessageBus();

    try
    {
        bus.Receive<MyMessage>(msg.MessageRouter, message => ipro.ProcessMsg(message));
    }
    catch (EasyNetQException ex)
    {
        // Failure while talking to the message server: report the cause and release
        // the bus, since no receiver was registered on it.
        Console.WriteLine("Receive Error!!! {0}", ex.Message);
        bus.Dispose();
    }
}
View Code

 

  3、在MQHelper.cs中添加了對採用Fanout、Direct、Topic交換機類型進行消息收發功能的實現。

 /// <summary>
 /// Declares a fanout exchange and publishes <paramref name="msg"/> to it with an empty routing key.
 /// </summary>
 /// <param name="msg">The message to publish.</param>
 /// <param name="exchangeName">Name of the fanout exchange to declare and publish to.</param>
 public static void ProducerFanoutMessage(MyMessage msg, string exchangeName = "chending.fanout")
 {
     // A new bus is created per call, so it has to be released here; otherwise every
     // published message would leave a bus behind.
     using (var advancedBus = BusBuilder.CreateAdvancedBus())
     {
         if (!advancedBus.IsConnected)
         {
             Console.WriteLine("Can't connect");
             return;
         }

         var exchange = advancedBus.ExchangeDeclare(exchangeName, ExchangeType.Fanout);

         advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg));
     }
 }
17 
/// <summary>
/// Declares a fanout exchange and a queue, binds the queue to the exchange, and starts
/// consuming <c>MyMessage</c> messages from the queue, printing each message body.
/// </summary>
/// <param name="exchageName">Name of the fanout exchange to declare.</param>
/// <param name="queueName">Name of the queue to declare; also passed as the binding's routing key.</param>
public static void ConsumeFanoutMessage(string exchageName = "chending.fanout", string queueName = "chending.fanout.queue")
{
    var bus = BusBuilder.CreateAdvancedBus();

    // Topology: exchange -> queue.
    var fanoutExchange = bus.ExchangeDeclare(exchageName, ExchangeType.Fanout);
    var fanoutQueue = bus.QueueDeclare(queueName);
    bus.Bind(fanoutExchange, fanoutQueue, queueName);

    // Register a single typed handler that echoes the message body.
    bus.Consume(fanoutQueue, handlers =>
    {
        handlers.Add<MyMessage>((received, receivedInfo) =>
        {
            Console.WriteLine("Fanout Content: {0}", received.Body.MessageBody);
        });
    });
}
30 
/// <summary>
/// Declares the queue <paramref name="queueName"/> and publishes <paramref name="msg"/>
/// to the default exchange, using the queue's name as the routing key.
/// </summary>
/// <param name="msg">The message to publish.</param>
/// <param name="queueName">Name of the queue to declare and route to.</param>
public static void ProducerDirectMessage(MyMessage msg, string queueName = "chending.direct.queue")
{
    // A new bus is created per call, so it has to be released here; otherwise every
    // published message would leave a bus behind.
    using (var advancedBus = BusBuilder.CreateAdvancedBus())
    {
        if (!advancedBus.IsConnected)
        {
            Console.WriteLine("Can't connect");
            return;
        }

        var queue = advancedBus.QueueDeclare(queueName);

        advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg));
    }
}
47 
/// <summary>
/// Declares a direct exchange and a queue, binds the queue to the exchange with the
/// queue name as routing key, and starts consuming <c>MyMessage</c> messages from the
/// queue, printing each message body.
/// </summary>
/// <param name="exchageName">Name of the direct exchange to declare.</param>
/// <param name="queueName">Name of the queue to declare; also used as the binding's routing key.</param>
public static void ConsumeDirectMessage(string exchageName = "chending.direct", string queueName = "chending.direct.queue")
{
    var bus = BusBuilder.CreateAdvancedBus();

    // Topology: exchange -> queue, keyed by the queue name.
    var directExchange = bus.ExchangeDeclare(exchageName, ExchangeType.Direct);
    var directQueue = bus.QueueDeclare(queueName);
    bus.Bind(directExchange, directQueue, queueName);

    // Register a single typed handler that echoes the message body.
    bus.Consume(directQueue, handlers =>
    {
        handlers.Add<MyMessage>((received, receivedInfo) =>
        {
            Console.WriteLine("Direct Content: {0}", received.Body.MessageBody);
        });
    });
}
60 
/// <summary>
/// Publishes <paramref name="msg"/> through a freshly created bus, using
/// <c>msg.MessageRouter</c> as the topic.
/// </summary>
/// <param name="msg">The message to publish; its <c>MessageRouter</c> is the topic.</param>
public static void ProducerTopicMessage(MyMessage msg)
{
    // Create the message bus. Like a database connection it must be released after use;
    // the using block guarantees that on every exit path.
    using (IBus bus = BusBuilder.CreateMessageBus())
    {
        try
        {
            bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
        }
        catch (EasyNetQException ex)
        {
            // Failure while talking to the message server: report it rather than
            // silently dropping the message.
            Console.WriteLine("Topic Publish Error!!! {0}", ex.Message);
        }
    }
}
77 
/// <summary>
/// Subscribes to <c>MyMessage</c> on a freshly created bus and prints the body of each
/// message. <c>msg.MessageRouter</c> is used both as the subscription id and as the topic.
/// </summary>
/// <param name="msg">Carries the <c>MessageRouter</c> value to subscribe with.</param>
public static void ConsumeTopicMessage(MyMessage msg)
{
    // Create the message bus. It is not disposed on success, presumably so that the
    // subscription stays active for the lifetime of the process.
    IBus bus = BusBuilder.CreateMessageBus();

    try
    {
        bus.Subscribe<MyMessage>(
            msg.MessageRouter,
            message => Console.WriteLine("Topic Content: {0}", message.MessageBody),
            x => x.WithTopic(msg.MessageRouter));
    }
    catch (EasyNetQException ex)
    {
        // Failure while talking to the message server: report the cause and release
        // the bus, since the subscription was not established.
        Console.WriteLine("Topic Subscribe Error!!! {0}", ex.Message);
        bus.Dispose();
    }
}
View Code

  4、在ProduceThread.cs中添加了消息發布線程對前面實現的功能進行測試(也可以不作為線程直接調用)。

 /// <summary>
 /// Publishes a fixed set of demo messages through <c>MQHelper</c> so the consumer side
 /// can be exercised. <c>ProduceMessage</c> can be called directly or used as a thread entry point.
 /// </summary>
 public class ProduceThread
 {
     /// <summary>
     /// Publishes one fanout, one direct and one topic message, then ten more messages
     /// via <c>MQHelper.Publish</c>, pausing 200 ms after each of those ten.
     /// </summary>
     public static void ProduceMessage()
     {
         MyMessage msg1 = CreateMessage("0-1", "chending.fanout");
         MyMessage msg2 = CreateMessage("0-2", "chending.direct");
         MyMessage msg3 = CreateMessage("0-3", "chending.topic.a.b");

         // Alternative kept from the original listing: MQHelper.Send(msg1);
         MQHelper.ProducerFanoutMessage(msg1);
         MQHelper.ProducerDirectMessage(msg2);
         MQHelper.ProducerTopicMessage(msg3);

         for (int i = 0; i < 10; i++)
         {
             string id = (i + 1).ToString();

             // Even iterations use the longer router, odd iterations the shorter one.
             string router = i % 2 == 0 ? "cd.test.demo.a.b" : "cd.test.demo.a";

             MQHelper.Publish(CreateMessage(id, router));
             Thread.Sleep(200);
         }
     }

     /// <summary>
     /// Builds a message whose id and title are both <paramref name="id"/>, whose body is
     /// the current local time as text, and whose router is <paramref name="router"/>.
     /// </summary>
     private static MyMessage CreateMessage(string id, string router)
     {
         return new MyMessage
         {
             MessageID = id,
             MessageBody = DateTime.Now.ToString(),
             MessageRouter = router,
             MessageTitle = id
         };
     }
 }
View Code

 

2. Weiz.Producer(生產者)已棄用(改用ProduceThread.cs)

3. Weiz.Consumer 就是Consumer(消費者)

  1、修改OrderProcessMessage.cs,實現不同的消息處理方式。

 /// <summary>
 /// Message handler that prints the id, title, router and body of a message to the console.
 /// </summary>
 public class OrderProcessMessage : MQ.IProcessMessage
 {
     /// <summary>Writes one line describing <paramref name="msg"/> to the console.</summary>
     /// <param name="msg">The message to print.</param>
     public void ProcessMsg(MQ.MyMessage msg)
     {
         string line = string.Format(
             "ID: {0}, Title: {1}, Router: {2}, Content: {3}",
             msg.MessageID,
             msg.MessageTitle,
             msg.MessageRouter,
             msg.MessageBody);

         Console.WriteLine(line);
     }
 }
 /// <summary>
 /// Message handler that prints the id, title, router and body of a message to the
 /// console, with every line prefixed by "Process1".
 /// </summary>
 public class OrderProcessMessage1 : MQ.IProcessMessage
 {
     /// <summary>Writes one "Process1" line describing <paramref name="msg"/> to the console.</summary>
     /// <param name="msg">The message to print.</param>
     public void ProcessMsg(MQ.MyMessage msg)
     {
         string line = string.Format(
             "Process1 ID: {0}, Title: {1}, Router: {2}, Content: {3}",
             msg.MessageID,
             msg.MessageTitle,
             msg.MessageRouter,
             msg.MessageBody);

         Console.WriteLine(line);
     }
 }
15 
/// <summary>
/// Message handler that prints the id, title, router and body of a message to the
/// console, with every line prefixed by "Process2".
/// </summary>
public class OrderProcessMessage2 : MQ.IProcessMessage
{
    /// <summary>Writes one "Process2" line describing <paramref name="msg"/> to the console.</summary>
    /// <param name="msg">The message to print.</param>
    public void ProcessMsg(MQ.MyMessage msg)
    {
        string line = string.Format(
            "Process2 ID: {0}, Title: {1}, Router: {2}, Content: {3}",
            msg.MessageID,
            msg.MessageTitle,
            msg.MessageRouter,
            msg.MessageBody);

        Console.WriteLine(line);
    }
}
View Code

 

  2、對Program.cs中的Main調用進行了修改。

 /// <summary>
 /// Console entry point of the consumer: registers the fanout, direct and topic
 /// consumers plus two subscriptions, then publishes the demo messages.
 /// </summary>
 class Program
 {
     static void Main(string[] args)
     {
         // Handlers for the two subscriptions.
         var handler1 = new OrderProcessMessage1();
         var handler2 = new OrderProcessMessage2();

         // Only MessageRouter is set on these messages; they are passed to MQHelper
         // to tell it which router pattern to listen on.
         var starPattern = new MyMessage { MessageRouter = "cd.test.demo.*" };
         var hashPattern = new MyMessage { MessageRouter = "cd.test.demo.#" };
         var topicPattern = new MyMessage { MessageRouter = "chending.topic.#" };

         // Register all consumers before anything is published.
         MQHelper.ConsumeFanoutMessage();
         MQHelper.ConsumeDirectMessage();
         MQHelper.ConsumeTopicMessage(topicPattern);
         MQHelper.Subscribe(starPattern, handler1);
         MQHelper.Subscribe(hashPattern, handler2);

         Console.WriteLine("Listening for messages.");

         // Publish the demo messages on the current thread. ProduceMessage could
         // instead be started on its own Thread via a ThreadStart delegate.
         ProduceThread.ProduceMessage();
     }
 }
View Code

 

二、項目運行

啟動 Weiz.Consumer(消費者)後,會自動在 RabbitMQ 服務器上創建相關的 exchange 和 queue;同時,Main 中調用的 ProduceThread.ProduceMessage 函數會發送消息,接收到的消息會在 Console 命令行中顯示。

 

項目源碼:百度雲鏈接:https://pan.baidu.com/s/1sCJqY2fKphXV0ntMIytcVw 密碼:hfz5


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM