一、消息隊列
消息隊列作為分布式系統中的重要組件,常用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。至於各種消息隊列的優缺點比較,在這里就不做擴展了,網上資源很多。
更多內容可參考 消息隊列及常見消息隊列介紹。我在這里選用的是RabbitMq。
安裝和配置:Windows下RabbitMq安裝及配置
二、RabbitMq簡單介紹
RabbitMQ是一款基於AMQP(高級消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、 Erlang等。在RabbitMq中首先要弄清楚的概念是 交換機、隊列、綁定。基本的消息通訊步驟就是首先定義ExChange,然后定義隊列,然后綁定交換機和隊列。
需要明確的一點兒是,發布者在發送消息是,並不是把消息直接發送到隊列中,而是發送到Exchang,然后由交互機根據定義的消息匹配規則,在將消息發送到隊列中。
Exchange有四種消息消息分發規則:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。
詳細的概念介紹推薦查看:消息隊列之RabbitMq
三、EasyNetQ使用
Easynetq是一個簡單易用的Rabbitmq Net客戶端。同時支持 NetFramework和NetCore。GitHub地址。它是針對RabbitMq Net客戶端的進一步封裝。關於EasyNetQ的簡單使用推薦教程:EasyNetQ的介紹。
本文主要介紹基於EasyNeq的高級API的使用。EasyNetQ的作者在核心的IBus接口中盡量避免暴露AMQP中的交換機、隊列、綁定這些概念,使用者即使不去了解這些概念,也能完成消息的發送接收。這相當簡潔,但某些情況下,基於應用場景的需要,我們需要自定義交換機、隊列、綁定這些信息,EasyNetQ允許你這么做,這些都是通過IAdvanceBus接口實現。
3.1 項目裝備
這里為了演示,首先新建一個項目,包括一個發布者,兩個接收者,一個公共的類庫
安裝EasyNetQ: NuGet>Install-Package EasyNetQ
3.2 簡單封裝
在Common項目里面是針對Easynetq的使用封裝,主要目錄如下
在RabbitMq文件夾下,是針對消息發送接收的簡單封裝。
首先來看下RabbitMqManage,主要的發送和訂閱操作都在這個類中。其中ISend接口定義了發送消息的規范,SendMessageManage是ISend的實現。IMessageConsume接口定義訂閱規范。
MesArg 和PushMsg分別是訂閱和發送需用到的參數類。RabbitMQManage是暴露在外的操作類。
首先看發送的代碼
public enum SendEnum { 訂閱模式 = 1, 推送模式 = 2, 主題路由模式 = 3 } public class PushMsg { /// <summary> /// 發送的數據 /// </summary> public object sendMsg { get; set; } /// <summary> /// 消息推送的模式 /// 現在支持:訂閱模式,推送模式,主題路由模式 /// </summary> public SendEnum sendEnum { get; set; } /// <summary> /// 管道名稱 /// </summary> public string exchangeName { get; set; } /// <summary> /// 路由名稱 /// </summary> public string routeName { get; set; } } internal interface ISend { Task SendMsgAsync(PushMsg pushMsg, IBus bus); void SendMsg(PushMsg pushMsg, IBus bus); } internal class SendMessageMange : ISend { public async Task SendMsgAsync(PushMsg pushMsg, IBus bus) { //一對一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判斷推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.訂閱模式) { //廣播訂閱模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主題路由模式) { //主題路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } await bus.Advanced.PublishAsync(ex, pushMsg.routeName.ToSafeString(""), false, message) .ContinueWith(task => { if (!task.IsCompleted && task.IsFaulted)//消息投遞失敗 { //記錄投遞失敗的消息信息 } }); } public void SendMsg(PushMsg pushMsg, IBus bus) { //一對一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判斷推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.訂閱模式) { //廣播訂閱模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主題路由模式) { //主題路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } bus.Advanced.Publish(ex, pushMsg.routeName.ToSafeString(""), false, message); } }
在EasyNetQ中對於異步發送消息的時候,消息是否送達Broker只需要查看異步發送方法最終執行成功還是失敗,成功就表示消息送達,如果失敗可以將失敗后的消息存入數據庫中,然后用后台線程輪詢
數據庫表,將失敗后的消息進行重新 發送。這種方式還可以進一步變成消息表,就是先將要發送的消息存入消息表中,然后后台線程輪詢消息表來進行消息發送。一般這種方式被廣泛用於分布式事務中,
將本地數據庫操作和消息表寫入放入同一個本地事務中,來保證消息發送和本地數據操作的同步成功,因為我的系統中,分布式事務的涉及很少,所以就沒這樣去做,只是簡單的在異步發送的時候監控下
是否發送失敗,然后針對失敗的消息做一個重新發送的機制。這里,推薦大佬的NetCore分布式事務解決方案 CAP GitHub地址。
接着看一下消息訂閱接收涉及的代碼
public class MesArgs { /// <summary> /// 消息推送的模式 /// 現在支持:訂閱模式,推送模式,主題路由模式 /// </summary> public SendEnum sendEnum { get; set; } /// <summary> /// 管道名稱 /// </summary> public string exchangeName { get; set; } /// <summary> /// 對列名稱 /// </summary> public string rabbitQueeName { get; set; } /// <summary> /// 路由名稱 /// </summary> public string routeName { get; set; } } public interface IMessageConsume { void Consume(string message); }
在訂閱中我定義了一個接口,最終業務代碼中,所有的消息訂閱類,都需要繼續此接口
最后,我們來看下對外使用的操作類
public class RabbitMQManage { private volatile static IBus bus = null; private static readonly object lockHelper = new object(); /// <summary> /// 創建服務總線 /// </summary> /// <param name="config"></param> /// <returns></returns> public static IBus CreateEventBus() { //獲取RabbitMq的連接地址 //SystemJsonConfigManage 是我簡單封裝的一個json操作類,用於針對json文件的讀寫操作 var config = SystemJsonConfigManage.GetInstance().AppSettings["MeessageService"]; if (string.IsNullOrEmpty(config)) throw new Exception("消息地址未配置"); if (bus == null && !string.IsNullOrEmpty(config)) { lock (lockHelper) { if (bus == null) bus = RabbitHutch.CreateBus(config); } } return bus; } /// <summary> /// 釋放服務總線 /// </summary> public static void DisposeBus() { bus?.Dispose(); } /// <summary> /// 消息同步投遞 /// </summary> /// <param name="pushMsg"></param> /// <returns></returns> public static bool PushMessage(PushMsg pushMsg) { bool b = true; try { if (bus == null) CreateEventBus(); new SendMessageMange().SendMsg(pushMsg, bus); b = true; } catch (Exception ex) { b = false; } return b; } /// <summary> /// 消息異步投遞 /// </summary> /// <param name="pushMsg"></param> public static async Task PushMessageAsync(PushMsg pushMsg) { try { if (bus == null) CreateEventBus(); await new SendMessageMange().SendMsgAsync(pushMsg, bus); } catch (Exception ex) { throw ex; } } /// <summary> /// 消息訂閱 /// </summary> public static void Subscribe<TConsum>(MesArgs args) where TConsum : IMessageConsume,new() { if (bus == null) CreateEventBus(); if (string.IsNullOrEmpty(args.exchangeName)) return; Expression<Action<TConsum>> methodCall; IExchange ex = null; //判斷推送模式 if (args.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Direct); } if (args.sendEnum == SendEnum.訂閱模式) { //廣播訂閱模式 ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Fanout); } if (args.sendEnum == SendEnum.主題路由模式) { //主題路由模式 ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Topic); } IQueue qu; if (string.IsNullOrEmpty(args.rabbitQueeName)) { qu = bus.Advanced.QueueDeclare(); } else qu = bus.Advanced.QueueDeclare(args.rabbitQueeName); bus.Advanced.Bind(ex, qu, args.routeName.ToSafeString("")); bus.Advanced.Consume(qu, (body, properties, info) => Task.Factory.StartNew(() => { try { lock (lockHelper) { var message = Encoding.UTF8.GetString(body); //處理消息 methodCall = job => job.Consume(message); methodCall.Compile()(new TConsum()); } } catch (Exception e) { throw e; } })); } }
這里面主要封裝了消息的發送和訂閱,以及IBus單例的創建。在后續的消息發送和訂閱主要就通過此處來實現。我們看到一開始的類目結構中還有一個RaExMessageHandleJob類,這個類就是一個后台
循環任務,用來監測數據庫中是否保存了發送失敗的消息,如果有,則將消息取出,嘗試重新發送。在此就不做多的介紹,大家可以根據自己的實際需求來實現。
3.3 發布者
現在來看一下消息發布者的代碼
主要的發送代碼都在Send類中,其中appsettings.json里面配置了Rabbitmq的連接地址,TestDto只是一個為了方便演示的參數類。
下面看一下Program里面的代碼
很簡單的一個發送消息調用。
然后來看一下Send類中的代碼
public class Send { /// <summary> /// 發送消息 /// </summary> public static void SendMessage() { //需要注意一點兒,如果發送的時候,在該管道下找不到相匹配的隊列框架將默認丟棄該消息 //推送模式 //推送模式下,需指定管道名稱和路由鍵值名稱 //消息只會被發送到和指定路由鍵值完全匹配的隊列中 var directdto = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是推送模式" }, exchangeName = "message.directdemo", routeName= "routekey", sendEnum =SendEnum.推送模式 }; //同步發送 ,返回true或fasle true 發送成功,消息已存儲到Rabbitmq中,false表示發送失敗 var b= RabbitMQManage.PushMessage(directdto); //異步發送,如果失敗,失敗的消息會被寫入數據庫,會有后台線程輪詢數據庫進行重新發送 //RabbitMQManage.PushMessageAsync(directlist); //訂閱模式 //訂閱模式只需要指定管道名稱 //消息會被發送到該管道下的所有隊列中 var fanoutdto = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是訂閱模式" }, exchangeName = "message.fanoutdemo", sendEnum = SendEnum.訂閱模式 }; //同步發送 var fb = RabbitMQManage.PushMessage(fanoutdto); //異步發送 //RabbitMQManage.PushMessageAsync(fanoutdto); //主題路由模式 //路由模式下需指定 管道名稱和路由值 //消息會被發送到該管道下,和路由值匹配的隊列中去 var routedto = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是主題路由模式1", }, exchangeName = "message.topicdemo", routeName="a.log", sendEnum=SendEnum.主題路由模式 }; var routedto2 = new PushMsg() { sendMsg = new TestDto() { Var1 = "這是主題路由模式2", }, exchangeName = "message.topicdemo", routeName = "a.log.a.b", sendEnum = SendEnum.主題路由模式 }; //同步發送 var rb = RabbitMQManage.PushMessage(routedto); var rb2 = RabbitMQManage.PushMessage(routedto2); //異步發送 //RabbitMQManage.PushMessageAsync(routedto); } }
3.4 消費者
首先來看下消費者端的目錄結構
其中appsettings.json中配置Rabbitmq的連接信息,Program中只是簡單調用消息訂閱
主要的消息訂閱代碼都在MessageManage文件夾下,MessageManService用於定義消息訂閱類型
public class MessageManService { public static void Subsribe() { Task.Run(() => { //概念 一個管道下面可以綁定多個隊列。 //發送消息 是指將消息發送到管道中,然后由rabbitmq根據發送規則在將消息具體的轉發到對應到管道下面的隊列中 //消費消息 是指消費者(即服務)從管道下面的隊列中獲取消息 //同一個隊列 可以有多個消費者(即不同的服務,都可以連接到同一個隊列去獲取消息) //但注意 當一個隊列有多個消費者的時候,消息會被依次分發到不同的消費者中。比如第一條消息給第一個消費者,第二條消息給第二個消費者(框架內部有一個公平分發的機制) //推送模式時 需指定管道名稱和路由值 //隊列名稱可自己指定 //注意 ,管道名稱和路由名稱一定要和發送方的管道名稱和路由名稱一致 //無論這個管道下面掛靠有多少個隊列,只有路由名稱和此處指定的路由名稱完全一致的隊列,才會收到這條消息。 var dirarg = new MesArgs() { sendEnum = SendEnum.推送模式, exchangeName = "message.directdemo", rabbitQueeName = "meesage.directmessagequene", routeName = "routekey" }; RabbitMQManage.Subscribe<DirectMessageConsume>(dirarg); //訂閱模式時需指定管道名稱,並且管道名稱要和發送方管道名稱一致 //隊列名稱可自己指定 //所有這個管道下面的隊列,都將收到該條消息 var fanoutrg = new MesArgs() { sendEnum = SendEnum.訂閱模式, exchangeName = "message.fanoutdemo", rabbitQueeName = "meesage.fanoutmessagequene" }; RabbitMQManage.Subscribe<FanoutMessageConsume>(fanoutrg); //路由模式時需指定管道名稱,路由關鍵字並且管道名稱,路由關鍵字要和發送方的一致 //隊列名稱可自己指定 //消息將被發送到管道下面的能匹配路由關鍵字的隊列中 //也就是說 路由模式時,有多少隊列能收到消息,取決於該隊列的路由關鍵字是否匹配,只要匹配就能收到消息 //符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞 var topicrg = new MesArgs() { sendEnum = SendEnum.主題路由模式, exchangeName = "message.topicdemo", rabbitQueeName = "message.topicmessagequene", routeName = "#.log.#" }; RabbitMQManage.Subscribe<TopicMessageConsume>(topicrg); }); } }
Consume文件夾下主要定義了消息的業務處理
//推送模式過來的消息 public class DirectMessageConsume : IMessageConsume { //消息的處理方法中最好不要進行try catch操作 //如果發送異常,EasyNetQ會自動將消息放入錯誤隊列中 //如果在Consume方法體中捕獲了異常並且沒有拋出,會默認消息處理成功 //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次 //(比如說第一次正在處理消息的時候服務掛掉,服務重啟后這條消息又會重新推送過來) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } } //廣播模式過來的消息 public class FanoutMessageConsume : IMessageConsume { //消息的處理方法中最好不要進行try catch操作 //如果發送異常,EasyNetQ會自動將消息放入錯誤隊列中 //如果在Consume方法體中捕獲了異常並且沒有拋出,會默認消息處理成功 //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次 //(比如說第一次正在處理消息的時候服務掛掉,服務重啟后這條消息又會重新推送過來) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } } //主題路由模式過來的消息 public class TopicMessageConsume : IMessageConsume { //消息的處理方法中最好不要進行try catch操作 //如果發送異常,EasyNetQ會自動將消息放入錯誤隊列中 //如果在Consume方法體中捕獲了異常並且沒有拋出,會默認消息處理成功 //消息的冪等性需業務方自行處理,也就是說同一條消息可能會接收到兩次 //(比如說第一次正在處理消息的時候服務掛掉,服務重啟后這條消息又會重新推送過來) public void Consume(string message) { var dto = JsonConvert.DeserializeObject<TestDto>(message); Console.WriteLine(dto.Var1 + ";" + dto.Var2 + ";" + dto.Var3); } }
可以看到,所有的類都集成自我們定義的接口IMessageConsume。
四、總結
在EasyNetQ中如果需要生產者確認功能,則需要在Rabbitmq的連接配置中設置publisherConfirms=true,這將會開啟自動確認。在使用高級api定義交換機和隊列時可以自己定義多種參數,比如消息是否持久化,消息最大長度等等,具體大家可以去看官方文檔,上面有詳細介紹。Easynetq會自動去捕獲消費異常的消息並將其放入到錯誤隊列中,而且官方提供了重新發送錯誤隊列中消息的方法,當然你也可以自己去監視錯誤列隊,對異常消息進行處理。EasyNetQ里面作者針對消息的發布確認和消費確認都做了封裝。在EasyNetQ中發布消息的時候如果選用的同步發送,只要沒有拋出異常,我們就可以認為任務消息已經正確到達Broker,而異步發送的話需要我們自己去監視Task是否成功 。如果開啟了自動確認,並不需要我們在消息處理的方法體中手動返回ack信息,只要消息被 正確處理就會自動ack。雖然RabbitMq中也有事務消息,但由於性能比較差,並不推薦使用。其實,只要我們能明確消息是否發布成功和消費成功,就將會很容易在這個基礎上擴展出分布式事務的處理。