一、RabbitMQ是什么?
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
二、消息隊列的特性
解耦:消息的生產者與消費者均基於AMQP協議(相同的接口與規范)進行發送與接收消息,互相不存依賴;
冗余:消息只有處理了才會被刪除,除非明確允許多個消費者可以收到同一消息的多個副本,否則每個消息只會被單個消費者接收並處理;
擴展性:可增加或減少多個消息的生產者與消費者,兩者的改動均不會影響到雙方;
靈活性 & 峰值處理能力:因為有良好的擴展性,所以可視服務器的處理情況【可稱為:消費者】(比如:高並發負載過大)動態的增減服務器,以提提高處理能力(可稱為:負載均衡);
可恢復性:消息的生產者與消費者不論哪一方出現問題,均不會影響消息的正常發出與接收(當然單一的生產者與消費者除外,如果是這樣也就沒有必要使用分布式消息隊列);
送達保證:只有消息被確認成功處理后才會被刪除,否則會重新分發給其它的消費者進行處理,直到確認處理成功為止;
排序保證:先進先出是隊列的基本特性;
緩沖:同一時間有多個消息進入消息隊列,但是同一時間可以指定一個多個消息被消息者接收並處理,其余的消息處理等待狀態,這樣可以降低服務器的壓力,起到緩沖的作用;
理解數據流:傳遞的消息內容以字節數組為主,但可以將對象序列化后成字節數組,然后在消費者接收到消息后,可反序列化成對象並進行相關的處理,應用場景:CQRS;
異步通信:允許將一個或多個消息放入消息隊列,但並不立即處理它,而是在恰當的時候再去由一個或多個消費者分別接收並處理它們;
以上是我的個人理解,也可參看《使用消息隊列的 10 個理由》
應用場景:針對高並發且無需立即返回處理結果的時候,可以考慮使用消息隊列,如果處理需要立即返回結果則不適合;
三、RabbitMQ環境的安裝
1.服務器端:
A.需要先安裝Erlang環境,下載地址:http://www.erlang.org/download.html,可能有時無法正常訪問,可以通過VPN代理來訪問該網站或在其它網站上下載(比如:CSDN)
B.安裝RabbitMQ Server(有針對多個操作系統的下載,我這邊以WINDOWS平台為主),下載地址:http://www.rabbitmq.com/download.html,
說明:最新版的Erlang及abbitMQ Server安裝后,一般WINDOWS環境變量及服務均都已正常安裝與並正常啟動,可不是最新版或沒有安裝好,則可執行以下命令:
Setx ERLANG_HOME “C:\Program Files\erl7.1″ -Erlang的-安裝目錄,也可通過系統屬性-->高級-->環境變量來手動設置;
cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.5.6\sbin --切換到RabbitMQ Server的sbin目錄下,然后執行如下命令:
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
安裝並設置OK后,可以通過:rabbitmqctl status查看運行情況、rabbitmqctl list_users查看當前用戶、以下命令增加一個新用戶:
rabbitmqctl add_user username password
rabbitmqctl set_permissions username ".*" ".*" ".*"
rabbitmqctl set_user_tags username administrator
修改密碼:rabbitmqctl change_password username newpassowrd
刪除指定的用戶:rabbitmqctl delete_user username
列出所有queue:rabbitmqctl list_queues
列出指定queue的信息:rabbitmqctl list_queues [the queue name] messages_ready messages_unacknowledged
列出所有exchange:rabbitmqctl list_exchanges
列出所有binding:rabbitmqctl list_bindings
安裝基於web的管理插件:rabbitmq-plugins.bat enable rabbitmq_management
當然還有其它的命令,大家可以去查看官網及其它資料,但我認為知道以上的命令足夠用了
四、RabbitMQ的基本用法
使用RabbitMQ客戶端就必需在項目中引用其相關的組件,這里可以通過NuGet安裝或從官網下載再引用均可,方法很簡單,不再重述;
1.普通用法:采用默認的exchange(交換機,或稱路由器)+默認的exchange類型:direct+noAck(自動應答,接收就應答)
/// <summary> /// 消息發送者,一般用在客戶端 /// </summary> class RabbitMQPublish { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel()) //創建一個通道 { channel.QueueDeclare("hello", false, false, false, null);//創建一個隊列 string message = ""; while (message!="exit") { Console.Write("Please enter the message to be sent:"); message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", "hello", null, body); //發送消息 Console.WriteLine("set message: {0}", message); } } } } } /// <summary> /// 消費者,一般用在服務端 /// </summary> class RabbitMQConsume { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel())//創建一個通道 { channel.QueueDeclare("hello", false, false, false, null);//創建一個隊列 var consumer = new QueueingBasicConsumer(channel);//創建一個消費者 channel.BasicConsume("hello", true, consumer);//開啟消息者與通道、隊列關聯 Console.WriteLine(" waiting for message."); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息並出列 var body = ea.Body;//消息主體 var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); if (message == "exit") { Console.WriteLine("exit!"); break; } } } } } }
2.負載均衡處理模式:采用默認的exchange(交換機)+智能分發+默認的exchange類型:direct+手動應答
消息生產者/發布者代碼與上面相同;
以下是消費者代碼:
/// <summary> /// 消費者,一般用在服務端 /// </summary> class RabbitMQConsume { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel())//創建一個通道 { channel.QueueDeclare("hello", false, false, false, null);//創建一個隊列 channel.BasicQos(0, 1, false);//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。 var consumer = new QueueingBasicConsumer(channel);//創建一個消費者 channel.BasicConsume("hello", false, consumer);//開啟消息者與通道、隊列關聯 Console.WriteLine(" waiting for message."); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息並出列 var body = ea.Body;//消息主體 var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); channel.BasicAck(ea.DeliveryTag, false); if (message == "exit") { Console.WriteLine("exit!"); break; } Thread.Sleep(1000); } } } } }
3.消息持久化模式:在2的基礎上加上持久化,這樣即使生產者或消費者或服務端斷開,消息均不會丟失
/// <summary> /// 消息發送者,一般用在客戶端 /// </summary> class RabbitMQPublish { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel()) //創建一個通道 { channel.QueueDeclare("hello", true, false, false, null);//創建一個隊列,第2個參數為true表示為持久隊列 var properties = channel.CreateBasicProperties(); //properties.SetPersistent(true);這個方法提示過時,不建議使用 properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 string message = ""; while (message!="exit") { Console.Write("Please enter the message to be sent:"); message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", "hello", properties, body); //發送消息 Console.WriteLine("set message: {0}", message); } } } } } /// <summary> /// 消費者,一般用在服務端 /// </summary> class RabbitMQConsume { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel())//創建一個通道 { channel.QueueDeclare("hello", true, false, false, null);//創建一個隊列,第2個參數為true表示為持久隊列 channel.BasicQos(0, 1, false);//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。 var consumer = new QueueingBasicConsumer(channel);//創建一個消費者 channel.BasicConsume("hello", false, consumer);//開啟消息者與通道、隊列關聯 Console.WriteLine(" waiting for message."); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息並出列 var body = ea.Body;//消息主體 var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); channel.BasicAck(ea.DeliveryTag, false); if (message == "exit") { Console.WriteLine("exit!"); break; } Thread.Sleep(1000); } } } } }
4.廣播訂閱模式:定義一個交換機,其類型設為廣播類型,發送消息時指定這個交換機,消費者的消息隊列綁定到該交換機實現消息的訂閱,訂閱后則可接收消息,未訂閱則無法收到消息
/// <summary> /// 消息發送者/生產者,一般用在客戶端 /// </summary> class RabbitMQPublish { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel()) //創建一個通道 { channel.ExchangeDeclare("publish", "fanout",true);//定義一個交換機,且采用廣播類型,並設為持久化 string queueName = channel.QueueDeclare("hello", true, false, false, null);//創建一個隊列,第2個參數為true表示為持久隊列,這里將結果隱式轉換成string var properties = channel.CreateBasicProperties(); //properties.SetPersistent(true);這個方法提示過時,不建議使用 properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 string message = ""; while (message!="exit") { Console.Write("Please enter the message to be sent:"); message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("publish", "hello", properties, body); //發送消息,這里指定了交換機名稱,且routeKey會被忽略 Console.WriteLine("set message: {0}", message); } } } } } /// <summary> /// 消費者,一般用在服務端 /// </summary> class RabbitMQConsume { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel())//創建一個通道 { channel.ExchangeDeclare("publish", "fanout", true);//定義一個交換機,且采用廣播類型,並持久化該交換機,並設為持久化 string queueName = channel.QueueDeclare("hello", true, false, false, null);//創建一個隊列,第2個參數為true表示為持久隊列 channel.QueueBind(queueName, "publish", "");//將隊列綁定到名publish的交換機上,實現消息訂閱 channel.BasicQos(0, 1, false);//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。 var consumer = new QueueingBasicConsumer(channel);//創建一個消費者 channel.BasicConsume(queueName, false, consumer);//開啟消息者與通道、隊列關聯 Console.WriteLine(" waiting for message."); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息並出列 var body = ea.Body;//消息主體 var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); channel.BasicAck(ea.DeliveryTag, false);//應答 if (message == "exit") { Console.WriteLine("exit!"); break; } Thread.Sleep(1000); } } } } }
5.主題訂閱模式:定義一個交換機,其類型設為主題訂閱類型,發送消息時指定這個交換機及RoutingKey,消費者的消息隊列綁定到該交換機並匹配到RoutingKey實現消息的訂閱,訂閱后則可接收消息,未訂閱則無法收到消息
/// <summary> /// 消息發送者/生產者,一般用在客戶端 /// </summary> class RabbitMQPublish { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel()) //創建一個通道 { channel.ExchangeDeclare("publish-topic", "topic", true);//定義一個交換機,且采用廣播類型,並持久化該交換機 channel.QueueDeclare("hello-mq", true, false, false, null);//創建一個隊列,第2個參數為true表示為持久隊列 var properties = channel.CreateBasicProperties(); //properties.SetPersistent(true);這個方法提示過時,不建議使用 properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 string message = ""; while (message!="exit") { Console.Write("Please enter the message to be sent:"); message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("publish-topic", "hello.test", properties, body); //發送消息,這里指定了交換機名稱,且routeKey會被忽略 Console.WriteLine("set message: {0}", message); } } } } } /// <summary> /// 消費者,一般用在服務端 /// </summary> class RabbitMQConsume { static void Main(string[] args) { var factory = new ConnectionFactory();//創建連接工廠並初始連接 factory.HostName = "localhost"; factory.UserName = "zwj"; factory.Password = "www.zuowenjun.cn"; using (var connection = factory.CreateConnection())//創建一個連接 { using (var channel = connection.CreateModel())//創建一個通道 { channel.ExchangeDeclare("publish-topic", "topic",true);//定義一個交換機,且采用廣播類型,並持久化該交換機 string queueName = channel.QueueDeclare("hello-mq", true, false, false, null);//創建一個隊列,第2個參數為true表示為持久隊列 channel.QueueBind(queueName, "publish-topic", "*.test");//將隊列綁定到路由上,實現消息訂閱 channel.BasicQos(0, 1, false);//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。 var consumer = new QueueingBasicConsumer(channel);//創建一個消費者 channel.BasicConsume(queueName, false, consumer);//開啟消息者與通道、隊列關聯 Console.WriteLine(" waiting for message."); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息並出列 var body = ea.Body;//消息主體 var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received {0}", message); channel.BasicAck(ea.DeliveryTag, false);//應答 if (message == "exit") { Console.WriteLine("exit!"); break; } Thread.Sleep(1000); } } } } }
交換機路由類型如下:
Direct Exchange:直接匹配,通過Exchange名稱+RoutingKey來發送與接收消息;
Fanout Exchange:廣播訂閱,向所有消費者發布消息,但只有消費者將隊列綁定到該路由才能收到消息,忽略RoutingKey;
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個詞,只有消費者將隊列綁定到該路由且指定的RoutingKey符合匹配規則時才能收到消息;
Headers Exchange:消息頭訂閱,消息發布前,為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息時同樣需要定義類似的鍵值對請求頭,里面需要多包含一個匹配模式(有:x-mactch=all,或者x-mactch=any),只有請求頭與消息頭相匹配,才能接收到消息,忽略RoutingKey;
本文內容參考了以下文章: