介紹
RabbitMQ是一個由erlang開發的基於AMQP(Advanced Message Queue)協議的開源實現。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面都非常的優秀。是當前最主流的消息中間件之一。
- 概念:
- Brocker:消息隊列服務器實體。
- Exchange:消息交換機,指定消息按什么規則,路由到哪個隊列。
- Queue:消息隊列,每個消息都會被投入到一個或者多個隊列里。
- Binding:綁定,它的作用是把exchange和queue按照路由規則binding起來。
- Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
- Vhost:虛擬主機,一個broker里可以開設多個vhost,用作不用用戶的權限分離。
- Producer:消息生產者,就是投遞消息的程序。
- Consumer:消息消費者,就是接受消息的程序。
- Channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
- 消息隊列的使用過程大概如下:
- 消息接收
- 客戶端連接到消息隊列服務器,打開一個channel。
- 客戶端聲明一個exchange,並設置相關屬性。
- 客戶端聲明一個queue,並設置相關屬性。
- 客戶端使用routing key,在exchange和queue之間建立好綁定關系。
- 消息發布
- 客戶端投遞消息到exchange。
- exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
- 消息接收
- AMQP 里主要要說兩個組件:
- Exchange 和 Queue
- 綠色的 X 就是 Exchange ,紅色的是 Queue ,這兩者都在 Server 端,又稱作 Broker
- 這部分是 RabbitMQ 實現的,而藍色的則是客戶端,通常有 Producer 和 Consumer 兩種類型。
- Exchange通常分為四種:
- fanout:該類型路由規則非常簡單,會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中,相當於廣播功能
- direct:該類型路由規則會將消息路由到binding key與routing key完全匹配的Queue中
- topic:與direct類型相似,只是規則沒有那么嚴格,可以模糊匹配和多條件匹配
- headers:該類型不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配
- 使用場景
下載與安裝
管理工具
- 參考官方文檔
操作起來很簡單,只需要在DOS下面,進入安裝目錄(安裝路徑\RabbitMQ Server\rabbitmq_server-3.2.2\sbin
)執行如下命令就可以成功安裝。
rabbitmq-plugins enable rabbitmq_management
可以通過訪問:http://localhost:15672
進行測試,默認的登陸賬號為:guest,密碼為:guest。
其他配置
1. 安裝完以后erlang需要手動設置ERLANG_HOME 的系統變量。
set ERLANG_HOME=F:\Program Files\erl9.0 #環境變量`path`里加入:%ERLANG_HOME%\bin #環境變量`path`里加入: 安裝路徑\RabbitMQ Server\rabbitmq_server-3.6.10\sbin
2.激活Rabbit MQ’s Management Plugin
使用Rabbit MQ 管理插件,可以更好的可視化方式查看Rabbit MQ 服務器實例的狀態,你可以在命令行中使用下面的命令激活。
rabbitmq-plugins.bat enable rabbitmq_management
3.創建管理用戶
rabbitmqctl.bat add_user sa 123456
4. 設置管理員
rabbitmqctl.bat set_user_tags sa administrator
5.設置權限
rabbitmqctl.bat set_permissions -p / sa ".*" ".*" ".*"
6. 其他命令
#查詢用戶: rabbitmqctl.bat list_users #查詢vhosts: rabbitmqctl.bat list_vhosts #啟動RabbitMQ服務: net stop RabbitMQ && net start RabbitMQ
以上這些,賬號、vhost、權限、作用域等基本就設置完了。
基於.net使用
RabbitMQ.Client 是RabbiMQ 官方提供的的客戶端
EasyNetQ 是基於RabbitMQ.Client 基礎上封裝的開源客戶端,使用非常方便
以下操作RabbitMQ的代碼例子,都是基於EasyNetQ的使用和再封裝,在文章底部有demo例子的源碼下載地址
創建 IBus
/// <summary> /// 消息服務器連接器 /// </summary> public class BusBuilder { public static IBus CreateMessageBus() { // 消息服務器連接字符串 // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"]; string connString = "host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456"; if (connString == null || connString == string.Empty) throw new Exception("messageserver connection string is missing or empty"); return RabbitHutch.CreateBus(connString); } }
Fanout Exchange
所有發送到Fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的所有Queue上。
Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每台子網內的主機都獲得了一份復制的消息。 所以,Fanout Exchange 轉發消息是最快的。
/// <summary> /// 消息消耗(fanout) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="handler">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="queueName">隊列名</param> /// <param name="routingKey">路由名</param> public static void FanoutConsume<T>(Action<T> handler, string exChangeName = "fanout_mq", string queueName = "fanout_queue_default", string routingKey = "") where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } /// <summary> /// 消息上報(fanout) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="topic">主題名</param> /// <param name="t">消息命名</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool FanoutPush<T>(T t, out string msg, string exChangeName = "fanout_mq", string routingKey = "") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } }
所有發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。
Direct模式,可以使用RabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。
/// <summary> /// 消息發送(direct) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="queue">發送到的隊列</param> /// <param name="message">發送內容</param> public static void DirectSend<T>(string queue, T message) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Send(queue, message); } } /// <summary> /// 消息接收(direct) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="queue">接收的隊列</param> /// <param name="callback">回調操作</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool DirectReceive<T>(string queue, Action<T> callback, out string msg) where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); bus.Receive<T>(queue, callback); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; } /// <summary> /// 消息發送 /// <![CDATA[(direct EasyNetQ高級API)]]> /// </summary> /// <typeparam name="T"></typeparam> /// <param name="t"></param> /// <param name="msg"></param> /// <param name="exChangeName"></param> /// <param name="routingKey"></param> /// <returns></returns> public static bool DirectPush<T>(T t, out string msg, string exChangeName = "direct_mq", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 消息接收 /// <![CDATA[(direct EasyNetQ高級API)]]> /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="handler">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="queueName">隊列名</param> /// <param name="routingKey">路由名</param> public static bool DirectConsume<T>(Action<T> handler, out string msg, string exChangeName = "direct_mq", string queueName = "direct_queue_default",