總結消息隊列RabbitMQ的基本用法


一、RabbitMQ是什么?

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

二、消息隊列的特性

解耦:消息的生產者與消費者均基於AMQP協議(相同的接口與規范)進行發送與接收消息,互相不存依賴;

冗余:消息只有處理了才會被刪除,除非明確允許多個消費者可以收到同一消息的多個副本,否則每個消息只會被單個消費者接收並處理;

擴展性:可增加或減少多個消息的生產者與消費者,兩者的改動均不會影響到雙方;

靈活性 & 峰值處理能力:因為有良好的擴展性,所以可視服務器的處理情況【可稱為:消費者】(比如:高並發負載過大)動態的增減服務器,以提提高處理能力(可稱為:負載均衡);

可恢復性:消息的生產者與消費者不論哪一方出現問題,均不會影響消息的正常發出與接收(當然單一的生產者與消費者除外,如果是這樣也就沒有必要使用分布式消息隊列);

送達保證:只有消息被確認成功處理后才會被刪除,否則會重新分發給其它的消費者進行處理,直到確認處理成功為止;

排序保證:先進先出是隊列的基本特性;

緩沖:同一時間有多個消息進入消息隊列,但是同一時間可以指定一個多個消息被消息者接收並處理,其余的消息處理等待狀態,這樣可以降低服務器的壓力,起到緩沖的作用;

理解數據流:傳遞的消息內容以字節數組為主,但可以將對象序列化后成字節數組,然后在消費者接收到消息后,可反序列化成對象並進行相關的處理,應用場景:CQRS;

異步通信:允許將一個或多個消息放入消息隊列,但並不立即處理它,而是在恰當的時候再去由一個或多個消費者分別接收並處理它們;

以上是我的個人理解,也可參看《使用消息隊列的 10 個理由

應用場景:針對高並發且無需立即返回處理結果的時候,可以考慮使用消息隊列,如果處理需要立即返回結果則不適合;

三、RabbitMQ環境的安裝

1.服務器端:

A.需要先安裝Erlang環境,下載地址:http://www.erlang.org/download.html,可能有時無法正常訪問,可以通過VPN代理來訪問該網站或在其它網站上下載(比如:CSDN)

B.安裝RabbitMQ Server(有針對多個操作系統的下載,我這邊以WINDOWS平台為主),下載地址:http://www.rabbitmq.com/download.html

說明:最新版的Erlang及abbitMQ Server安裝后,一般WINDOWS環境變量及服務均都已正常安裝與並正常啟動,可不是最新版或沒有安裝好,則可執行以下命令:

Setx ERLANG_HOME “C:\Program Files\erl7.1″ -Erlang的-安裝目錄,也可通過系統屬性-->高級-->環境變量來手動設置;

cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.5.6\sbin --切換到RabbitMQ Server的sbin目錄下,然后執行如下命令:

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

安裝並設置OK后,可以通過:rabbitmqctl status查看運行情況、rabbitmqctl list_users查看當前用戶、以下命令增加一個新用戶:

rabbitmqctl add_user username password
rabbitmqctl set_permissions username ".*" ".*" ".*"
rabbitmqctl set_user_tags username administrator

修改密碼:rabbitmqctl change_password username newpassowrd

刪除指定的用戶:rabbitmqctl delete_user username 

列出所有queue:rabbitmqctl list_queues

列出指定queue的信息:rabbitmqctl list_queues [the queue name] messages_ready messages_unacknowledged

列出所有exchange:rabbitmqctl list_exchanges

列出所有binding:rabbitmqctl list_bindings

安裝基於web的管理插件:rabbitmq-plugins.bat enable rabbitmq_management

當然還有其它的命令,大家可以去查看官網及其它資料,但我認為知道以上的命令足夠用了

四、RabbitMQ的基本用法

使用RabbitMQ客戶端就必需在項目中引用其相關的組件,這里可以通過NuGet安裝或從官網下載再引用均可,方法很簡單,不再重述;

1.普通用法:采用默認的exchange(交換機,或稱路由器)+默認的exchange類型:direct+noAck(自動應答,接收就應答)

/// <summary>

/// 消息發送者,一般用在客戶端
/// </summary>
class  RabbitMQPublish
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel())  //創建一個通道
             {
                 channel.QueueDeclare( "hello" false false false null ); //創建一個隊列
 
                 string  message =  "" ;
                 while  (message!= "exit" )
                 {
                     Console.Write( "Please enter the message to be sent:" );
                     message = Console.ReadLine();
                     var  body = Encoding.UTF8.GetBytes(message);
                     channel.BasicPublish( "" "hello" null , body);  //發送消息
                     Console.WriteLine( "set message: {0}" , message);
                 }
             }
         }
     }
}
 
 
 
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class  RabbitMQConsume
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel()) //創建一個通道
             {
                 channel.QueueDeclare( "hello" false false false null ); //創建一個隊列
 
                 var  consumer =  new  QueueingBasicConsumer(channel); //創建一個消費者
                 channel.BasicConsume( "hello" true , consumer); //開啟消息者與通道、隊列關聯
 
                 Console.WriteLine( " waiting for message." );
                 while  ( true )
                 {
                     var  ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //接收消息並出列
 
                     var  body = ea.Body; //消息主體
                     var  message = Encoding.UTF8.GetString(body);
                     Console.WriteLine( "Received {0}" , message);
                     if  (message ==  "exit" )
                     {
                         Console.WriteLine( "exit!" );
                         break ;
                     }
 
                 }
             }
         }
         
     }
}

2.負載均衡處理模式:采用默認的exchange(交換機)+智能分發+默認的exchange類型:direct+手動應答

消息生產者/發布者代碼與上面相同;

以下是消費者代碼:

/// <summary>

/// 消費者,一般用在服務端
/// </summary>
class  RabbitMQConsume
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel()) //創建一個通道
             {
                 channel.QueueDeclare( "hello" false false false null ); //創建一個隊列
                 channel.BasicQos(0, 1,  false ); //在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
 
                 var  consumer =  new  QueueingBasicConsumer(channel); //創建一個消費者
                 channel.BasicConsume( "hello" false , consumer); //開啟消息者與通道、隊列關聯
 
                 Console.WriteLine( " waiting for message." );
                 while  ( true )
                 {
                     var  ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //接收消息並出列
 
                     var  body = ea.Body; //消息主體
                     var  message = Encoding.UTF8.GetString(body);
                     Console.WriteLine( "Received {0}" , message);
                     channel.BasicAck(ea.DeliveryTag,  false );
                     if  (message ==  "exit" )
                     {
                         Console.WriteLine( "exit!" );
                         break ;
                     }
                     Thread.Sleep(1000);
                 }
             }
         }
         
     }
}
3.消息持久化模式:在2的基礎上加上持久化,這樣即使生產者或消費者或服務端斷開,消息均不會丟失
/// <summary>
/// 消息發送者,一般用在客戶端
/// </summary>
class  RabbitMQPublish
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel())  //創建一個通道
             {
                 channel.QueueDeclare( "hello" true false false null ); //創建一個隊列,第2個參數為true表示為持久隊列
                 var  properties = channel.CreateBasicProperties();
                 //properties.SetPersistent(true);這個方法提示過時,不建議使用
                 properties.DeliveryMode = 2; //1表示不持久,2.表示持久化
                 string  message =  "" ;
                 while  (message!= "exit" )
                 {
                     Console.Write( "Please enter the message to be sent:" );
                     message = Console.ReadLine();
                     var  body = Encoding.UTF8.GetBytes(message);
                     channel.BasicPublish( "" "hello" , properties, body);  //發送消息
                     Console.WriteLine( "set message: {0}" , message);
                 }
             }
         }
     }
}
 
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class  RabbitMQConsume
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel()) //創建一個通道
             {
                 channel.QueueDeclare( "hello" true false false null ); //創建一個隊列,第2個參數為true表示為持久隊列
                 channel.BasicQos(0, 1,  false ); //在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
 
                 var  consumer =  new  QueueingBasicConsumer(channel); //創建一個消費者
                 channel.BasicConsume( "hello" false , consumer); //開啟消息者與通道、隊列關聯
 
                 Console.WriteLine( " waiting for message." );
                 while  ( true )
                 {
                     var  ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //接收消息並出列
 
                     var  body = ea.Body; //消息主體
                     var  message = Encoding.UTF8.GetString(body);
                     Console.WriteLine( "Received {0}" , message);
                     channel.BasicAck(ea.DeliveryTag,  false );
                     if  (message ==  "exit" )
                     {
                         Console.WriteLine( "exit!" );
                         break ;
                     }
                     Thread.Sleep(1000);
                 }
             }
         }
         
     }
}
4.廣播訂閱模式:定義一個交換機,其類型設為廣播類型,發送消息時指定這個交換機,消費者的消息隊列綁定到該交換機實現消息的訂閱,訂閱后則可接收消息,未訂閱則無法收到消息
/// <summary>
/// 消息發送者/生產者,一般用在客戶端
/// </summary>
class  RabbitMQPublish
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel())  //創建一個通道
             {
                 channel.ExchangeDeclare( "publish" "fanout" , true ); //定義一個交換機,且采用廣播類型,並設為持久化
                 string  queueName = channel.QueueDeclare( "hello" true false false null ); //創建一個隊列,第2個參數為true表示為持久隊列,這里將結果隱式轉換成string
                 var  properties = channel.CreateBasicProperties();
                 //properties.SetPersistent(true);這個方法提示過時,不建議使用
                 properties.DeliveryMode = 2; //1表示不持久,2.表示持久化
                 string  message =  "" ;
                 while  (message!= "exit" )
                 {
                     Console.Write( "Please enter the message to be sent:" );
                     message = Console.ReadLine();
                     var  body = Encoding.UTF8.GetBytes(message);
                     channel.BasicPublish( "publish" "hello" , properties, body);  //發送消息,這里指定了交換機名稱,且routeKey會被忽略
                     Console.WriteLine( "set message: {0}" , message);
                 }
             }
         }
     }
}
 
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class  RabbitMQConsume
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel()) //創建一個通道
             {
                 channel.ExchangeDeclare( "publish" "fanout" true ); //定義一個交換機,且采用廣播類型,並持久化該交換機,並設為持久化
                 string  queueName = channel.QueueDeclare( "hello" true false false null ); //創建一個隊列,第2個參數為true表示為持久隊列
                 channel.QueueBind(queueName,  "publish" "" ); //將隊列綁定到名publish的交換機上,實現消息訂閱
                 channel.BasicQos(0, 1,  false ); //在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
 
                 var  consumer =  new  QueueingBasicConsumer(channel); //創建一個消費者
                 channel.BasicConsume(queueName,  false , consumer); //開啟消息者與通道、隊列關聯
 
                 Console.WriteLine( " waiting for message." );
                 while  ( true )
                 {
                     var  ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //接收消息並出列
 
                     var  body = ea.Body; //消息主體
                     var  message = Encoding.UTF8.GetString(body);
                     Console.WriteLine( "Received {0}" , message);
                     channel.BasicAck(ea.DeliveryTag,  false ); //應答
                     if  (message ==  "exit" )
                     {
                         Console.WriteLine( "exit!" );
                         break ;
                     }
                     Thread.Sleep(1000);
                 }
             }
         }
         
     }
}
5.主題訂閱模式:定義一個交換機,其類型設為主題訂閱類型,發送消息時指定這個交換機及RoutingKey,消費者的消息隊列綁定到該交換機並匹配到RoutingKey實現消息的訂閱,訂閱后則可接收消息,未訂閱則無法收到消息
/// <summary>
/// 消息發送者/生產者,一般用在客戶端
/// </summary>
class  RabbitMQPublish
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel())  //創建一個通道
             {
                 channel.ExchangeDeclare( "publish-topic" "topic" true ); //定義一個交換機,且采用廣播類型,並持久化該交換機
                channel.QueueDeclare( "hello-mq" true false false null ); //創建一個隊列,第2個參數為true表示為持久隊列
                 var  properties = channel.CreateBasicProperties();
                 //properties.SetPersistent(true);這個方法提示過時,不建議使用
                 properties.DeliveryMode = 2; //1表示不持久,2.表示持久化
                 string  message =  "" ;
                 while  (message!= "exit" )
                 {
                     Console.Write( "Please enter the message to be sent:" );
                     message = Console.ReadLine();
                     var  body = Encoding.UTF8.GetBytes(message);
                     channel.BasicPublish( "publish-topic" "hello.test" , properties, body);  //發送消息,這里指定了交換機名稱,且routeKey會被忽略
                     Console.WriteLine( "set message: {0}" , message);
                 }
             }
         }
     }
}
 
 
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class  RabbitMQConsume
{
     static  void  Main( string [] args)
     {
         var  factory =  new  ConnectionFactory(); //創建連接工廠並初始連接
         factory.HostName =  "localhost" ;
         factory.UserName =  "zwj" ;
         factory.Password =  "www.zuowenjun.cn" ;
 
         using  ( var  connection = factory.CreateConnection()) //創建一個連接
         {
             using  ( var  channel = connection.CreateModel()) //創建一個通道
             {
                 channel.ExchangeDeclare( "publish-topic" "topic" , true ); //定義一個交換機,且采用廣播類型,並持久化該交換機
                 string  queueName = channel.QueueDeclare( "hello-mq" true false false null ); //創建一個隊列,第2個參數為true表示為持久隊列
                 channel.QueueBind(queueName,  "publish-topic" "*.test" ); //將隊列綁定到路由上,實現消息訂閱
                 channel.BasicQos(0, 1,  false ); //在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
 
                 var  consumer =  new  QueueingBasicConsumer(channel); //創建一個消費者
                 channel.BasicConsume(queueName,  false , consumer); //開啟消息者與通道、隊列關聯
 
                 Console.WriteLine( " waiting for message." );
                 while  ( true )
                 {
                     var  ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //接收消息並出列
 
                     var  body = ea.Body; //消息主體
                     var  message = Encoding.UTF8.GetString(body);
                     Console.WriteLine( "Received {0}" , message);
                     channel.BasicAck(ea.DeliveryTag,  false ); //應答
                     if  (message ==  "exit" )
                     {
                         Console.WriteLine( "exit!" );
                         break ;
                     }
                     Thread.Sleep(1000);
                 }
             }
         }
         
     }
}

交換機路由類型如下:

Direct Exchange:直接匹配,通過Exchange名稱+RoutingKey來發送與接收消息;

Fanout Exchange:廣播訂閱,向所有消費者發布消息,但只有消費者將隊列綁定到該路由才能收到消息,忽略RoutingKey;

Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個詞,只有消費者將隊列綁定到該路由且指定的RoutingKey符合匹配規則時才能收到消息;

Headers Exchange:消息頭訂閱,消息發布前,為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息時同樣需要定義類似的鍵值對請求頭,里面需要多包含一個匹配模式(有:x-mactch=all,或者x-mactch=any),只有請求頭與消息頭相匹配,才能接收到消息,忽略RoutingKey;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM