現在在.NET領域使用RabbitMQ有很多解決方案,我自己使用過的有兩個,一個是EasyNetQ,一個是CAP,都挺好用的,尤其是CAP,懶人推薦使用,怎麼使用的文章也很多,小夥伴可以自行搜索。
最近我自己嘗試根據目前手頭項目的需要,自行封裝一下基於RabbitMQ的使用,下面開搞,貼上我自己寫的代碼。
首先定義消息發布者/生產者接口:
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Contract for a message publisher (producer).
    /// </summary>
    /// <remarks>
    /// Extends <see cref="System.IDisposable"/> so implementations can be used in a
    /// <c>using</c> statement. <c>Dispose()</c> is inherited from that interface rather
    /// than being re-declared here; existing calls to <c>publisher.Dispose()</c> keep compiling.
    /// </remarks>
    public interface IPublisher : System.IDisposable
    {
        /// <summary>
        /// Publishes a typed message. How the destination is derived from
        /// <typeparamref name="T"/> is decided by the implementation.
        /// </summary>
        /// <typeparam name="T">Reference type of the message payload.</typeparam>
        /// <param name="message">The message to publish.</param>
        void Publish<T>(T message) where T : class;

        /// <summary>
        /// Publishes a text message to the named channel.
        /// </summary>
        /// <param name="message">The message text.</param>
        /// <param name="channelName">Name of the destination channel.</param>
        void Publish(string message, string channelName);

        /// <summary>
        /// Asynchronous counterpart of <see cref="Publish{T}(T)"/>.
        /// </summary>
        /// <typeparam name="T">Reference type of the message payload.</typeparam>
        /// <param name="message">The message to publish.</param>
        /// <returns>A task representing the publish operation.</returns>
        Task PublishAsync<T>(T message) where T : class;
    }
}
定義訂閱者/消費者接口:
using System;
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Contract for a message subscriber (consumer).
    /// </summary>
    /// <remarks>
    /// Extends <see cref="IDisposable"/> so implementations can be used in a
    /// <c>using</c> statement. <c>Dispose()</c> is inherited from that interface rather
    /// than being re-declared here; existing calls to <c>subscriber.Dispose()</c> keep compiling.
    /// </remarks>
    public interface ISubscriber : IDisposable
    {
        /// <summary>
        /// Subscribes to the named channel and passes each received message, as text,
        /// to <paramref name="callback"/>.
        /// </summary>
        /// <param name="channelName">Name of the channel to subscribe to.</param>
        /// <param name="callback">Handler invoked with the text of each received message.</param>
        void Subscribe(string channelName, Action<string> callback);

        /// <summary>
        /// Asynchronous, typed subscription to the named channel.
        /// </summary>
        /// <typeparam name="T">Reference type the received message is expected to map to.</typeparam>
        /// <param name="channelName">Name of the channel to subscribe to.</param>
        /// <returns>A task producing a value of type <typeparamref name="T"/>.</returns>
        Task<T> SubscribeAsync<T>(string channelName) where T : class;
    }
}
定義RabbitMQProvider:
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace fx.MQ
{
    /// <summary>
    /// Holds the RabbitMQ <see cref="IConnectionFactory"/> built from the supplied
    /// host, port and credentials.
    /// </summary>
    public class RabbitMQProvider
    {
        /// <summary>
        /// Validates the connection settings and builds the connection factory.
        /// </summary>
        /// <param name="ipAddress">Broker host name or IP address.</param>
        /// <param name="port">Broker port.</param>
        /// <param name="username">Account used to authenticate.</param>
        /// <param name="password">Password for <paramref name="username"/>.</param>
        /// <exception cref="ArgumentNullException">
        /// Any argument is <c>null</c>. (Derives from <see cref="ArgumentException"/>,
        /// so callers catching the broader type are unaffected.)
        /// </exception>
        public RabbitMQProvider(string ipAddress, int? port, string username, string password)
        {
            // Validate everything first so a partially configured factory is never created.
            if (ipAddress == null)
            {
                throw new ArgumentNullException(nameof(ipAddress), "IP地址不能為空!");
            }

            if (port == null)
            {
                throw new ArgumentNullException(nameof(port), "端口不能為空");
            }

            if (username == null)
            {
                throw new ArgumentNullException(nameof(username), "用戶名不能為空");
            }

            if (password == null)
            {
                throw new ArgumentNullException(nameof(password), "密碼不能為空");
            }

            // The settings are handed straight to the factory; no separate copies are kept.
            ConnectionFactory = new ConnectionFactory
            {
                HostName = ipAddress,
                Port = port.Value,
                UserName = username,
                Password = password
            };
        }

        /// <summary>
        /// Factory used by publishers and subscribers to open broker connections.
        /// </summary>
        public IConnectionFactory ConnectionFactory { get; }
    }
}
實現生產者:
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Message publisher that sends messages to RabbitMQ fanout exchanges.
    /// </summary>
    public class RabbitMQPublisher : IPublisher, IDisposable
    {
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;
        private IModel _channel;

        /// <summary>
        /// Creates the publisher and opens a connection using the provider's factory.
        /// </summary>
        /// <param name="provider">Source of the connection factory.</param>
        /// <exception cref="ArgumentNullException"><paramref name="provider"/> is <c>null</c>.</exception>
        public RabbitMQPublisher(RabbitMQProvider provider)
        {
            _provider = provider ?? throw new ArgumentNullException(nameof(provider));
            _connection = _provider.ConnectionFactory.CreateConnection();
        }

        /// <summary>
        /// The broker connection; created from the provider's factory if none is held yet.
        /// </summary>
        public IConnection Connection
        {
            get { return _connection ?? (_connection = _provider.ConnectionFactory.CreateConnection()); }
        }

        /// <summary>
        /// The channel used for publishing; created on first access from <see cref="Connection"/>.
        /// </summary>
        public IModel Channel
        {
            get { return _channel ?? (_channel = Connection.CreateModel()); }
        }

        /// <summary>
        /// Closes and disposes the channel (if one was ever created) and the connection.
        /// </summary>
        public void Dispose()
        {
            // Use the backing fields: the lazy properties would create a channel
            // or connection just so that it could be closed again.
            if (_channel != null)
            {
                if (_channel.IsOpen)
                {
                    _channel.Close();
                }

                _channel.Dispose();
            }

            if (_connection != null)
            {
                if (_connection.IsOpen)
                {
                    _connection.Close();
                }

                _connection.Dispose();
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to JSON and publishes it to the fanout
        /// exchange named after the simple type name of <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Reference type of the message payload.</typeparam>
        /// <param name="message">The message to serialize and publish.</param>
        public void Publish<T>(T message) where T : class
        {
            Publish(JsonConvert.SerializeObject(message), typeof(T).Name);
        }

        /// <summary>
        /// Declares a non-durable fanout exchange named <paramref name="channelName"/> and
        /// publishes <paramref name="message"/> to it as UTF-8 bytes with an empty routing key.
        /// </summary>
        /// <param name="message">The message text.</param>
        /// <param name="channelName">Name of the exchange to publish to.</param>
        public void Publish(string message, string channelName)
        {
            var channel = Channel;
            channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, arguments: null);

            var payload = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish
            (
                exchange: channelName,
                routingKey: string.Empty,
                mandatory: false,
                basicProperties: null,
                body: payload
            );
        }

        /// <summary>
        /// Not implemented yet.
        /// </summary>
        /// <typeparam name="T">Reference type of the message payload.</typeparam>
        /// <param name="message">The message to publish.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public Task PublishAsync<T>(T message) where T : class
        {
            throw new NotImplementedException();
        }
    }
}
實現消費者:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace fx.MQ
{
    /// <summary>
    /// Message subscriber/consumer that receives messages from RabbitMQ fanout exchanges.
    /// </summary>
    public class RabbitMQSubscriber : ISubscriber, IDisposable
    {
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;
        private IModel _channel;

        /// <summary>
        /// Creates the subscriber and opens a connection using the provider's factory.
        /// </summary>
        /// <param name="provider">Source of the connection factory.</param>
        /// <exception cref="ArgumentNullException"><paramref name="provider"/> is <c>null</c>.</exception>
        public RabbitMQSubscriber(RabbitMQProvider provider)
        {
            _provider = provider ?? throw new ArgumentNullException(nameof(provider));
            _connection = _provider.ConnectionFactory.CreateConnection();
        }

        /// <summary>
        /// The broker connection; created from the provider's factory if none is held yet.
        /// </summary>
        public IConnection Connection
        {
            get { return _connection ?? (_connection = _provider.ConnectionFactory.CreateConnection()); }
        }

        /// <summary>
        /// The channel used for consuming; created on first access from <see cref="Connection"/>.
        /// </summary>
        public IModel Channel
        {
            get { return _channel ?? (_channel = Connection.CreateModel()); }
        }

        /// <summary>
        /// Closes and disposes the channel (if one was ever created) and the connection.
        /// </summary>
        public void Dispose()
        {
            if (_channel != null)
            {
                if (_channel.IsOpen)
                {
                    _channel.Close();
                }

                _channel.Dispose();
            }

            if (_connection != null)
            {
                if (_connection.IsOpen)
                {
                    _connection.Close();
                }

                _connection.Dispose();
            }
        }

        /// <summary>
        /// Consumes messages from the fanout exchange <paramref name="channelName"/> and runs
        /// <paramref name="callback"/> for each one, acknowledging it afterwards.
        /// </summary>
        /// <param name="channelName">Name of the exchange to bind to.</param>
        /// <param name="callback">Handler invoked with the UTF-8 decoded body of each message.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="channelName"/> or <paramref name="callback"/> is <c>null</c>.
        /// </exception>
        public void Subscribe(string channelName, Action<string> callback)
        {
            if (channelName == null)
            {
                throw new ArgumentNullException(nameof(channelName));
            }

            if (callback == null)
            {
                throw new ArgumentNullException(nameof(callback));
            }

            var channel = Channel;

            // Declare the exchange.
            channel.ExchangeDeclare(exchange: channelName, type: "fanout");

            // Every subscription gets its own uniquely named queue.
            var queueName = channelName + "_" + Guid.NewGuid().ToString().Replace("-", "");

            // The name is never reused, so the queue is declared exclusive and auto-delete:
            // the broker removes it once this subscriber goes away, instead of leaving an
            // orphaned queue that keeps receiving a copy of every fanout message.
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: true, autoDelete: true, arguments: null);

            // Bind the queue to the exchange.
            channel.QueueBind(queue: queueName, exchange: channelName, routingKey: "");

            // Manual acknowledgement; take one message at a time.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (eventSender, args) =>
            {
                // NOTE(review): if callback throws, the ack below is skipped and the message
                // stays unacknowledged - confirm the desired failure policy (nack/requeue/drop).
                callback(Encoding.UTF8.GetString(args.Body));

                // Acknowledge only the message that was just handled.
                channel.BasicAck(args.DeliveryTag, false);
            };

            // Start consuming.
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }

        /// <summary>
        /// Not implemented yet.
        /// </summary>
        /// <typeparam name="T">Reference type the received message is expected to map to.</typeparam>
        /// <param name="channelName">Name of the exchange to bind to.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public Task<T> SubscribeAsync<T>(string channelName) where T : class
        {
            throw new NotImplementedException();
        }
    }
}
到這裡為止,已經簡單地實現了消息的接收與發送,滿足我自己當前項目的需要了。這裡我用exchange進行消息的生產消費,並且用fanout模式,就是一個生產者對應多個消費者,有點類似於消息廣播;另外還有direct、topic、headers等交換機類型,可以根據需要修改。
下面是測試代碼:
using System;
using System.Windows.Forms;

namespace fx.MQ.TestForm
{
    /// <summary>
    /// Test form: publishes the text box content to the "public" exchange and shows
    /// whatever arrives on that exchange in the rich text box.
    /// </summary>
    public partial class Form1 : Form
    {
        private readonly RabbitMQProvider _provider;
        private readonly RabbitMQPublisher _publisher;
        private readonly RabbitMQSubscriber _subscriber;

        /// <summary>Delegate used to marshal a received message onto the UI thread.</summary>
        delegate void Callback(string msg);

        /// <summary>
        /// Connects to the broker, builds the UI and arranges for the publisher and
        /// subscriber to be released when the form closes.
        /// </summary>
        public Form1()
        {
            // NOTE(review): broker address and credentials are hard-coded for this test form.
            _provider = new RabbitMQProvider("192.168.101.199", 5672, "admin", "admin");
            _publisher = new RabbitMQPublisher(_provider);
            _subscriber = new RabbitMQSubscriber(_provider);
            InitializeComponent();

            // Both objects hold an open broker connection, so release them on close.
            FormClosed += (s, e) =>
            {
                _subscriber.Dispose();
                _publisher.Dispose();
            };
        }

        /// <summary>Publishes the current text box content to the "public" exchange.</summary>
        private void button1_Click(object sender, EventArgs e)
        {
            _publisher.Publish(textBox1.Text, "public");
        }

        /// <summary>Starts listening on the "public" exchange once the form has loaded.</summary>
        private void Form1_Load(object sender, EventArgs e)
        {
            _subscriber.Subscribe("public", ShowMessage);
        }

        /// <summary>
        /// Displays <paramref name="msg"/> in the rich text box, switching to the UI thread
        /// when called from another thread.
        /// </summary>
        /// <param name="msg">Message text to display.</param>
        private void ShowMessage(string msg)
        {
            // A message can arrive while the form is being torn down; nothing to update then.
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            if (this.richTextBox1.InvokeRequired)
            {
                // BeginInvoke instead of Invoke: the calling thread is not left blocked
                // waiting for the UI thread, which may itself be busy closing the channel.
                this.BeginInvoke(new Callback(ShowMessage), new object[] { msg });
            }
            else
            {
                this.richTextBox1.Text = msg;
            }
        }
    }
}
運行效果如圖所示:
OK,沒有問題。
另外注意,退出程序時消息發布者和訂閱者都需要Dispose()來釋放連接。