RabbitMQ指南(C#)(一)Hello World


RabbitMQ是一個消息代理,基本思路很簡單:接收和推送消息。你可以把它當做是郵局,當你把信件放進郵箱里時,你可以非常的確信郵遞員一定會把信件交給收件人。RabbitMQ就是一個郵箱,郵局以及快遞員。

RabbitMQ就是一個郵箱,郵局以及快遞員。

和郵局的不同的是,RabbitMQ處理的不是信紙,而是接收,存儲,轉發二進制的消息數據。

 

以下是RabbitMQ中的一些術語:

  • 生產(Producing)就是發送。發送消息的程序就是生產者(Producer)。生產者用“P”代表,如下圖

  • 隊列(queue)是信箱在RabbitMQ中的名稱。雖然消息是RabbitMQ和應用程序之間進行傳遞,但它們也可以被存儲在隊列中。隊列的大小沒有任何限制,你可以存儲任意多的消息進隊列到無限緩存當中。可以多個生產者發送消息到同一個隊列,也可以多個消費者從同一個隊列接收消息。隊列表示如下圖

  • 消費(Consuming)的意思是接收。等待接收消息的程序稱為消費者(Consumer),用一個“C“代表,如下圖

 

   注意:生產者,消費者,代理一般放在不同的服務器上。

 

Hello World

這部分實現兩個程序,生產者發送一個簡單的消息,然后接收者接收消息並顯示在屏幕上,發送的消息內容是”Hello world”。

如下圖示,”P”是生產者,”C”是消費者。中間的長方體是隊列,用作RabbitMQ的消息緩存,代消費者使用。

可以通過Nuget安裝RabbitMQ的客戶端程序包

發送

 

命名消息發送器為Sende.cs,消息接收器為Receive.cs。發送器連接RabbitMQ,發送一個消息,然后退出。

在Send.cs中創建連接,代碼如下

using System;
using RabbitMQ.Client;
using System.Text;
class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                ...
            }
        }
    }
}

這個連接封裝了Socket連接,提供協議版本制定和認證等。這里我們連接本機上的代理,如果要連接其它服務器,只要指定機器名或IP地址。

接下來是創建信道,大部分的API都已經封裝。

為了發送消息,需要定義一個隊列,然后發送消息到這個隊列。

using System;
using RabbitMQ.Client;
using System.Text;

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

定義的隊列如果不存的話就會創建。消息的內容是字節數組,所以你可以編碼任何的你想發送的消息。

上邊的代碼運行結束后,信道和連接會被釋放。

接收

接收器接收RabbitMQ推送過來的消息,所以不像發送器只發送單一的消息,接收器需要監聽消息並顯示。

Receive.cs中的代碼和發送者類似,建立一個連接和信道,並定義要使用的隊列。注意隊列名要和發送器里的隊列一樣。

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                ...
            }
        }
    }
}

注意這里我們定義了隊列。因為可能在發送器之前先啟動接收器,我們需要確保需要使用的隊列已經存在。

我們要告訴服務器從隊列里推送消息,因為消息是民步發送的,所以我們需要提供一個回調事件EventingBasicConsumer,用於處理接收到的消息。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello",
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

運行

建立兩個控制台項目Send和Receive,分別運行以上代碼。

接收器通過RabbitMQ接收來自發送器的消息並顯示,接收器會一直運行等待消息。

如果要檢查隊列,可以使用命令 rabbitmqctl list_queues

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM