RabbitMQ簡介
消息 (Message) 是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串、 JSON 等,也可以很復雜,比如內嵌對象。
消息隊列中間件 (Message Queue Middleware,簡稱為 MQ) 是指利用高效可靠的消息傳遞機制進行與平台無關的數據交流,並基於數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。
消息隊列中間件,也可以稱為消息隊列或者消息中間件。它一般有兩種傳遞模式:點對點 (P2P, Point-to-Point) 模式和發布/訂閱 (Pub/Sub) 模式。點對點模式是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸成為可能。 發布訂閱模式定義了如何向一個內容節點發布和訂閱消息,這個內容節點稱為主題 (topic),主題可以認為是消息傳遞的中介,消息發布者將消息發布到某個主題,而消息訂閱者則從主題中訂閱消息。主題使得消息的訂閱者與消息的發布者互相保持獨立,不需要進行接觸即可保證消息的傳遞,發布/訂閱模式在消息的一對多廣播時采用 。
RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
關於RabbitMQ的原理及詳細介紹百度上很多,這里就不做復制粘貼了。
生產者和消費者
在使用RabitMQ之前,先對幾個概念做一下說明:
生產者(Producer):
生產者就是投遞消息的一方。生產者創建消息,然后發布到 RabbitMQ 中。消息一般可以包含2個部分:消息體和標簽 (Label)。消息體也可以稱之為 payload,在實際應用中,消息體一般是一個帶有業務邏輯結構的數據,比如一個JSON字符串。當然可以進一步對這個消息體進行序列化操作。消息的標簽用來表述這條消息 ,比如一個交換器的名稱和一個路由鍵。生產者把消息交由RabbitMQ,RabbitMQ之后會根據標簽把消息發送給感興趣的消費者 (Consumer)。
消息中間件的服務節點(Broker)
對於RabbitMQ來說,一個RabbitMQ Broker可以簡單地看作一個RabbitMQ服務節點 ,或者RabbitMQ服務實例 。大多數情況下也可以將一個RabbitMQ Broker看作一台RabbitMQ服務器。
首先生產者將業務方數據進行可能的包裝, 之后封裝成消息, 發送 (AMQP 協議里這個動 作對應的命令為 Basic . Publish) 到 Broker 中 。 消費者訂閱並接收消息 (AMQP 協議里這個動作對應的命令為 Basic.Consurne 或者 Basic. Get),經過可能的解包處理得到原始的數據, 之后再進行業務處理邏輯。這個業務處理邏輯並不一定需要和接收消息的邏輯使用同一個線程。 消費者進程可以使用一個線程去接收消息,存入到內存中。業務處理邏輯使用另一個線程從內存中讀取數據,這樣可以將應用進一步解稿,提高整個應用的處理效率。
隊列(Queue)
隊列,是RabbitMQ的內部對象,用於存儲消息。RabbitMQ中消息都只能存儲在隊列中,RabbitMQ的生產者生產消息井最終技遞到隊列中,消費者可以從隊列中獲取消息並消費。多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤(Round-Robin,即輪詢) 給多個消費者進行處理,而不是每個消費者都收到所有的消息井處理。
消費者(Consumer)
消費者,就是接收消息的一方。消費者連接到RabbitMQ服務器,並訂閱到隊列上。當消費者消費一條消息時,只是消費消息的消息體 (payload)。在消息路由的過程中,消息的標簽會丟棄,存入到隊列中的消息只 有消息體,消費者也只會消費到消體,也就不知道消息的生產者是誰,當然消費者也不需要知道。
項目中簡單集成MQ點對點模式
首先先安裝好RabbitMQ,可以參考我之前寫的教程 https://www.cnblogs.com/yindi0712/p/13447814.html
1.這里我創建了一個Mvc項目作為生產者端,一個控制台作為消費者端
2.兩個項目分別引用RabbitMQ.Client.dll,可以在官網下載:
下載地址:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/
也可以直接從Nuget中安裝,二者效果是一樣的
我這邊選擇用Nuget安裝。
3.簡單集成,實現消息生產和消費
生產者端
我這邊創建了個簡單的mvc項目,引用RabbitMQ.Client.dll,在HomeController創建一個名為Send的Action
消費者端
我這里創建了一個控制台應用,同樣引用RabbitMQ.Client.dll,消費的邏輯直接寫在Main方法里了
總結:
1.首先不管是消費者還是生產者都要先創建一個連接工廠ConnectionFactory配置連接的基本信息;
2.通過ConnectionFactory.CreateConnection創建一個IConnection對象表示創建一個AMQP 0-9-1連接;
3.IConnection.CreateModel創建一個AMQP 0-9-1頻道,該對象提供了大部分 的操作(方法)協議;
4.使用IModel.QueueDeclare聲明一個消息隊列,生產者和消費者的隊列名稱要一致;
5.生產者端:使用IModel.BasicPublish發送消息到消息隊列;
6.消費者端:創建一個消費者事件對象EventingBasicConsumer;使用IModel.BasicConsume將該消費者啟動,在消息接收觸發事件EventingBasicConsumer.Received做后續操作。
划重點:Connection 可以用來創建多個 Channel 實例,但是 Channel 實例不能在線程問共享, 應用程序應該為每一個線程開辟一個 Channel。某些情況下 Channel 的操作可以並發運行,但 是在其他情況下會導致在網絡上出現錯誤的通信幀交錯,同時也會影響友送方確認( publisher confrrm)機制的運行,所以多線程問共享 Channel 實例是非線程安全的。
至此一個簡單的消費者和生產者端就搭建好了,下面我們運行起來看一下效果:
上面是我本地的配置信息,下面我創建一個getMessage隊列並發送一條123的消息。
我們直接進管理界面看吧,就不敲命令了,如果本地裝了管理插件直接訪問http://127.0.0.1:15672/,我們先進到Queues,我們可以看到剛剛創建的getMessage隊列
然后進入到隊列內部,去點一下GetMessages按鈕:
我們可以看到剛才發送的123已經在隊列中了,現在我們運行消費者端
我們可以看到這個消息已經輸出出來了,這時候再去點擊GetMessages,這時候就會提示Queue is empty,表示該消息已經消費了。可以把兩個項目同時運行,多發幾條消息基本上是實時的。
發布訂閱模式
交換器(Exchange)
我們暫時可以理解成生產者將消息投遞到隊列中。真實情況是,生產者將消息發送到Exchange(交換器,通常也可以用大寫的"X"來表示),由交換器將消息路由到一個或者多個隊列中。如果路由不到,或許會返回給生產者,或許直接丟棄。這里可以將RabbitMQ中的交換器看作一個簡單的實體。
交換器有四種類型fanout、direct、topic、headers。我這邊使用了fanout,它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。
路由鍵(RoutingKey):
生產者將消息發給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則,而這個RoutingKey需要與交換器類型和綁定鍵(BindingKey)聯合使用才能最終生效。在交換器類型和綁定鍵(BindingKey)固定的情況下,生產者可以在發送消息給交換器時,通過指定RoutingKey來決定消息流向哪里。
綁定(Binding)
RabbitMQ中通過綁定將交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵(BindingKey),這樣RabbitMQ就知道如何正確地將消息路由到隊列了。
生產者將消息發送給交換器時,需要一個RoutingKey,當BindingKey和RoutingKey相匹配時,消息會被路由到對應的隊列中。在綁定多個隊列到同一個交換器的時候,這些綁定允許使用相同的BindingKey。 BindingKey並不是在所有的情況下都生效,它依賴於交換器類型,比如fanout類型的交換器就會無視BindingKey,而是將消息路由到所有綁定到該交換器的隊列中。
上代碼,這次我創建了一個生產者兩個消費者。
生產者:
private void PublishSend(SendModel model) { //實例化一個連接工廠和其配置為使用所需的主機,虛擬主機和證書(證書) ConnectionFactory factory = new ConnectionFactory(); factory.HostName = model.Url;//RabbitMQ主機服務地址 factory.UserName = model.UserName;//用戶名 factory.Password = model.Password;//密碼 //創建一個AMQP 0-9-1連接 using (IConnection connection = factory.CreateConnection()) { //創建一個AMQP 0-9-1頻道,該對象提供了大部分 的操作(方法)協議。 using (IModel channel = connection.CreateModel()) { //創建兩個隊列 channel.QueueDeclare("PublishSubscrib01", true, false, false, null); channel.QueueDeclare("PublishSubscrib02", true, false, false, null); //聲明一個交換器 channel.ExchangeDeclare("PublishSubscribExchange", ExchangeType.Fanout, true, false, null); //將交換器與隊列綁定 channel.QueueBind("PublishSubscrib01", "PublishSubscribExchange", "RoutingKey", null); channel.QueueBind("PublishSubscrib02", "PublishSubscribExchange", "RoutingKey", null); string message = model.Content; byte[] body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("PublishSubscribExchange", "RoutingKey", null,body); } } }
消費者1
private static void Subcribe() { //實例化一個連接工廠和其配置為使用所需的主機,虛擬主機和證書(證書) ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1";//RabbitMQ主機服務地址 factory.UserName = "test";//用戶名 factory.Password = "testpwd";//密碼 //創建一個AMQP 0-9-1連接 using (IConnection connection = factory.CreateConnection()) { //創建一個AMQP 0-9-1頻道,該對象提供了大部分 的操作(方法)協議。 using (IModel channel = connection.CreateModel()) { #region 定義隊列、交換器並綁定 channel.QueueDeclare("PublishSubscrib01", true, false, false, null); channel.ExchangeDeclare("PublishSubscribExchange", ExchangeType.Fanout, true, false, null); channel.QueueBind("PublishSubscrib01", "PublishSubscribExchange", "RoutingKey", null); #endregion Console.WriteLine("訂閱者01 已經准備就緒~~"); //消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"訂閱者01收到消息:{message} ~"); }; //自動ack消費 channel.BasicConsume("PublishSubscrib01", true,consumer); Console.ReadLine(); } } }
消費者2就不放了,跟1是一樣的,復制出來把1換成2就好了(實際生產環境業務邏輯可能不同)。
下面我們同時啟動三個項目發送一條消息看看
我們可以看到兩個消費者端同時都接收到了消息,下面我們去RabbitMQ管理界面去看一下。
首先,我們看到Exchange菜單下出現了我們剛剛新增的交換器
點擊進去可以看到交換器和我們聲明的兩個隊列的綁定
說明我們這次RabbitMQ發布/訂閱的部署時成功的。
————————————————————————————————————————————————————
參考書籍:RabbitMQ實戰指南。