Asp.NetCore輕松學-實現一個輕量級高可復用的RabbitMQ客戶端


Asp.NetCore輕松學-實現一個輕量級高可復用的RabbitMQ客戶端

 

前言

本示例通過對服務訂閱的封裝、隱藏細節實現、統一配置、自動重連、異常處理等各個方面來打造一個簡單易用的 RabbitMQ 工廠;本文適合適合有一定 RabbitMQ 使用經驗的讀者閱讀,如果你還沒有實際使用過 RabbitMQ,也沒有關系,因為本文的代碼都是基於直接運行的實例,通過簡單的修改 RabbitMQ 即可運行。

  • 解決方案如下

1. 創建基礎連接管理幫助類

首先,創建一個 .netcore 控制台項目,創建 Helper、Service、Utils 文件夾,分別用於存放通道管理、服務訂閱、公共組件。

1.1 接下來創建一個 MQConfig 類,用於存放 RabbitMQ 主機配置等信息
    public class MQConfig
    {
        /// <summary>
        /// 訪問消息隊列的用戶名
        /// </summary>
        public string UserName { get; set; }
        /// <summary>
        /// 訪問消息隊列的密碼
        /// </summary>
        public string Password { get; set; }
        /// <summary>
        /// 消息隊列的主機地址
        /// </summary>
        public string HostName { get; set; }
        /// <summary>
        /// 消息隊列的主機開放的端口
        /// </summary>
        public int Port { get; set; }
    }
1.2 創建 RabbitMQ 連接管理類,用於創建連接,關閉連接
1.3 創建一個消息體對象 MessageBody,用於解析和傳遞消息到業務系統中,在接下來的 MQChannel 類中會用到
  public class MessageBody
    {
        public EventingBasicConsumer Consumer { get; set; }
        public BasicDeliverEventArgs BasicDeliver { get; set; }
        /// <summary>
        /// 0成功
        /// </summary>
        public int Code { get; set; }
        public string Content { get; set; }
        public string ErrorMessage { get; set; }
        public bool Error { get; set; }
        public Exception Exception { get; set; }
    }
1.4 創建一個通道類,用於訂閱、發布消息,同時提供一個關閉通道連接的方法 Stop
 public class MQChannel
    {
        public string ExchangeTypeName { get; set; }
        public string ExchangeName { get; set; }
        public string QueueName { get; set; }
        public string RoutekeyName { get; set; }
        public IConnection Connection { get; set; }
        public EventingBasicConsumer Consumer { get; set; }
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  外部訂閱消費者通知委托</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-keyword">public</span> Action&lt;MessageBody&gt; OnReceivedCallback { <span class="hljs-keyword">get</span>; <span class="hljs-keyword">set</span>; }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">MQChannel</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> exchangeType, <span class="hljs-keyword">string</span> exchange, <span class="hljs-keyword">string</span> queue, <span class="hljs-keyword">string</span> routekey</span>)
    </span>{
        <span class="hljs-keyword">this</span>.ExchangeTypeName = exchangeType;
        <span class="hljs-keyword">this</span>.ExchangeName = exchange;
        <span class="hljs-keyword">this</span>.QueueName = queue;
        <span class="hljs-keyword">this</span>.RoutekeyName = routekey;
    }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  向當前隊列發送消息</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="content"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Publish</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> content</span>)
    </span>{
        <span class="hljs-keyword">byte</span>[] body = MQConnection.UTF8.GetBytes(content);
        IBasicProperties prop = <span class="hljs-keyword">new</span> BasicProperties();
        prop.DeliveryMode = <span class="hljs-number">1</span>;
        Consumer.Model.BasicPublish(<span class="hljs-keyword">this</span>.ExchangeName, <span class="hljs-keyword">this</span>.RoutekeyName, <span class="hljs-literal">false</span>, prop, body);
    }

    <span class="hljs-function"><span class="hljs-keyword">internal</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Receive</span>(<span class="hljs-params"><span class="hljs-keyword">object</span> sender, BasicDeliverEventArgs e</span>)
    </span>{
        MessageBody body = <span class="hljs-keyword">new</span> MessageBody();
        <span class="hljs-keyword">try</span>
        {
            <span class="hljs-keyword">string</span> content = MQConnection.UTF8.GetString(e.Body);
            body.Content = content;
            body.Consumer = (EventingBasicConsumer)sender;
            body.BasicDeliver = e;
        }
        <span class="hljs-keyword">catch</span> (Exception ex)
        {
            body.ErrorMessage = <span class="hljs-string">$"訂閱-出錯<span class="hljs-subst">{ex.Message}</span>"</span>;
            body.Exception = ex;
            body.Error = <span class="hljs-literal">true</span>;
            body.Code = <span class="hljs-number">500</span>;
        }
        OnReceivedCallback?.Invoke(body);
    }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  設置消息處理完成標志</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="consumer"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="deliveryTag"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="multiple"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">SetBasicAck</span>(<span class="hljs-params">EventingBasicConsumer consumer, <span class="hljs-keyword">ulong</span> deliveryTag, <span class="hljs-keyword">bool</span> multiple</span>)
    </span>{
        consumer.Model.BasicAck(deliveryTag, multiple);
    }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  關閉消息隊列的連接</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Stop</span>(<span class="hljs-params"></span>)
    </span>{
        <span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.Connection != <span class="hljs-literal">null</span> &amp;&amp; <span class="hljs-keyword">this</span>.Connection.IsOpen)
        {
            <span class="hljs-keyword">this</span>.Connection.Close();
            <span class="hljs-keyword">this</span>.Connection.Dispose();
        }
    }
}</code></pre>

