RabbitMQ心跳檢測與掉線重連


1、RabbitMQListener,自定義消息監聽器

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;

namespace MQ_Receive
{
    /// <summary>
    /// RabbitMq消息監聽器
    /// </summary>
    public class RabbitMqListener
    {
        private ConnectionFactory _factory;
        private IConnection _con;
        private IModel _channel;
        private EventingBasicConsumer _consumer;

        private readonly string _rabbitMqUri;
        private readonly string _exchangeType;
        private readonly string _exchangeName;
        private readonly string _queueName;
        private readonly string _routeKey;
        private Func<string, bool> _messageHandler;

        /// <summary>
        /// RabbitMQ消息監聽器,若指定的隊列不存在,則自動創建隊列。並在消息交換機上綁定指定的消息路由規則(路由key)
        /// </summary>
        /// <param name="rabbitMqUri">連接串,如 amqp://guest:guest@localhost:5672/</param>
        /// <param name="exchangeName">消息交換機</param>
        /// <param name="exchangeType">交換機類型,如 ExchangeType.Direct</param>
        /// <param name="queueName">要監聽的隊列</param>
        /// <param name="routeKey">消息路由key</param>
        public RabbitMqListener(string rabbitMqUri, string exchangeName, string exchangeType, string queueName, string routeKey = "")
        {
            this._rabbitMqUri = rabbitMqUri;
            this._exchangeName = exchangeName;
            this._exchangeType = exchangeType;
            this._queueName = queueName;
            this._routeKey = routeKey;
        }

        /// <summary>
        /// 創建連接
        /// </summary>
        private void CreateConnection()
        {
            _factory = new ConnectionFactory
            {
                Uri = new Uri(_rabbitMqUri),
                RequestedHeartbeat = 20,
                AutomaticRecoveryEnabled = true,
                TopologyRecoveryEnabled = true,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
            };

            _con = _factory.CreateConnection();
            _con.ConnectionShutdown += (_sender, _e) => ReMessageListen();//掉線重新連接並監聽隊列消息
        }

        /// <summary>
        /// 創建信道
        /// </summary>
        private void CreateChannel()
        {
            _channel = _con.CreateModel();
            _channel.ExchangeDeclare(_exchangeName, _exchangeType, true, false, null);
            _channel.QueueDeclare(_queueName, true, false, false, null); //創建一個消息隊列,用來存儲消息
            _channel.QueueBind(_queueName, _exchangeName, _routeKey, null);
            _channel.BasicQos(0, 3, true); //在非自動確認消息的前提下,如果一定數目的消息(通過基於consume或者channel設置Qos的值)未被確認前,不進行消費新的消息
        }

        /// <summary>
        /// 監聽隊列消息
        /// </summary>
        /// <param name="messageHandler">消息處理器,當監測到隊列消息時回調該處理器</param>
        /// <returns>監聽狀態</returns>
        public bool MessageListen(Func<string, bool> messageHandler)
        {
            try
            {
                this.CreateConnection();
                this.CreateChannel();

                _consumer = new EventingBasicConsumer(_channel); //基於事件的消息推送方式
                _consumer.Received += (_sender, _e) =>
                {
                    string msg = Encoding.UTF8.GetString(_e.Body);
                    if (messageHandler != null)
                    {
                        this._messageHandler = messageHandler;
                        try
                        {
                            var isOk = this._messageHandler(msg);
                            if (isOk)
                            {
                                _channel.BasicAck(_e.DeliveryTag, false);
                            }
                        }
                        catch (Exception ex)
                        {
                            LoggerManager.ErrorLog.Error("消息處理器執行異常:" + ex.Message, ex);
                        } 
                    }
                };

                _channel.BasicConsume(_queueName, false, _consumer); //手動確認
                return true;
            }
            catch (Exception ex)
            {
                LoggerManager.ErrorLog.Error("嘗試監聽隊列消息出現錯誤:" + ex.Message, ex);
            }
            return false;
        }

        public void ReMessageListen()
        {
            try
            {
                //清除連接及頻道
                CleanupResource();

                var mres = new ManualResetEventSlim(false); //初始化狀態為false
                while (!mres.Wait(3000)) //每3秒監測一次狀態,直到狀態為true
                {
                    if (MessageListen(_messageHandler))
                    {
                        mres.Set(); //設置狀態為true並跳出循環
                    }
                }
            }
            catch (Exception ex)
            {
                LoggerManager.ErrorLog.Error("嘗試連接RabbitMQ服務器出現錯誤:" + ex.Message, ex);
            }
        }

        /// <summary>
        /// 清理資源
        /// </summary>
        private void CleanupResource()
        {
            if (_channel != null && _channel.IsOpen)
            {
                try
                {
                    _channel.Close();
                }
                catch (Exception ex)
                {
                    LoggerManager.ErrorLog.Error("嘗試關閉RabbitMQ信道遇到錯誤", ex);
                }
                _channel = null;
            }

            if (_con != null && _con.IsOpen)
            {
                try
                {
                    _con.Close();
                }
                catch (Exception ex)
                {
                    LoggerManager.ErrorLog.Error("嘗試關閉RabbitMQ連接遇到錯誤", ex);
                }
                _con = null;
            }
        }
    }
}

2、調用代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MQ_Receive
{
    public partial class Form1 : Form
    {
        private delegate void ChangeText(string text);
        private readonly ChangeText _changeText;

        private static string rabbitHostUri = "amqp://guest:guest@localhost:5672/";
        private static string exchangeName = "order-exchange";
        private static string queueName = "order-message-test-queue";
        private static string routeKey = "order-message-routeKey";
        private static readonly object lockObj = new object();

        private static RabbitMQListener _listener;
        public static RabbitMQListener RabbitMQListener
        {
            get
            {
                if (_listener == null)
                {
                    lock (lockObj)
                    {
                        if (_listener == null)
                        {
                            _listener = new RabbitMQListener(rabbitHostUri, exchangeName, ExchangeType.Direct, queueName, routeKey);
                        }
                    }
                }

                return _listener;
            }
        }
        private Func<string, bool> MessageHandler
        {
            get {
                     return (msg) =>
                     {
                         this.label1.Invoke(_changeText, new object[1] { msg });
                         return true;
                     };
                }
        }

        public Form1()
        {
            InitializeComponent();
            this.label1.Text = "";
            this._changeText = SetText;
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            RabbitMQListener.MessageListen(MessageHandler);
        }

        private void SetText(string text)
        {
            this.label1.Text += text + "\n";
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM