1、RabbitMQListener,自定義消息監聽器
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; namespace MQ_Receive { /// <summary> /// RabbitMq消息監聽器 /// </summary> public class RabbitMqListener { private ConnectionFactory _factory; private IConnection _con; private IModel _channel; private EventingBasicConsumer _consumer; private readonly string _rabbitMqUri; private readonly string _exchangeType; private readonly string _exchangeName; private readonly string _queueName; private readonly string _routeKey; private Func<string, bool> _messageHandler; /// <summary> /// RabbitMQ消息監聽器,若指定的隊列不存在,則自動創建隊列。並在消息交換機上綁定指定的消息路由規則(路由key) /// </summary> /// <param name="rabbitMqUri">連接串,如 amqp://guest:guest@localhost:5672/</param> /// <param name="exchangeName">消息交換機</param> /// <param name="exchangeType">交換機類型,如 ExchangeType.Direct</param> /// <param name="queueName">要監聽的隊列</param> /// <param name="routeKey">消息路由key</param> public RabbitMqListener(string rabbitMqUri, string exchangeName, string exchangeType, string queueName, string routeKey = "") { this._rabbitMqUri = rabbitMqUri; this._exchangeName = exchangeName; this._exchangeType = exchangeType; this._queueName = queueName; this._routeKey = routeKey; } /// <summary> /// 創建連接 /// </summary> private void CreateConnection() { _factory = new ConnectionFactory { Uri = new Uri(_rabbitMqUri), RequestedHeartbeat = 20, AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(10) }; _con = _factory.CreateConnection(); _con.ConnectionShutdown += (_sender, _e) => ReMessageListen();//掉線重新連接並監聽隊列消息 } /// <summary> /// 創建信道 /// </summary> private void CreateChannel() { _channel = _con.CreateModel(); _channel.ExchangeDeclare(_exchangeName, _exchangeType, true, false, null); _channel.QueueDeclare(_queueName, true, false, false, null); //創建一個消息隊列,用來存儲消息 _channel.QueueBind(_queueName, _exchangeName, _routeKey, null); _channel.BasicQos(0, 3, true); //在非自動確認消息的前提下,如果一定數目的消息(通過基於consume或者channel設置Qos的值)未被確認前,不進行消費新的消息 } /// <summary> /// 監聽隊列消息 /// </summary> /// <param name="messageHandler">消息處理器,當監測到隊列消息時回調該處理器</param> /// <returns>監聽狀態</returns> public bool MessageListen(Func<string, bool> messageHandler) { try { this.CreateConnection(); this.CreateChannel(); _consumer = new EventingBasicConsumer(_channel); //基於事件的消息推送方式 _consumer.Received += (_sender, _e) => { string msg = Encoding.UTF8.GetString(_e.Body); if (messageHandler != null) { this._messageHandler = messageHandler; try { var isOk = this._messageHandler(msg); if (isOk) { _channel.BasicAck(_e.DeliveryTag, false); } } catch (Exception ex) { LoggerManager.ErrorLog.Error("消息處理器執行異常:" + ex.Message, ex); } } }; _channel.BasicConsume(_queueName, false, _consumer); //手動確認 return true; } catch (Exception ex) { LoggerManager.ErrorLog.Error("嘗試監聽隊列消息出現錯誤:" + ex.Message, ex); } return false; } public void ReMessageListen() { try { //清除連接及頻道 CleanupResource(); var mres = new ManualResetEventSlim(false); //初始化狀態為false while (!mres.Wait(3000)) //每3秒監測一次狀態,直到狀態為true { if (MessageListen(_messageHandler)) { mres.Set(); //設置狀態為true並跳出循環 } } } catch (Exception ex) { LoggerManager.ErrorLog.Error("嘗試連接RabbitMQ服務器出現錯誤:" + ex.Message, ex); } } /// <summary> /// 清理資源 /// </summary> private void CleanupResource() { if (_channel != null && _channel.IsOpen) { try { _channel.Close(); } catch (Exception ex) { LoggerManager.ErrorLog.Error("嘗試關閉RabbitMQ信道遇到錯誤", ex); } _channel = null; } if (_con != null && _con.IsOpen) { try { _con.Close(); } catch (Exception ex) { LoggerManager.ErrorLog.Error("嘗試關閉RabbitMQ連接遇到錯誤", ex); } _con = null; } } } }
2、調用代碼
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; namespace MQ_Receive { public partial class Form1 : Form { private delegate void ChangeText(string text); private readonly ChangeText _changeText; private static string rabbitHostUri = "amqp://guest:guest@localhost:5672/"; private static string exchangeName = "order-exchange"; private static string queueName = "order-message-test-queue"; private static string routeKey = "order-message-routeKey"; private static readonly object lockObj = new object(); private static RabbitMQListener _listener; public static RabbitMQListener RabbitMQListener { get { if (_listener == null) { lock (lockObj) { if (_listener == null) { _listener = new RabbitMQListener(rabbitHostUri, exchangeName, ExchangeType.Direct, queueName, routeKey); } } } return _listener; } } private Func<string, bool> MessageHandler { get { return (msg) => { this.label1.Invoke(_changeText, new object[1] { msg }); return true; }; } } public Form1() { InitializeComponent(); this.label1.Text = ""; this._changeText = SetText; } private void Form1_Load(object sender, EventArgs e) { RabbitMQListener.MessageListen(MessageHandler); } private void SetText(string text) { this.label1.Text += text + "\n"; } } }