1.5 在上面的 MQChannel 類中

首先是在構造函數內對當前通道的屬性進行設置,其次提供了 Publish 和 OnReceivedCallback 的委托,當通道接收到消息的時候,會進入方法 Receive 中,在 Receive 中,經過封裝成 MessageBody 對象,並調用委托 OnReceivedCallback ,將,解析好的消息傳遞到外邊訂閱者的業務中。最終在 MQChannel 中還提供了消息確認的操作方法 SetBasicAck,供業務系統手動調用。

1.6 接着再創建一個 RabbitMQ 通道管理類,用於創建通道,代碼非常簡單,只有一個公共方法 CreateReceiveChannel,傳入相關參數,創建一個 MQChannel 對象
    public class MQChannelManager
    {
        public MQConnection MQConn { get; set; }
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">MQChannelManager</span>(<span class="hljs-params">MQConnection conn</span>)
    </span>{
        <span class="hljs-keyword">this</span>.MQConn = conn;
    }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  創建消息通道</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="cfg"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> MQChannel <span class="hljs-title">CreateReceiveChannel</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> exchangeType, <span class="hljs-keyword">string</span> exchange, <span class="hljs-keyword">string</span> queue, <span class="hljs-keyword">string</span> routekey</span>)
    </span>{
        IModel model = <span class="hljs-keyword">this</span>.CreateModel(exchangeType, exchange, queue, routekey);
        model.BasicQos(<span class="hljs-number">0</span>, <span class="hljs-number">1</span>, <span class="hljs-literal">false</span>);
        EventingBasicConsumer consumer = <span class="hljs-keyword">this</span>.CreateConsumer(model, queue);
        MQChannel channel = <span class="hljs-keyword">new</span> MQChannel(exchangeType, exchange, queue, routekey)
        {
            Connection = <span class="hljs-keyword">this</span>.MQConn.Connection,
            Consumer = consumer
        };
        consumer.Received += channel.Receive;
        <span class="hljs-keyword">return</span> channel;
    }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  創建一個通道,包含交換機/隊列/路由,並建立綁定關系</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="type"&gt;</span>交換機類型<span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="exchange"&gt;</span>交換機名稱<span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="queue"&gt;</span>隊列名稱<span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="routeKey"&gt;</span>路由名稱<span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;returns&gt;</span><span class="hljs-doctag">&lt;/returns&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">private</span> IModel <span class="hljs-title">CreateModel</span>(<span class="hljs-params"><span class="hljs-keyword">string</span> type, <span class="hljs-keyword">string</span> exchange, <span class="hljs-keyword">string</span> queue, <span class="hljs-keyword">string</span> routeKey, IDictionary&lt;<span class="hljs-keyword">string</span>, <span class="hljs-keyword">object</span>&gt; arguments = <span class="hljs-literal">null</span></span>)
    </span>{
        type = <span class="hljs-keyword">string</span>.IsNullOrEmpty(type) ? <span class="hljs-string">"default"</span> : type;
        IModel model = <span class="hljs-keyword">this</span>.MQConn.Connection.CreateModel();
        model.BasicQos(<span class="hljs-number">0</span>, <span class="hljs-number">1</span>, <span class="hljs-literal">false</span>);
        model.QueueDeclare(queue, <span class="hljs-literal">true</span>, <span class="hljs-literal">false</span>, <span class="hljs-literal">false</span>, arguments);
        model.QueueBind(queue, exchange, routeKey);
        <span class="hljs-keyword">return</span> model;
    }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  接收消息到隊列中</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="model"&gt;</span>消息通道<span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="queue"&gt;</span>隊列名稱<span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="callback"&gt;</span>訂閱消息的回調事件<span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;returns&gt;</span><span class="hljs-doctag">&lt;/returns&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">private</span> EventingBasicConsumer <span class="hljs-title">CreateConsumer</span>(<span class="hljs-params">IModel model, <span class="hljs-keyword">string</span> queue</span>)
    </span>{
        EventingBasicConsumer consumer = <span class="hljs-keyword">new</span> EventingBasicConsumer(model);
        model.BasicConsume(queue, <span class="hljs-literal">false</span>, consumer);

        <span class="hljs-keyword">return</span> consumer;
    }
}</code></pre>

1.7 通道管理類的構造方法

 public MQChannelManager(MQConnection conn) {
            this.MQConn = conn;
        }

1.8 需要傳入一個 MQConnection 對象,僅是一個簡單的連接類,代碼如下

    public class MQConnection
    {
        private string vhost = string.Empty;
        private IConnection connection = null;
        private MQConfig config = null;

        /// <summary>
        /// 構造無 utf8 標記的編碼轉換器
        /// </summary>
        public static UTF8Encoding UTF8 { get; set; } = new UTF8Encoding(false);

        public MQConnection(MQConfig config, string vhost) {
            this.config = config;
            this.vhost = vhost;
        }

        public IConnection Connection
        {
            get
            {
                if (connection == null)
                {
                    ConnectionFactory factory = new ConnectionFactory
                    {
                        AutomaticRecoveryEnabled = true,
                        UserName = this.config.UserName,
                        Password = this.config.Password,
                        HostName = this.config.HostName,
                        VirtualHost = this.vhost,
                        Port = this.config.Port
                    };
                    connection = factory.CreateConnection();
                }

                return connection;
            }
        }
    }

1.9 在上面的代碼中,還初始化了一個靜態對象 UTF8Encoding ,使用無 utf8 標記的編碼轉換器來解析消息

2. 定義和實現服務契約

設想一下,有這樣的一個業務場景,通道管理和服務管理都是相同的操作,如果這些基礎操作都在一個地方定義,且有一個默認的實現,那么后來者就不需要去關注這些技術細節,直接繼承基礎類后,傳入相應的消息配置即可完成
消息訂閱和發布操作。

2.1 有了想法,接下來就先定義契約接口 IService,此接口包含創建通道、開啟/停止訂閱,一個服務可能承載多個通道,所以還需要包含通道列表
public interface IService
    {
        /// <summary>
        /// 創建通道
        /// </summary>
        /// <param name="queue">隊列名稱</param>
        /// <param name="routeKey">路由名稱</param>
        /// <param name="exchangeType">交換機類型</param>
        /// <returns></returns>
        MQChannel CreateChannel(string queue, string routeKey, string exchangeType);
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  開啟訂閱</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">Start</span>(<span class="hljs-params"></span>)</span>;

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  停止訂閱</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">Stop</span>(<span class="hljs-params"></span>)</span>;

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  通道列表</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    List&lt;MQChannel&gt; Channels { <span class="hljs-keyword">get</span>; <span class="hljs-keyword">set</span>; }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  消息隊列中定義的虛擬機</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-keyword">string</span> vHost { <span class="hljs-keyword">get</span>; }

    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  消息隊列中定義的交換機</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-keyword">string</span> Exchange { <span class="hljs-keyword">get</span>; }
}</code></pre>

2.2 接下來創建一個抽象類來實現該接口,將實現細節進行封裝,方便后面的業務服務繼承調用

  public abstract class MQServiceBase : IService
    {
        internal bool started = false;
        internal MQServiceBase(MQConfig config) {
            this.Config = config;
        }

        public MQChannel CreateChannel(string queue, string routeKey, string exchangeType) {
            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager cm = new MQChannelManager(conn);
            MQChannel channel = cm.CreateReceiveChannel(exchangeType, this.Exchange, queue, routeKey);
            return channel;
        }

        /// <summary>
        /// 啟動訂閱
        /// </summary>
        public void Start() {
            if (started)
            {
                return;
            }

            MQConnection conn = new MQConnection(this.Config, this.vHost);
            MQChannelManager manager = new MQChannelManager(conn);
            foreach (var item in this.Queues)
            {
                MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
                channel.OnReceivedCallback = item.OnReceived;
                this.Channels.Add(channel);
            }
            started = true;
        }

        /// <summary>
        /// 停止訂閱
        /// </summary>
        public void Stop() {
            foreach (var c in this.Channels)
            {
                c.Stop();
            }
            this.Channels.Clear();
            started = false;
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="message"></param>
        public abstract void OnReceived(MessageBody message);

        public List<MQChannel> Channels { get; set; } = new List<MQChannel>();

        /// <summary>
        /// 消息隊列配置
        /// </summary>
        public MQConfig Config { get; set; }

        /// <summary>
        /// 消息隊列中定義的虛擬機
        /// </summary>
        public abstract string vHost { get; }

        /// <summary>
        /// 消息隊列中定義的交換機
        /// </summary>
        public abstract string Exchange { get; }

        /// <summary>
        /// 定義的隊列列表
        /// </summary>
        public List<QueueInfo> Queues { get; } = new List<QueueInfo>();
    }

上面的抽象類,原封不動的實現接口契約,代碼非常簡單,在 Start 方法中,創建通道和啟動消息訂閱;同時,將通道加入屬性 Channels 中,方便后面的自檢服務使用;在 Start 方法中

 /// <summary>
        /// 啟動訂閱
        /// </summary>
        public void Start() {
            if (started)
            {
                return;
            }
        MQConnection conn = <span class="hljs-keyword">new</span> MQConnection(<span class="hljs-keyword">this</span>.Config, <span class="hljs-keyword">this</span>.vHost);
        MQChannelManager manager = <span class="hljs-keyword">new</span> MQChannelManager(conn);
        <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Queues)
        {
            MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, <span class="hljs-keyword">this</span>.Exchange, item.Queue, item.RouterKey);
            channel.OnReceivedCallback = item.OnReceived;
            <span class="hljs-keyword">this</span>.Channels.Add(channel);
        }
        started = <span class="hljs-literal">true</span>;
    }</code></pre>

使用 MQChannelManager 創建了一個通道,並將通道的回調委托 OnReceivedCallback 設置為 item.OnReceived 方法,該方法將有子類實現;在將當前訂閱服務通道創建完成后,標記服務狀態 started 為 true,防止重復啟動;同時,在該抽象類中,不實現契約的 OnReceived(MessageBody message);強制基礎業務服務類去自我實現,因為各種業務的特殊性,這塊對消息的處理不能再基礎服務中完成

接下來要介紹的是服務監控管理類,該類內部定義一個簡單的定時器功能,不間斷的對 RabbitMQ 的通訊進行偵聽,一旦發現有斷開的連接,就自動創建一個新的通道,並移除舊的通道;同時,提供 Start/Stop 兩個方法,以供程序 啟動/停止 的時候對

2.3 RabbitMQ 的連接和通道進行清理;代碼如下
public class MQServcieManager
    {
        public int Timer_tick { get; set; } = 10 * 1000;
        private Timer timer = null;
    <span class="hljs-keyword">public</span> Action&lt;MessageLevel, <span class="hljs-keyword">string</span>, Exception&gt; OnAction = <span class="hljs-literal">null</span>;
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">MQServcieManager</span>(<span class="hljs-params"></span>)
    </span>{
        timer = <span class="hljs-keyword">new</span> Timer(OnInterval, <span class="hljs-string">""</span>, Timer_tick, Timer_tick);
    }


    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span>  自檢,配合 RabbitMQ 內部自動重連機制</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="sender"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">void</span> <span class="hljs-title">OnInterval</span>(<span class="hljs-params"><span class="hljs-keyword">object</span> sender</span>)
    </span>{
        <span class="hljs-keyword">int</span> error = <span class="hljs-number">0</span>, reconnect = <span class="hljs-number">0</span>;
        OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{DateTime.Now}</span> 正在執行自檢"</span>, <span class="hljs-literal">null</span>);
        <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Services)
        {
            <span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">0</span>; i &lt; item.Channels.Count; i++)
            {
                <span class="hljs-keyword">var</span> c = item.Channels[i];
                <span class="hljs-keyword">if</span> (c.Connection == <span class="hljs-literal">null</span> || !c.Connection.IsOpen)
                {
                    error++;
                    OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{c.ExchangeName}</span> <span class="hljs-subst">{c.QueueName}</span> <span class="hljs-subst">{c.RoutekeyName}</span> 重新創建訂閱"</span>, <span class="hljs-literal">null</span>);
                    <span class="hljs-keyword">try</span>
                    {
                        c.Stop();
                        <span class="hljs-keyword">var</span> channel = item.CreateChannel(c.QueueName, c.RoutekeyName, c.ExchangeTypeName);
                        item.Channels.Remove(c);
                        item.Channels.Add(channel);

                        OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{c.ExchangeName}</span> <span class="hljs-subst">{c.QueueName}</span> <span class="hljs-subst">{c.RoutekeyName}</span> 重新創建完成"</span>, <span class="hljs-literal">null</span>);
                        reconnect++;
                    }
                    <span class="hljs-keyword">catch</span> (Exception ex)
                    {
                        OnAction?.Invoke(MessageLevel.Information, ex.Message, ex);
                    }
                }
            }
        }
        OnAction?.Invoke(MessageLevel.Information, <span class="hljs-string">$"<span class="hljs-subst">{DateTime.Now}</span> 自檢完成,錯誤數:<span class="hljs-subst">{error}</span>,重連成功數:<span class="hljs-subst">{reconnect}</span>"</span>, <span class="hljs-literal">null</span>);
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Start</span>(<span class="hljs-params"></span>)
    </span>{
        <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Services)
        {
            <span class="hljs-keyword">try</span>
            {
                item.Start();
            }
            <span class="hljs-keyword">catch</span> (Exception e)
            {
                OnAction?.Invoke(MessageLevel.Error, <span class="hljs-string">$"啟動服務出錯 | <span class="hljs-subst">{e.Message}</span>"</span>, e);
            }
        }
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Stop</span>(<span class="hljs-params"></span>)
    </span>{
        <span class="hljs-keyword">try</span>
        {
            <span class="hljs-keyword">foreach</span> (<span class="hljs-keyword">var</span> item <span class="hljs-keyword">in</span> <span class="hljs-keyword">this</span>.Services)
            {
                item.Stop();
            }
            Services.Clear();
            timer.Dispose();
        }
        <span class="hljs-keyword">catch</span> (Exception e)
        {
            OnAction?.Invoke(MessageLevel.Error, <span class="hljs-string">$"停止服務出錯 | <span class="hljs-subst">{e.Message}</span>"</span>, e);
        }
    }

    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">AddService</span>(<span class="hljs-params">IService service</span>)
    </span>{
        Services.Add(service);
    }
    <span class="hljs-keyword">public</span> List&lt;IService&gt; Services { <span class="hljs-keyword">get</span>; <span class="hljs-keyword">set</span>; } = <span class="hljs-keyword">new</span> List&lt;IService&gt;();
}</code></pre>

代碼比較簡單,就不在一一介紹,為了將異常等內部信息傳遞到外邊,方便使用第三方組件進行日志記錄等需求,MQServcieManager 還使用了 MessageLevel 這個定義,方便業務根據不同的消息級別對消息進行處理

    public enum MessageLevel {
        Trace = 0,
        Debug = 1,
        Information = 2,
        Warning = 3,
        Error = 4,
        Critical = 5,
        None = 6
    }

3. 開始使用

終於來到了這一步,我們將要開始使用這個基礎服務;首先,創建一個 DemoService 繼承自 MQServiceBase ;同時,

3.1 實現 MQServiceBase 的抽象方法 OnReceived(MessageBody message)
 public class DemoService : MQServiceBase
    {
        public Action<MessageLevel, string, Exception> OnAction = null;
        public DemoService(MQConfig config) : base(config) {
            base.Queues.Add(new QueueInfo()
            {
                ExchangeType = ExchangeType.Direct,
                Queue = "login-message",
                RouterKey = "pk",
                OnReceived = this.OnReceived
            });
        }
    <span class="hljs-keyword">public</span> <span class="hljs-keyword">override</span> <span class="hljs-keyword">string</span> vHost { <span class="hljs-keyword">get</span> { <span class="hljs-keyword">return</span> <span class="hljs-string">"gpush"</span>; } }
    <span class="hljs-keyword">public</span> <span class="hljs-keyword">override</span> <span class="hljs-keyword">string</span> Exchange { <span class="hljs-keyword">get</span> { <span class="hljs-keyword">return</span> <span class="hljs-string">"user"</span>; } }


    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> 接收消息</span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;/summary&gt;</span></span>
    <span class="hljs-comment"><span class="hljs-doctag">///</span> <span class="hljs-doctag">&lt;param name="message"&gt;</span><span class="hljs-doctag">&lt;/param&gt;</span></span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">override</span> <span class="hljs-keyword">void</span> <span class="hljs-title">OnReceived</span>(<span class="hljs-params">MessageBody message</span>)
    </span>{
        <span class="hljs-keyword">try</span>
        {
            Console.WriteLine(message.Content);
        }
        <span class="hljs-keyword">catch</span> (Exception ex)
        {
            OnAction?.Invoke(MessageLevel.Error, ex.Message, ex);
        }
        message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, <span class="hljs-literal">true</span>);

    }
}</code></pre>

以上的代碼非常簡單,幾乎不需要業務開發者做更多的其它工作,開發者只需要在構造方法內部傳入一個 QueueInfo 對象,如果有多個,可一並傳入

    public partial class QueueInfo
    {
        /// <summary>
        /// 隊列名稱
        /// </summary>
        public string Queue { get; set; }
        /// <summary>
        /// 路由名稱
        /// </summary>
        public string RouterKey { get; set; }
        /// <summary>
        /// 交換機類型
        /// </summary>
        public string ExchangeType { get; set; }
        /// <summary>
        /// 接受消息委托
        /// </summary>
        public Action<MessageBody> OnReceived { get; set; }
        /// <summary>
        /// 輸出信息到客戶端
        /// </summary>
        public Action<MQChannel, MessageLevel, string> OnAction { get; set; }
    }

並設置 vHost 和 Exchange 的值,然后剩下的就是在 OnReceived(MessageBody message) 方法中專心的處理自己的業務了;在這里,我們僅輸出接收到的消息,並設置 ack 為已成功處理。

4. 測試代碼

4.1 在 Program,我們執行該測試
   class Program {
        static void Main(string[] args) {
            Test();
        }
    <span class="hljs-function"><span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">Test</span><span class="hljs-params">()</span>
    </span>{
        MQConfig config = <span class="hljs-keyword">new</span> MQConfig()
        {
            HostName = <span class="hljs-string">"127.0.0.1"</span>,
            Password = <span class="hljs-string">"123456"</span>,
            Port = <span class="hljs-number">5672</span>,
            UserName = <span class="hljs-string">"dotnet"</span>
        };

        MQServcieManager manager = <span class="hljs-keyword">new</span> MQServcieManager();
        manager.AddService(<span class="hljs-keyword">new</span> DemoService(config));
        manager.OnAction = OnActionOutput;
        manager.Start();

        Console.WriteLine(<span class="hljs-string">"服務已啟動"</span>);
        Console.ReadKey();

        manager.Stop();
        Console.WriteLine(<span class="hljs-string">"服務已停止,按任意鍵退出..."</span>);
        Console.ReadKey();
    }

    <span class="hljs-function"><span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">OnActionOutput</span><span class="hljs-params">(MessageLevel level, <span class="hljs-built_in">string</span> message, Exception ex)</span>
    </span>{
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine(<span class="hljs-string">"{0} | {1} | {2}"</span>, level, message, ex?.StackTrace);
        Console.ForegroundColor = ConsoleColor.Gray;
    }
}</code></pre>

4.2 利用 MQServcieManager 對象,完成了對所有消息訂閱者的管理和監控,

4.3 首先我們到 RabbitMQ 的 web 控制台發布一條消息到隊列 login-message 中

4.3 然后查看輸出結果

消息已經接收並處理,為了查看監控效果,我還手動將網絡進行中斷,然后監控服務檢測到無法連接,嘗試重建通道,並將消息輸出

  • 圖中步驟說明
  • 0:服務啟動
  • 1:自檢啟動
  • 2:服務報錯,嘗試重建,重建失敗,繼續監測
  • 3:RabbitMQ 內部監控自動重連,監控程序檢測到已恢復,收到消息並處理
  • 4:后續監控服務繼續進行監控

結語

在文章中,我們建立了 RabbitMQ 的通道管理、基礎服務管理、契約實現等操作,讓業務開發人員通過簡單的繼承實現去快速的處理業務系統的邏輯,后續如果有增加消費者的情況下,只需要通過 MQServcieManager.AddService 進行簡單的調用操作即可,無需對底層技術細節進行過多的改動。

源碼下載:
https://github.com/lianggx/EasyAspNetCoreDemo/tree/master/Ron.MQTest

微信公眾號:DotNet程序園
歡迎關注收取閱讀最新文章
  • 您隨手點贊是我不斷書寫的動力,如有錯誤,歡迎指正
  • 出處:http://www.cnblogs.com/viter/
  • 推薦一個快速開發腳手架,基於 .netcore+pgsql,GitHub地址: https://github.com/lianggx/mystaging
  • 本文版權歸作者和博客園共有,歡迎個人轉載,必須保留此段聲明;商業轉載請聯系授權,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。
  • 歡迎大家關注我的微信公眾號,一起學習一起進步
標簽: rabbitmq, .netcore
2
0
« 上一篇: Asp.NetCore輕松學-配置服務 apollo 部署實踐
» 下一篇: Asp.NetCore依賴注入和管道方式的異常處理及日志記錄
	</div>
	<div class="postDesc">posted @ <span id="post-date">2018-11-23 10:26</span> <a href="https://www.cnblogs.com/viter/">Ron.Liang</a> 閱讀(<span id="post_view_count">956</span>) 評論(<span id="post_comment_count">8</span>)  <a href="https://i.cnblogs.com/EditPosts.aspx?postid=10003185" rel="nofollow">編輯</a> <a href="#" onclick="AddToWz(10003185);return false;">收藏</a></div>
</div>
<script src="//common.cnblogs.com/highlight/9.12.0/highlight.min.js"></script><script>markdown_highlight();</script><script type="text/javascript">var allowComments=true,cb_blogId=30433,cb_entryId=10003185,cb_blogApp=currentBlogApp,cb_blogUserGuid='e9823d0b-63cf-dd11-9e4d-001cf0cd104b',cb_entryCreatedDate='2018/11/23 10:26:00';loadViewCount(cb_entryId);var cb_postType=1;var isMarkdown=true;</script>
posted @ 2019-07-19 16:06  kennard_owen  閱讀( 405)  評論( 0編輯  收藏


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM