RabbitMQ是一個消息代理,基本思路很簡單:接收和推送消息。你可以把它當做是郵局,當你把信件放進郵箱里時,你可以非常的確信郵遞員一定會把信件交給收件人。RabbitMQ就是一個郵箱,郵局以及快遞員。
RabbitMQ就是一個郵箱,郵局以及快遞員。
和郵局的不同的是,RabbitMQ處理的不是信紙,而是接收,存儲,轉發二進制的消息數據。
以下是RabbitMQ中的一些術語:
- 生產(Producing)就是發送。發送消息的程序就是生產者(Producer)。生產者用“P”代表,如下圖
- 隊列(queue)是信箱在RabbitMQ中的名稱。雖然消息是RabbitMQ和應用程序之間進行傳遞,但它們也可以被存儲在隊列中。隊列的大小沒有任何限制,你可以存儲任意多的消息進隊列到無限緩存當中。可以多個生產者發送消息到同一個隊列,也可以多個消費者從同一個隊列接收消息。隊列表示如下圖
- 消費(Consuming)的意思是接收。等待接收消息的程序稱為消費者(Consumer),用一個“C“代表,如下圖
注意:生產者,消費者,代理一般放在不同的服務器上。
Hello World
這部分實現兩個程序,生產者發送一個簡單的消息,然后接收者接收消息並顯示在屏幕上,發送的消息內容是”Hello world”。
如下圖示,”P”是生產者,”C”是消費者。中間的長方體是隊列,用作RabbitMQ的消息緩存,代消費者使用。
可以通過Nuget安裝RabbitMQ的客戶端程序包
發送
命名消息發送器為Sende.cs,消息接收器為Receive.cs。發送器連接RabbitMQ,發送一個消息,然后退出。
在Send.cs中創建連接,代碼如下
using System; using RabbitMQ.Client; using System.Text; class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { ... } } } }
這個連接封裝了Socket連接,提供協議版本制定和認證等。這里我們連接本機上的代理,如果要連接其它服務器,只要指定機器名或IP地址。
接下來是創建信道,大部分的API都已經封裝。
為了發送消息,需要定義一個隊列,然后發送消息到這個隊列。
using System; using RabbitMQ.Client; using System.Text; class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
定義的隊列如果不存的話就會創建。消息的內容是字節數組,所以你可以編碼任何的你想發送的消息。
上邊的代碼運行結束后,信道和連接會被釋放。
接收
接收器接收RabbitMQ推送過來的消息,所以不像發送器只發送單一的消息,接收器需要監聽消息並顯示。
Receive.cs中的代碼和發送者類似,建立一個連接和信道,並定義要使用的隊列。注意隊列名要和發送器里的隊列一樣。
class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); ... } } } }
注意這里我們定義了隊列。因為可能在發送器之前先啟動接收器,我們需要確保需要使用的隊列已經存在。
我們要告訴服務器從隊列里推送消息,因為消息是民步發送的,所以我們需要提供一個回調事件EventingBasicConsumer,用於處理接收到的消息。
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
運行
建立兩個控制台項目Send和Receive,分別運行以上代碼。
接收器通過RabbitMQ接收來自發送器的消息並顯示,接收器會一直運行等待消息。
如果要檢查隊列,可以使用命令 rabbitmqctl list_queues