EasyNetQ 是一個簡潔而適用的RabbitMQ .NET類庫,本質上是一個在RabbitMQ.Client之上提供服務的組件集合。
應用使用rabbitmq需要經過總線接口IBus或者IAdvanceBus,大部分時候我們使用的是IBus,它提供了三種消息模式:Publish/Subscribe, Request/Response和 Send/Receive,可以滿足大多數需求。
EasyNetQ規定,每一個你想發送的消息必須用一個class表示,簡單說就是一個實體類。默認情況下,當你發布一個消息,EasyNetQ會檢查消息類型,基於類型名稱、命名空間和程序集創建交換機和隊列,當然也可以使用指定的名稱去創建交換機和隊列,而消息默認使用Newtonsoft.Json 序列化成JSON格式,當然我們也可以替換成自己序列化方式,實現很簡單,比如我們要使用xml格式序列化,因為EasyNetQ內部使用Ioc容器組織在一起,那么我們只需要實現ISerializer接口並注冊到DI容器中即可。
1、EasyNetQ安裝
使用Nuget搜索EasyNetQ直接安裝即可:

2、EasyNetQ連接
using System; namespace EasyNetQ.Demo { public class TextMessage { public string Text { get; set; } } class Program { static void Main(string[] args) { //方式1 var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60"; var bus1 = RabbitHutch.CreateBus(connectionString);
//方式2 HostConfiguration host = new HostConfiguration(); host.Host = "192.168.18.129"; host.Port = 5672; ConnectionConfiguration connection = new ConnectionConfiguration(); connection.Port = 5672; connection.Password = "123456"; connection.UserName = "admin"; connection.VirtualHost = "/"; connection.Timeout = 60; connection.Hosts = new HostConfiguration[] { host }; var bus2 = RabbitHutch.CreateBus(connection, services => { }); //使用bus實現業務 //關閉連接字需要調用bus的Dispose方法 Console.ReadKey(); } } }
其實EasyNetQ連接過程其實就是IBus的創建過程,IBus的創建有兩種方式,一種是以連接字符串的形式,類似數據庫的連接,一種是通過ConnectionConfiguration來創建,其實連接字符串最終也是構建這個對象來連接rabbitmq,而連接字符串的連接類型主要有一下幾個:
host 這個字段是必選的。如要具體指定你要連接服務器端口,你用標准格式 host:port。假如你省略了端口號,AMQP默認端口是5672。如果連接到RabbitMQ集群,需要指定每一個集群節點用逗號(.)分隔
virtualhost 默認虛擬主機是’/’
username 默認是'guest'
password 默認為'guest'
requestedHearbeat 默認為10秒鍾。沒有心跳設置為0
prefetchcount 默認為50.這個值是在EasyNetQ發送ack之前發送給RabbitMQ的消息數。不限制設置為0(不推薦). 為了在消費者之間保持公平和平衡設置為1.
persistentMessages 默認為true。這個決定了在發送消息時采用什么樣的delivery_mode。設置為true,RabbitMQ將會把消息持久化到磁盤,並且在服務器重啟后仍會存在。設置為false可以提高性能收益。
timeout 模式值為10秒。不限制超時時間設置為0.當超時事時拋出System.TimeoutException.
關閉連接字需要調用Dispose方法即可,所以,在使用IBus時,切記不要無腦的使用using
3、EasyNetQ模式:Publish/Subscribe
using System; namespace EasyNetQ.Demo { public class TextMessage { public string Text { get; set; } } class Program { static void Main(string[] args) { var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60"; //訂閱/Subscribe { var bus = RabbitHutch.CreateBus(connectionString); bus.Subscribe<TextMessage>("subscriptionId", tm => { Console.WriteLine("Recieve Message: {0}", tm.Text); }); } //發布/Publish using (var bus = RabbitHutch.CreateBus(connectionString, registerServices => { })) { var input = ""; Console.WriteLine("Please enter a message. 'q'/'Q' to quit."); while ((input = Console.ReadLine()).ToLower() != "q") { bus.Publish(new TextMessage { Text = input }); } } } } }
這個例子是EasyNetQ三種模式中的Publish/Subscribe,上面的代碼執行后,在rabbitmq中會創建一個名為 EasyNetQ.Demo.TextMessage, EasyNetQ.Demo 的交換機和一個名為 EasyNetQ.Demo.TextMessage, EasyNetQ.Demo_subscriptionId 的隊列,當然,交換機和隊列的名稱也可以自定義。
發布訂閱模式有主要有以下特點:
1、發布消息使用Publish方法
bus.Publish(new TextMessage
{
Text = input
});
一個消息類型對應一個交換機,如果交換機不存在,會自動創建一個topic類型的交換機,默認情況下,交換機名稱為消息類型的全限定名(命名空間.類型,程序集名),例如上面的例子將會創建名為EasyNetQ.Demo.TextMessage, EasyNetQ.Demo 的交換機,當然,名稱可以自定義。
2、訂閱消息調用Subscribe方法
bus.Subscribe<TextMessage>("subscriptionId", tm => { Console.WriteLine("Recieve Message: {0}", tm.Text); });
每次調用Subscribe方法都會去創建一個新的隊列(如果不存在), Subscribe方法有一個subscriptionId參數,默認情況下,隊列名稱為全限定名(命名空間.類型,程序集名)+subscriptionId,而且隊列將綁定到對應的交換機,默認綁定的路由是#,即隊列可以接受所有發布到交換機的消息,例如上面的例子會創建名為 EasyNetQ.Demo.TextMessage, EasyNetQ.Demo_subscriptionId 的隊列,綁定的路由是#。隊列名和路由均可自定義。
因為Subscribe方法的subscriptionId參數,在未明確指定隊列的情況下,如果使用同一個subscriptionId參數,則表示從同一個隊列消費,因此可以使用subscriptionId來區分隊列。
注:因為隊列是在Subscribe方法執行后創建的,如果未調用Subscribe方法,即隊列還未創建,那么調用Publish方法時,只會生成一個交換機,而且發布的消息將會丟失。
3、隊列名有兩種方式自定義。
一種是使用QueueAttribute特性標識消息類型,指定隊列名和交換機名,這個也是交換機自定義名稱的方式
[Queue("my_queue_name", ExchangeName = "my_exchange_name")] public class TextMessage { public string Text { get; set; } }
當使用特性聲明隊列名稱時,真實創建的隊列名其實是:my_queue_name + subscriptionId,如上面的例子會創建名為my_queue_name_subscriptionId的隊列。
另一種是在使用Subscribe方法訂閱消息是指定,當在這里指定時,subscriptionId就無效了,生成的隊列名就是自定義的名稱,不帶subscriptionId。
bus.Subscribe<TextMessage>("subscriptionId", tm => { Console.WriteLine("Recieve Message: {0}", tm.Text); }, cfg => { cfg.WithQueueName("my_queue_name")//生成的隊列名是my_queue_name,不帶subscriptionId .WithTopic("a#");//路由匹配以a開頭的路由,WithTopic是可以多次調用,讓隊列可以以不同的路由綁定到交換機 });
另外,這種方式還可以指定隊列綁定到交換機的路由,當訂閱處指定路由后,需要在發布消息時也指定對應路由才能將消息發布到隊列。
bus.Publish(new TextMessage { Text = input }, "a#");//以a#為路由發布消息
或者
bus.Publish(new TextMessage { Text = input }, cfg => { cfg.WithTopic("a#");//以a#為路由發布消息 });
4、可以訂閱消息,當然也可以取消訂閱,當使用Subscribe訂閱消息時,會返回一個ISubscriptionResult,取消訂閱只需要調用ISubscriptionResult的Dispose方法即可
subscriptionResult.Dispose(); // 這個等價與 subscriptionResult.ConsumerCancellation.Dispose();
調用Dispose方法將停止EasyNetQ對隊列的消費,並且關閉這個消費者的channel。
注意:IBus和IAndvancedBus的dispose,能夠取消所有消費者,並關閉對RabbitMQ的連接。
5、消息的發布訂閱也提供了異步操作,我們可以根據自己的需求來決定是否使用
//異步發布 bus.PublishAsync(new TextMessage { Text = input });
//異步訂閱 bus.SubscribeAsync<TextMessage>("subscriptionId", async tm => { await Task.Run(() => { Console.WriteLine("Recieve Message: {0}", tm.Text); }); });
6、消息的發布與訂閱過程不必再同一個項目中,也不需要使用同一個IBus,只需要注意發布訂閱過程中的交換機和隊列即可,因為默認情況下,交換機名和隊列名都是根據消息類型生成,所以此時最好具體指明交換機名和隊列名。
4、EasyNetQ模式:Request/Response
using System; namespace EasyNetQ.Demo { public class MyRequest { public string Text { get; set; } } public class MyResponse { public string Text { get; set; } } class Program { static void Main(string[] args) { var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60"; //響應/Respond { var bus = RabbitHutch.CreateBus(connectionString); bus.Respond<MyRequest, MyResponse>(request => { return new MyResponse { Text = "Respond: " + request.Text }; }); }//請求/Request using (var bus = RabbitHutch.CreateBus(connectionString)) { var input = ""; Console.WriteLine("Please enter a message. 'q'/'Q' to quit."); while ((input = Console.ReadLine()).ToLower() != "q") { var response = bus.Request<MyRequest, MyResponse>(new MyRequest { Text = input }, cfg => { }); Console.WriteLine(response.Text); } } } } }
Request/Response模式類似於請求響應的過程——發送一個請求到服務器,服務器然后處理請求后返回一個響應。上述代碼執行后,會在rabbitmq中創建一個名為easy_net_q_rpc的交換機,但是類型是direct,另外還會創建兩個隊列,名為EasyNetQ.Demo.MyRequest, EasyNetQ.Demo和easynetq.response.xxxxxxxxxxxxxxx。
請求響應模式的特點:
1、通過Request方法發送請求
var response = bus.Request<MyRequest, MyResponse>(new MyRequest { Text = input }, cfg => { });
請求之后會創建一個名為easy_net_q_rpc的交換機(如果不存在),類型是direct,同時,當前線程會阻塞,等待消息被消費處理,或者等待一段時間后(如例子中連接字符串中的timeout=60),會拋出System.TimeoutException異常。另外這里還會生成一個easynetq.response.xxxxxxxxxxxxxxx的隊列,這是一個auto-delete類型的隊列,同時與當前連接綁定,當連接關閉后,這個隊列就會自動刪除,而且這個隊列將會自動綁定到easy_net_q_rpc交換機,路由名就是隊列名。。
2、通過Respond方法處理請求發送的消息
bus.Respond<MyRequest, MyResponse>(request => { return new MyResponse { Text = "Respond: " + request.Text }; });
Respond方法執行后,會生成一個隊列,隊列名是請求消息的全限定名(命名空間.類型,程序集名),同時隊列將自動綁定到easy_net_q_rpc交換機,路由名就是隊列名,之后所有該請求類型的消息都將發送到這個隊列。而且Respond方法也將從這個隊列獲取消息消費。另外,我們是可以指定這個隊列名:
bus.Respond<MyRequest, MyResponse>(request => { return new MyResponse { Text = "Respond: " + request.Text }; }, cfg => { cfg.WithQueueName("my_queue_name"); });
只有當請求發送的消息被Respond方法中的處理過程處理后,Request才能得到結果繼續執行,所以,盡可能的不要在Respond方法中寫過多的耗時操作。
注:因為隊列是在Respond方法執行后創建的,如果在未調用Respond方法時,即隊列未創建,那么Request方法發送的消息將會丟失,而Request方法發送消息后將會造成線程阻塞,那么最終的結果就是一直等待到System.TimeoutException異常拋出。
3、Request方法和Respond方法都提供了異步操作
//異步請求 var task = bus.RequestAsync<MyRequest, MyResponse>(new MyRequest { Text = input }, cfg => { }); task.Wait();
//異步響應 bus.RespondAsync<MyRequest, MyResponse>(async request => { return await Task.Run(() => new MyResponse { Text = "Respond: " + request.Text }); });
4、同樣的,請求和響應過程可以在不同項目中,但是要注意所對應的隊列,因為隊列名和交換機轉發路由都是根據請求消息生成的,所以此時建議使用自定義隊列名的方式。
5、EasyNetQ模式:Send/Receive
using System; namespace EasyNetQ.Demo { public class MyMessage { public string Text { get; set; } } class Program { static void Main(string[] args) { var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60"; //接收/Receive { var bus = RabbitHutch.CreateBus(connectionString); bus.Receive<MyMessage>("my_queue_name", r => Console.WriteLine("Receive:" + r.Text)); } //發送/Send using (var bus = RabbitHutch.CreateBus(connectionString)) { var input = ""; Console.WriteLine("Please enter a message. 'q'/'Q' to quit."); while ((input = Console.ReadLine()).ToLower() != "q") { bus.Send("my_queue_name", new MyMessage { Text = input }); } } } } }
上述代碼執行完成后,會創建一個名為my_queue_name的隊列,但是不會創建交換機,也就是說Send/Receive不是基於交換機,為什么有了Publish/Subsrcibe和Request/Response模式,還要添加一個Send/Receive模式?發送接收模式主要是針對隊列,前面說了,EasyNetQ規定每個發送的消息必須是一個實體類型,消息經過序列化之后放到rabbitmq中去的,而Publish/Subsrcibe和Request/Response模式默認都是一個隊列對應一個實體,也就是說,一個隊列中只能存放一個類型序列化后的數據。而Send/Receive模式允許一個隊列存放多種類型格式化后的數據,在接收時再根據類型匹配。
發送接收模式特點:
1、發送消息使用Send方法
bus.Send("my_queue_name", new MyMessage { Text = input });
Send方法執行后會創建隊列(如果不存在),而且可以往同一隊列中發送不同類型的消息
bus.Send("my_queue_name", new CatMessage { Miao = "Miao" }); bus.Send("my_queue_name", new DogMessage { Wang = "Wang" });
2、接收消息使用Receive方法
bus.Receive<MyMessage>("my_queue_name", r => { Console.WriteLine("Receive:" + r.Text); });
Receive方法也會創建隊列(如果不存在),當隊列中有多種類型的消息時,可以在Receive方法中的hander中添加不同類型的接收者
bus.Receive("my_queue_name", hander => { hander.Add<CatMessage>(r => { Console.WriteLine("Cat:" + r.Miao); }); hander.Add<DogMessage>(r => { Console.WriteLine("Dog:" + r.Wang); }); });
3、每次調用Receive方法都會創建一個消費者,多次調用Receive方法后,同一隊列的消息會分發到不同消費者中。對於一個消費者,如果消息送達了消費者的接收隊列,會根據添加的類型進行匹配,如果匹配成功,則用對應的邏輯處理,如果沒有匹配到任何接收者,EasyNetQ將會把消息帶着一個異常“No handler for message type寫進EasyNetQ的錯誤隊列”。
4、同樣的,發送和接收可以在不同項目中,但是由於接收過程是根據類型匹配的,那么就需要發送和接收中使用的是同一類型,而且,每一個Receive方法中添加的接收器要盡可能的覆蓋所接收的隊列中所有的消息類型,這樣才能保證不會導致消息匹配不到接受者。
5、Send方法和Receive方法也有異步操作。
bus.SendAsync("my_queue_name", new MyMessage { Text = input });
bus.Receive<MyMessage>("my_queue_name", async r => { await Task.Run(() => { Console.WriteLine("Receive:" + r.Text); }); });
