前言
本示例通過對服務訂閱的封裝、隱藏細節實現、統一配置、自動重連、異常處理等各個方面來打造一個簡單易用的 RabbitMQ 工廠;本文適合適合有一定 RabbitMQ 使用經驗的讀者閱讀,如果你還沒有實際使用過 RabbitMQ,也沒有關系,因為本文的代碼都是基於直接運行的實例,通過簡單的修改 RabbitMQ 即可運行。
- 解決方案如下
1. 創建基礎連接管理幫助類
首先,創建一個 .netcore 控制台項目,創建 Helper、Service、Utils 文件夾,分別用於存放通道管理、服務訂閱、公共組件。
1.1 接下來創建一個 MQConfig 類,用於存放 RabbitMQ 主機配置等信息
public class MQConfig
{
/// <summary>
/// 訪問消息隊列的用戶名
/// </summary>
public string UserName { get; set; }
/// <summary>
/// 訪問消息隊列的密碼
/// </summary>
public string Password { get; set; }
/// <summary>
/// 消息隊列的主機地址
/// </summary>
public string HostName { get; set; }
/// <summary>
/// 消息隊列的主機開放的端口
/// </summary>
public int Port { get; set; }
}
1.2 創建 RabbitMQ 連接管理類,用於創建連接,關閉連接
1.3 創建一個消息體對象 MessageBody,用於解析和傳遞消息到業務系統中,在接下來的 MQChannel 類中會用到
public class MessageBody
{
public EventingBasicConsumer Consumer { get; set; }
public BasicDeliverEventArgs BasicDeliver { get; set; }
/// <summary>
/// 0成功
/// </summary>
public int Code { get; set; }
public string Content { get; set; }
public string ErrorMessage { get; set; }
public bool Error { get; set; }
public Exception Exception { get; set; }
}
1.4 創建一個通道類,用於訂閱、發布消息,同時提供一個關閉通道連接的方法 Stop
public class MQChannel
{
public string ExchangeTypeName { get; set; }
public string ExchangeName { get; set; }
public string QueueName { get; set; }
public string RoutekeyName { get; set; }
public IConnection Connection { get; set; }
public EventingBasicConsumer Consumer { get; set; }
/// <summary>
/// 外部訂閱消費者通知委托
/// </summary>
public Action<MessageBody> OnReceivedCallback { get; set; }
public MQChannel(string exchangeType, string exchange, string queue, string routekey)
{
this.ExchangeTypeName = exchangeType;
this.ExchangeName = exchange;
this.QueueName = queue;
this.RoutekeyName = routekey;
}
/// <summary>
/// 向當前隊列發送消息
/// </summary>
/// <param name="content"></param>
public void Publish(string content)
{
byte[] body = MQConnection.UTF8.GetBytes(content);
IBasicProperties prop = new BasicProperties();
prop.DeliveryMode = 1;
Consumer.Model.BasicPublish(this.ExchangeName, this.RoutekeyName, false, prop, body);
}
internal void Receive(object sender, BasicDeliverEventArgs e)
{
MessageBody body = new MessageBody();
try
{
string content = MQConnection.UTF8.GetString(e.Body);
body.Content = content;
body.Consumer = (EventingBasicConsumer)sender;
body.BasicDeliver = e;
}
catch (Exception ex)
{
body.ErrorMessage = $"訂閱-出錯{ex.Message}";
body.Exception = ex;
body.Error = true;
body.Code = 500;
}
OnReceivedCallback?.Invoke(body);
}
/// <summary>
/// 設置消息處理完成標志
/// </summary>
/// <param name="consumer"></param>
/// <param name="deliveryTag"></param>
/// <param name="multiple"></param>
public void SetBasicAck(EventingBasicConsumer consumer, ulong deliveryTag, bool multiple)
{
consumer.Model.BasicAck(deliveryTag, multiple);
}
/// <summary>
/// 關閉消息隊列的連接
/// </summary>
public void Stop()
{
if (this.Connection != null && this.Connection.IsOpen)
{
this.Connection.Close();
this.Connection.Dispose();
}
}
}
1.5 在上面的 MQChannel 類中
首先是在構造函數內對當前通道的屬性進行設置,其次提供了 Publish 和 OnReceivedCallback 的委托,當通道接收到消息的時候,會進入方法 Receive 中,在 Receive 中,經過封裝成 MessageBody 對象,並調用委托 OnReceivedCallback ,將,解析好的消息傳遞到外邊訂閱者的業務中。最終在 MQChannel 中還提供了消息確認的操作方法 SetBasicAck,供業務系統手動調用。
1.6 接着再創建一個 RabbitMQ 通道管理類,用於創建通道,代碼非常簡單,只有一個公共方法 CreateReceiveChannel,傳入相關參數,創建一個 MQChannel 對象
public class MQChannelManager
{
public MQConnection MQConn { get; set; }
public MQChannelManager(MQConnection conn)
{
this.MQConn = conn;
}
/// <summary>
/// 創建消息通道
/// </summary>
/// <param name="cfg"></param>
public MQChannel CreateReceiveChannel(string exchangeType, string exchange, string queue, string routekey)
{
IModel model = this.CreateModel(exchangeType, exchange, queue, routekey);
model.BasicQos(0, 1, false);
EventingBasicConsumer consumer = this.CreateConsumer(model, queue);
MQChannel channel = new MQChannel(exchangeType, exchange, queue, routekey)
{
Connection = this.MQConn.Connection,
Consumer = consumer
};
consumer.Received += channel.Receive;
return channel;
}
/// <summary>
/// 創建一個通道,包含交換機/隊列/路由,並建立綁定關系
/// </summary>
/// <param name="type">交換機類型</param>
/// <param name="exchange">交換機名稱</param>
/// <param name="queue">隊列名稱</param>
/// <param name="routeKey">路由名稱</param>
/// <returns></returns>
private IModel CreateModel(string type, string exchange, string queue, string routeKey, IDictionary<string, object> arguments = null)
{
type = string.IsNullOrEmpty(type) ? "default" : type;
IModel model = this.MQConn.Connection.CreateModel();
model.BasicQos(0, 1, false);
model.QueueDeclare(queue, true, false, false, arguments);
model.QueueBind(queue, exchange, routeKey);
return model;
}
/// <summary>
/// 接收消息到隊列中
/// </summary>
/// <param name="model">消息通道</param>
/// <param name="queue">隊列名稱</param>
/// <param name="callback">訂閱消息的回調事件</param>
/// <returns></returns>
private EventingBasicConsumer CreateConsumer(IModel model, string queue)
{
EventingBasicConsumer consumer = new EventingBasicConsumer(model);
model.BasicConsume(queue, false, consumer);
return consumer;
}
}
1.7 通道管理類的構造方法
public MQChannelManager(MQConnection conn)
{
this.MQConn = conn;
}
1.8 需要傳入一個 MQConnection 對象,僅是一個簡單的連接類,代碼如下
public class MQConnection
{
private string vhost = string.Empty;
private IConnection connection = null;
private MQConfig config = null;
/// <summary>
/// 構造無 utf8 標記的編碼轉換器
/// </summary>
public static UTF8Encoding UTF8 { get; set; } = new UTF8Encoding(false);
public MQConnection(MQConfig config, string vhost)
{
this.config = config;
this.vhost = vhost;
}
public IConnection Connection
{
get
{
if (connection == null)
{
ConnectionFactory factory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
UserName = this.config.UserName,
Password = this.config.Password,
HostName = this.config.HostName,
VirtualHost = this.vhost,
Port = this.config.Port
};
connection = factory.CreateConnection();
}
return connection;
}
}
}
1.9 在上面的代碼中,還初始化了一個靜態對象 UTF8Encoding ,使用無 utf8 標記的編碼轉換器來解析消息
2. 定義和實現服務契約
設想一下,有這樣的一個業務場景,通道管理和服務管理都是相同的操作,如果這些基礎操作都在一個地方定義,且有一個默認的實現,那么后來者就不需要去關注這些技術細節,直接繼承基礎類后,傳入相應的消息配置即可完成
消息訂閱和發布操作。
2.1 有了想法,接下來就先定義契約接口 IService,此接口包含創建通道、開啟/停止訂閱,一個服務可能承載多個通道,所以還需要包含通道列表
public interface IService
{
/// <summary>
/// 創建通道
/// </summary>
/// <param name="queue">隊列名稱</param>
/// <param name="routeKey">路由名稱</param>
/// <param name="exchangeType">交換機類型</param>
/// <returns></returns>
MQChannel CreateChannel(string queue, string routeKey, string exchangeType);
/// <summary>
/// 開啟訂閱
/// </summary>
void Start();
/// <summary>
/// 停止訂閱
/// </summary>
void Stop();
/// <summary>
/// 通道列表
/// </summary>
List<MQChannel> Channels { get; set; }
/// <summary>
/// 消息隊列中定義的虛擬機
/// </summary>
string vHost { get; }
/// <summary>
/// 消息隊列中定義的交換機
/// </summary>
string Exchange { get; }
}
2.2 接下來創建一個抽象類來實現該接口,將實現細節進行封裝,方便后面的業務服務繼承調用
public abstract class MQServiceBase : IService
{
internal bool started = false;
internal MQServiceBase(MQConfig config)
{
this.Config = config;
}
public MQChannel CreateChannel(string queue, string routeKey, string exchangeType)
{
MQConnection conn = new MQConnection(this.Config, this.vHost);
MQChannelManager cm = new MQChannelManager(conn);
MQChannel channel = cm.CreateReceiveChannel(exchangeType, this.Exchange, queue, routeKey);
return channel;
}
/// <summary>
/// 啟動訂閱
/// </summary>
public void Start()
{
if (started)
{
return;
}
MQConnection conn = new MQConnection(this.Config, this.vHost);
MQChannelManager manager = new MQChannelManager(conn);
foreach (var item in this.Queues)
{
MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
channel.OnReceivedCallback = item.OnReceived;
this.Channels.Add(channel);
}
started = true;
}
/// <summary>
/// 停止訂閱
/// </summary>
public void Stop()
{
foreach (var c in this.Channels)
{
c.Stop();
}
this.Channels.Clear();
started = false;
}
/// <summary>
/// 接收消息
/// </summary>
/// <param name="message"></param>
public abstract void OnReceived(MessageBody message);
public List<MQChannel> Channels { get; set; } = new List<MQChannel>();
/// <summary>
/// 消息隊列配置
/// </summary>
public MQConfig Config { get; set; }
/// <summary>
/// 消息隊列中定義的虛擬機
/// </summary>
public abstract string vHost { get; }
/// <summary>
/// 消息隊列中定義的交換機
/// </summary>
public abstract string Exchange { get; }
/// <summary>
/// 定義的隊列列表
/// </summary>
public List<QueueInfo> Queues { get; } = new List<QueueInfo>();
}
上面的抽象類,原封不動的實現接口契約,代碼非常簡單,在 Start 方法中,創建通道和啟動消息訂閱;同時,將通道加入屬性 Channels 中,方便后面的自檢服務使用;在 Start 方法中
/// <summary>
/// 啟動訂閱
/// </summary>
public void Start()
{
if (started)
{
return;
}
MQConnection conn = new MQConnection(this.Config, this.vHost);
MQChannelManager manager = new MQChannelManager(conn);
foreach (var item in this.Queues)
{
MQChannel channel = manager.CreateReceiveChannel(item.ExchangeType, this.Exchange, item.Queue, item.RouterKey);
channel.OnReceivedCallback = item.OnReceived;
this.Channels.Add(channel);
}
started = true;
}
使用 MQChannelManager 創建了一個通道,並將通道的回調委托 OnReceivedCallback 設置為 item.OnReceived 方法,該方法將有子類實現;在將當前訂閱服務通道創建完成后,標記服務狀態 started 為 true,防止重復啟動;同時,在該抽象類中,不實現契約的 OnReceived(MessageBody message);強制基礎業務服務類去自我實現,因為各種業務的特殊性,這塊對消息的處理不能再基礎服務中完成
接下來要介紹的是服務監控管理類,該類內部定義一個簡單的定時器功能,不間斷的對 RabbitMQ 的通訊進行偵聽,一旦發現有斷開的連接,就自動創建一個新的通道,並移除舊的通道;同時,提供 Start/Stop 兩個方法,以供程序 啟動/停止 的時候對
2.3 RabbitMQ 的連接和通道進行清理;代碼如下
public class MQServcieManager
{
public int Timer_tick { get; set; } = 10 * 1000;
private Timer timer = null;
public Action<MessageLevel, string, Exception> OnAction = null;
public MQServcieManager()
{
timer = new Timer(OnInterval, "", Timer_tick, Timer_tick);
}
/// <summary>
/// 自檢,配合 RabbitMQ 內部自動重連機制
/// </summary>
/// <param name="sender"></param>
private void OnInterval(object sender)
{
int error = 0, reconnect = 0;
OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 正在執行自檢", null);
foreach (var item in this.Services)
{
for (int i = 0; i < item.Channels.Count; i++)
{
var c = item.Channels[i];
if (c.Connection == null || !c.Connection.IsOpen)
{
error++;
OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 重新創建訂閱", null);
try
{
c.Stop();
var channel = item.CreateChannel(c.QueueName, c.RoutekeyName, c.ExchangeTypeName);
item.Channels.Remove(c);
item.Channels.Add(channel);
OnAction?.Invoke(MessageLevel.Information, $"{c.ExchangeName} {c.QueueName} {c.RoutekeyName} 重新創建完成", null);
reconnect++;
}
catch (Exception ex)
{
OnAction?.Invoke(MessageLevel.Information, ex.Message, ex);
}
}
}
}
OnAction?.Invoke(MessageLevel.Information, $"{DateTime.Now} 自檢完成,錯誤數:{error},重連成功數:{reconnect}", null);
}
public void Start()
{
foreach (var item in this.Services)
{
try
{
item.Start();
}
catch (Exception e)
{
OnAction?.Invoke(MessageLevel.Error, $"啟動服務出錯 | {e.Message}", e);
}
}
}
public void Stop()
{
try
{
foreach (var item in this.Services)
{
item.Stop();
}
Services.Clear();
timer.Dispose();
}
catch (Exception e)
{
OnAction?.Invoke(MessageLevel.Error, $"停止服務出錯 | {e.Message}", e);
}
}
public void AddService(IService service)
{
Services.Add(service);
}
public List<IService> Services { get; set; } = new List<IService>();
}
代碼比較簡單,就不在一一介紹,為了將異常等內部信息傳遞到外邊,方便使用第三方組件進行日志記錄等需求,MQServcieManager 還使用了 MessageLevel 這個定義,方便業務根據不同的消息級別對消息進行處理
public enum MessageLevel
{
Trace = 0,
Debug = 1,
Information = 2,
Warning = 3,
Error = 4,
Critical = 5,
None = 6
}
3. 開始使用
終於來到了這一步,我們將要開始使用這個基礎服務;首先,創建一個 DemoService 繼承自 MQServiceBase ;同時,
3.1 實現 MQServiceBase 的抽象方法 OnReceived(MessageBody message)
public class DemoService : MQServiceBase
{
public Action<MessageLevel, string, Exception> OnAction = null;
public DemoService(MQConfig config) : base(config)
{
base.Queues.Add(new QueueInfo()
{
ExchangeType = ExchangeType.Direct,
Queue = "login-message",
RouterKey = "pk",
OnReceived = this.OnReceived
});
}
public override string vHost { get { return "gpush"; } }
public override string Exchange { get { return "user"; } }
/// <summary>
/// 接收消息
/// </summary>
/// <param name="message"></param>
public override void OnReceived(MessageBody message)
{
try
{
Console.WriteLine(message.Content);
}
catch (Exception ex)
{
OnAction?.Invoke(MessageLevel.Error, ex.Message, ex);
}
message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag, true);
}
}
以上的代碼非常簡單,幾乎不需要業務開發者做更多的其它工作,開發者只需要在構造方法內部傳入一個 QueueInfo 對象,如果有多個,可一並傳入
public partial class QueueInfo
{
/// <summary>
/// 隊列名稱
/// </summary>
public string Queue { get; set; }
/// <summary>
/// 路由名稱
/// </summary>
public string RouterKey { get; set; }
/// <summary>
/// 交換機類型
/// </summary>
public string ExchangeType { get; set; }
/// <summary>
/// 接受消息委托
/// </summary>
public Action<MessageBody> OnReceived { get; set; }
/// <summary>
/// 輸出信息到客戶端
/// </summary>
public Action<MQChannel, MessageLevel, string> OnAction { get; set; }
}
並設置 vHost 和 Exchange 的值,然后剩下的就是在 OnReceived(MessageBody message) 方法中專心的處理自己的業務了;在這里,我們僅輸出接收到的消息,並設置 ack 為已成功處理。
4. 測試代碼
4.1 在 Program,我們執行該測試
class Program
{
static void Main(string[] args)
{
Test();
}
static void Test()
{
MQConfig config = new MQConfig()
{
HostName = "127.0.0.1",
Password = "123456",
Port = 5672,
UserName = "dotnet"
};
MQServcieManager manager = new MQServcieManager();
manager.AddService(new DemoService(config));
manager.OnAction = OnActionOutput;
manager.Start();
Console.WriteLine("服務已啟動");
Console.ReadKey();
manager.Stop();
Console.WriteLine("服務已停止,按任意鍵退出...");
Console.ReadKey();
}
static void OnActionOutput(MessageLevel level, string message, Exception ex)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine("{0} | {1} | {2}", level, message, ex?.StackTrace);
Console.ForegroundColor = ConsoleColor.Gray;
}
}
4.2 利用 MQServcieManager 對象,完成了對所有消息訂閱者的管理和監控,
4.3 首先我們到 RabbitMQ 的 web 控制台發布一條消息到隊列 login-message 中
4.3 然后查看輸出結果
消息已經接收並處理,為了查看監控效果,我還手動將網絡進行中斷,然后監控服務檢測到無法連接,嘗試重建通道,並將消息輸出
- 圖中步驟說明
- 0:服務啟動
- 1:自檢啟動
- 2:服務報錯,嘗試重建,重建失敗,繼續監測
- 3:RabbitMQ 內部監控自動重連,監控程序檢測到已恢復,收到消息並處理
- 4:后續監控服務繼續進行監控
結語
在文章中,我們建立了 RabbitMQ 的通道管理、基礎服務管理、契約實現等操作,讓業務開發人員通過簡單的繼承實現去快速的處理業務系統的邏輯,后續如果有增加消費者的情況下,只需要通過 MQServcieManager.AddService 進行簡單的調用操作即可,無需對底層技術細節進行過多的改動。
源碼下載:
https://github.com/lianggx/EasyAspNetCoreDemo/tree/master/Ron.MQTest