C# 封裝RabbitMQ消息隊列處理


現在.NET領域使用RabbitMQ有很多解決方案,我自己使用過的有兩個,一個是EasyNetQ,一個是CAP,都挺好用的,尤其是CAP,懶人推薦使用,怎麼使用的文章也很多,小夥伴可以自行搜索。

最近我自己嘗試根據目前手頭項目的需要,自行封裝一下基於RabbitMQ的使用,下面開搞,貼上我自己寫的代碼。

首先定義消息發布者/生產者接口:

 using System.Threading.Tasks;

 namespace fx.MQ
 {
     /// <summary>
     /// Contract for a message publisher/producer.
     /// </summary>
     /// <remarks>
     /// Derives from <see cref="System.IDisposable"/> so that an implementation can be
     /// released with a <c>using</c> statement; <c>Dispose()</c> is inherited from it.
     /// </remarks>
     public interface IPublisher : System.IDisposable
     {
         /// <summary>
         /// Publishes a typed message. No channel name is passed, so the destination
         /// is chosen by the implementation.
         /// </summary>
         /// <typeparam name="T">Type of the message (a reference type).</typeparam>
         /// <param name="message">The message to publish.</param>
         void Publish<T>(T message) where T : class;

         /// <summary>
         /// Publishes a text message to the named channel.
         /// </summary>
         /// <param name="message">The message text.</param>
         /// <param name="channelName">Name of the channel to publish to.</param>
         void Publish(string message, string channelName);

         /// <summary>
         /// Task-returning counterpart of <see cref="Publish{T}(T)"/>.
         /// </summary>
         /// <typeparam name="T">Type of the message (a reference type).</typeparam>
         /// <param name="message">The message to publish.</param>
         /// <returns>A task representing the publish operation.</returns>
         Task PublishAsync<T>(T message) where T : class;
     }
 }

定義訂閱者/消費者接口:

 using System;
 using System.Threading.Tasks;

 namespace fx.MQ
 {
     /// <summary>
     /// Contract for a message subscriber/consumer.
     /// </summary>
     /// <remarks>
     /// Derives from <see cref="IDisposable"/> so that an implementation can be
     /// released with a <c>using</c> statement; <c>Dispose()</c> is inherited from it.
     /// </remarks>
     public interface ISubscriber : IDisposable
     {
         /// <summary>
         /// Subscribes to the named channel and hands every received message to
         /// <paramref name="callback"/> as text.
         /// </summary>
         /// <param name="channelName">Name of the channel to subscribe to.</param>
         /// <param name="callback">Invoked with the text of each received message.</param>
         void Subscribe(string channelName, Action<string> callback);

         /// <summary>
         /// Task-returning subscription that yields a message of type
         /// <typeparamref name="T"/> from the named channel.
         /// </summary>
         /// <typeparam name="T">Type of the message (a reference type).</typeparam>
         /// <param name="channelName">Name of the channel to subscribe to.</param>
         /// <returns>A task whose result is the received message.</returns>
         Task<T> SubscribeAsync<T>(string channelName) where T : class;
     }
 }

定義RabbitMQProvider:

 using RabbitMQ.Client;
 using System;
 using System.Collections.Generic;
 using System.Text;

 namespace fx.MQ
 {
     /// <summary>
     /// Validates the broker connection settings and exposes a ready-to-use
     /// connection factory built from them.
     /// </summary>
     public class RabbitMQProvider
     {
         /// <summary>
         /// Creates the provider and its connection factory.
         /// </summary>
         /// <param name="ipAddress">Broker host name or IP address.</param>
         /// <param name="port">Broker port.</param>
         /// <param name="username">Account used to log in to the broker.</param>
         /// <param name="password">Password of that account.</param>
         /// <exception cref="ArgumentNullException">
         /// Any argument is <c>null</c>. (This derives from
         /// <see cref="ArgumentException"/>, so existing handlers keep working.)
         /// </exception>
         public RabbitMQProvider(string ipAddress, int? port, string username, string password)
         {
             if (ipAddress == null)
             {
                 throw new ArgumentNullException(nameof(ipAddress), "IP地址不能為空!");
             }

             if (!port.HasValue)
             {
                 throw new ArgumentNullException(nameof(port), "端口不能為空");
             }

             if (username == null)
             {
                 throw new ArgumentNullException(nameof(username), "用戶名不能為空");
             }

             if (password == null)
             {
                 throw new ArgumentNullException(nameof(password), "密碼不能為空");
             }

             ConnectionFactory = new ConnectionFactory
             {
                 HostName = ipAddress,
                 Port = port.Value,
                 UserName = username,
                 Password = password
             };
         }

         /// <summary>
         /// Factory configured with the host, port and credentials given to the constructor.
         /// </summary>
         public IConnectionFactory ConnectionFactory { get; }
     }
 }

實現生產者:

  using Newtonsoft.Json;
  using RabbitMQ.Client;
  using System;
  using System.Text;
  using System.Threading.Tasks;

  namespace fx.MQ
  {
      /// <summary>
      /// Message publisher that sends messages to RabbitMQ fanout exchanges.
      /// </summary>
      /// <remarks>
      /// One connection is opened in the constructor; one channel is created on first
      /// use and reused for every publish. Call <see cref="Dispose"/> when done.
      /// </remarks>
      public class RabbitMQPublisher : IPublisher, IDisposable
      {
          private readonly RabbitMQProvider _provider;
          private IConnection _connection;
          private IModel _channel;
          private bool _disposed;

          /// <summary>
          /// Creates the publisher and opens its connection.
          /// </summary>
          /// <param name="provider">Supplies the connection factory.</param>
          /// <exception cref="ArgumentNullException"><paramref name="provider"/> is <c>null</c>.</exception>
          public RabbitMQPublisher(RabbitMQProvider provider)
          {
              _provider = provider ?? throw new ArgumentNullException(nameof(provider));
              _connection = _provider.ConnectionFactory.CreateConnection();
          }

          /// <summary>
          /// The connection to the broker, created on demand if it does not exist.
          /// </summary>
          /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
          public IConnection Connection
          {
              get
              {
                  ThrowIfDisposed();
                  return _connection ?? (_connection = _provider.ConnectionFactory.CreateConnection());
              }
          }

          /// <summary>
          /// The channel used for publishing, created on first access.
          /// </summary>
          /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
          public IModel Channel
          {
              get
              {
                  ThrowIfDisposed();
                  // Go through Connection (not the raw field) so both lazy paths agree.
                  return _channel ?? (_channel = Connection.CreateModel());
              }
          }

          /// <summary>
          /// Closes and disposes the channel and the connection. Safe to call repeatedly.
          /// </summary>
          public void Dispose()
          {
              if (_disposed)
              {
                  return;
              }

              _disposed = true;

              // Work on the fields, not the lazy properties: the properties would
              // create a channel/connection just so that it could be closed again.
              var channel = _channel;
              var connection = _connection;
              _channel = null;
              _connection = null;

              try
              {
                  if (channel != null)
                  {
                      if (channel.IsOpen)
                      {
                          channel.Close();
                      }

                      channel.Dispose();
                  }
              }
              finally
              {
                  // Release the connection even if closing the channel failed.
                  if (connection != null)
                  {
                      if (connection.IsOpen)
                      {
                          connection.Close();
                      }

                      connection.Dispose();
                  }
              }
          }

          /// <summary>
          /// Serializes <paramref name="message"/> to JSON and publishes it to the
          /// exchange named after the type <typeparamref name="T"/>.
          /// </summary>
          /// <typeparam name="T">Message type; its name is used as the exchange name.</typeparam>
          /// <param name="message">The message to publish.</param>
          public void Publish<T>(T message) where T : class
          {
              Publish(JsonConvert.SerializeObject(message), typeof(T).Name);
          }

          /// <summary>
          /// Publishes <paramref name="message"/> as UTF-8 text to the fanout exchange
          /// <paramref name="channelName"/>, declaring the exchange first.
          /// </summary>
          /// <param name="message">The message text.</param>
          /// <param name="channelName">Name of the exchange to publish to.</param>
          public void Publish(string message, string channelName)
          {
              var channel = Channel;
              channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, arguments: null);

              var payload = Encoding.UTF8.GetBytes(message);
              channel.BasicPublish
              (
                  exchange: channelName,
                  routingKey: string.Empty,
                  mandatory: false,
                  basicProperties: null,
                  body: payload
              );
          }

          /// <summary>
          /// Task-returning form of <see cref="Publish{T}(T)"/>.
          /// </summary>
          /// <remarks>
          /// The publish itself runs synchronously on the calling thread; the returned
          /// task is already completed, or faulted if the publish threw.
          /// </remarks>
          /// <typeparam name="T">Message type; its name is used as the exchange name.</typeparam>
          /// <param name="message">The message to publish.</param>
          /// <returns>A task representing the finished publish.</returns>
          public Task PublishAsync<T>(T message) where T : class
          {
              try
              {
                  Publish(message);
                  return Task.CompletedTask;
              }
              catch (Exception ex)
              {
                  // Surface the failure through the task, as awaiting callers expect.
                  return Task.FromException(ex);
              }
          }

          // Guards the lazy properties so a disposed publisher is not silently revived.
          private void ThrowIfDisposed()
          {
              if (_disposed)
              {
                  throw new ObjectDisposedException(nameof(RabbitMQPublisher));
              }
          }
      }
  }

實現消費者:

  using RabbitMQ.Client;
  using RabbitMQ.Client.Events;
  using System;
  using System.Collections.Generic;
  using System.Text;
  using System.Threading.Tasks;

  namespace fx.MQ
  {
      /// <summary>
      /// Message subscriber/consumer that receives messages from RabbitMQ fanout exchanges.
      /// </summary>
      /// <remarks>
      /// One connection is opened in the constructor; one channel is created on first
      /// use and shared by every subscription. Call <see cref="Dispose"/> when done.
      /// </remarks>
      public class RabbitMQSubscriber : ISubscriber, IDisposable
      {
          private readonly RabbitMQProvider _provider;
          private IConnection _connection;
          private IModel _channel;
          private bool _disposed;

          /// <summary>
          /// Creates the subscriber and opens its connection.
          /// </summary>
          /// <param name="provider">Supplies the connection factory.</param>
          /// <exception cref="ArgumentNullException"><paramref name="provider"/> is <c>null</c>.</exception>
          public RabbitMQSubscriber(RabbitMQProvider provider)
          {
              _provider = provider ?? throw new ArgumentNullException(nameof(provider));
              _connection = _provider.ConnectionFactory.CreateConnection();
          }

          /// <summary>
          /// The connection to the broker, created on demand if it does not exist.
          /// </summary>
          /// <exception cref="ObjectDisposedException">The subscriber has been disposed.</exception>
          public IConnection Connection
          {
              get
              {
                  ThrowIfDisposed();
                  return _connection ?? (_connection = _provider.ConnectionFactory.CreateConnection());
              }
          }

          /// <summary>
          /// The channel used for consuming, created on first access.
          /// </summary>
          /// <exception cref="ObjectDisposedException">The subscriber has been disposed.</exception>
          public IModel Channel
          {
              get
              {
                  ThrowIfDisposed();
                  // Go through Connection (not the raw field) so both lazy paths agree.
                  return _channel ?? (_channel = Connection.CreateModel());
              }
          }

          /// <summary>
          /// Closes and disposes the channel and the connection. Safe to call repeatedly.
          /// </summary>
          public void Dispose()
          {
              if (_disposed)
              {
                  return;
              }

              _disposed = true;

              var channel = _channel;
              var connection = _connection;
              _channel = null;
              _connection = null;

              try
              {
                  if (channel != null)
                  {
                      // Close gracefully; calling Abort() first would leave the
                      // IsOpen/Close() branch unreachable.
                      if (channel.IsOpen)
                      {
                          channel.Close();
                      }

                      channel.Dispose();
                  }
              }
              finally
              {
                  // Release the connection even if closing the channel failed.
                  if (connection != null)
                  {
                      if (connection.IsOpen)
                      {
                          connection.Close();
                      }

                      connection.Dispose();
                  }
              }
          }

          /// <summary>
          /// Consumes messages from the fanout exchange <paramref name="channelName"/>
          /// and passes each one, decoded as UTF-8 text, to <paramref name="callback"/>.
          /// </summary>
          /// <param name="channelName">Name of the exchange to subscribe to.</param>
          /// <param name="callback">Invoked with the text of each received message.</param>
          /// <exception cref="ArgumentNullException"><paramref name="callback"/> is <c>null</c>.</exception>
          public void Subscribe(string channelName, Action<string> callback)
          {
              if (callback == null)
              {
                  throw new ArgumentNullException(nameof(callback));
              }

              var channel = Channel;

              // Declare the exchange.
              channel.ExchangeDeclare(exchange: channelName, type: "fanout");

              // Every subscription gets its own uniquely named queue.
              var queueName = channelName + "_" + Guid.NewGuid().ToString("N");

              // The name is random, so nobody can ever re-attach to this queue.
              // Declare it exclusive and auto-delete so the broker removes it with
              // this connection instead of letting it pile up messages forever.
              channel.QueueDeclare(queue: queueName, durable: false, exclusive: true, autoDelete: true, arguments: null);

              // Bind the queue to the exchange.
              channel.QueueBind(queue: queueName, exchange: channelName, routingKey: "");

              // Deliver at most one unacknowledged message at a time.
              channel.BasicQos(0, 1, false);

              var consumer = new EventingBasicConsumer(channel);
              consumer.Received += (eventSender, args) =>
              {
                  var body = args.Body;

                  // NOTE(review): if callback throws, the ack below is skipped and the
                  // message stays unacknowledged; with a prefetch of 1 this consumer
                  // presumably receives nothing further. Confirm, and decide on a
                  // nack/requeue policy.
                  callback(Encoding.UTF8.GetString(body));

                  // Acknowledge only the delivery that was just handled.
                  channel.BasicAck(args.DeliveryTag, false);
              };

              // Start consuming with manual acknowledgement.
              channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
          }

          /// <summary>
          /// Not implemented.
          /// </summary>
          /// <typeparam name="T">Type of the message.</typeparam>
          /// <param name="channelName">Name of the exchange to subscribe to.</param>
          /// <returns>Never returns.</returns>
          /// <exception cref="NotImplementedException">Always thrown.</exception>
          public Task<T> SubscribeAsync<T>(string channelName) where T : class
          {
              throw new NotImplementedException();
          }

          // Guards the lazy properties so a disposed subscriber is not silently revived.
          private void ThrowIfDisposed()
          {
              if (_disposed)
              {
                  throw new ObjectDisposedException(nameof(RabbitMQSubscriber));
              }
          }
      }
  }

到這裡為止,已經簡單地實現了消息隊列的接收與發送,滿足我自己當前項目的需要了。這裡我用exchange進行消息的生產和消費,並且用fanout模式,也就是一個生產者對應多個消費者,類似於消息廣播;另外還有direct、topic、headers等模式,可以根據需要修改。

下面是測試代碼:

 using System;
 using System.Windows.Forms;

 namespace fx.MQ.TestForm
 {
     /// <summary>
     /// Test form: publishes the text of <c>textBox1</c> to the "public" channel and
     /// shows every message received from that channel in <c>richTextBox1</c>.
     /// </summary>
     public partial class Form1 : Form
     {
         private readonly RabbitMQProvider _provider;
         private readonly RabbitMQPublisher _publisher;
         private readonly RabbitMQSubscriber _subscriber;
         delegate void Callback(string msg);

         /// <summary>
         /// Connects the publisher and subscriber, then builds the UI.
         /// </summary>
         public Form1()
         {
             // NOTE(review): host and credentials are hard-coded for this test form;
             // move them to configuration before reusing this anywhere else.
             _provider = new RabbitMQProvider("192.168.101.199", 5672, "admin", "admin");
             _publisher = new RabbitMQPublisher(_provider);
             _subscriber = new RabbitMQSubscriber(_provider);
             InitializeComponent();

             // Release both broker connections when the form goes away.
             FormClosed += Form1_FormClosed;
         }

         // Publishes the text box content to the "public" channel.
         private void button1_Click(object sender, EventArgs e)
         {
             _publisher.Publish(textBox1.Text, "public");
         }

         // Starts listening on the "public" channel once the form is loaded.
         private void Form1_Load(object sender, EventArgs e)
         {
             _subscriber.Subscribe("public", ShowMessage);
         }

         // Stops consuming first, then closes the publisher.
         private void Form1_FormClosed(object sender, FormClosedEventArgs e)
         {
             _subscriber.Dispose();
             _publisher.Dispose();
         }

         /// <summary>
         /// Shows <paramref name="msg"/> in the rich text box, switching to the UI
         /// thread first when called from another thread.
         /// </summary>
         /// <param name="msg">The received message text.</param>
         private void ShowMessage(string msg)
         {
             // A message can still arrive while the form is being torn down.
             if (IsDisposed || richTextBox1.IsDisposed)
             {
                 return;
             }

             if (richTextBox1.InvokeRequired)
             {
                 Invoke(new Callback(ShowMessage), new object[] { msg });
                 return;
             }

             richTextBox1.Text = msg;
         }
     }
 }

運行效果如圖所示:

 

 

OK,沒有問題。

另外注意,退出程序時消息發布者和訂閱者都需要Dispose()來釋放連接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM