RabbitMQ一個優秀的.NET消息隊列框架


1 簡介

RabbitMQ有成千上萬的用戶,是最受歡迎的開源消息代理之一。

1.1 AMQP是什么

AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware broker)之間進行通信。

1.2 消息隊列是什么

MQ 全稱為Message Queue, 消息隊列。是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。

2 安裝

通過docker進行安裝

首先,進入RabbitMQ官網 http://www.rabbitmq.com/download.html

然后,找到 Docker image 並進入
找到你需要安裝的版本, -management 表示有管理界面的,可以瀏覽器訪問。

接着,接來下docker安裝,我這里裝的 3-management:

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

最后,瀏覽器訪問看下:http://localhost:15672/ ,用戶名/密碼: guest/guest

3 使用

3.1 “ Hello World!”

RabbitMQ是消息代理:它接受並轉發消息。您可以將其視為郵局:將您要發布的郵件放在郵箱中時,可以確保郵遞員先生或女士最終將郵件傳遞給收件人。
在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。中間的框是一個隊列

生產者代碼:

using RabbitMQ.Client; //1. 使用名稱空間
using System;
using System.Text;

namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection()) //2. 創建到服務器的連接
            using (var channel = connection.CreateModel()) //3. 創建一個通道
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); //4. 聲明要發送到的隊列

                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);//5. 將消息發布到隊列

                Console.WriteLine(" 發送消息:{0}", message);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

消費者代碼:使用命名空間,創建服務器連接,創建通道,聲明隊列都與生產者代碼一致,增加了將隊列中的消息傳遞給我們。由於它將異步地向我們發送消息,因此我們提供了回調。這就是EventingBasicConsumer.Received事件處理程序所做的。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null);

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 接收消息:{0}", message);
                };
                channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

讓我們來看看輸出結果:

發送端:

 發送消息:Hello World!
 Press [enter] to exit.

接收端:

 等待消息。
 Press [enter] to exit.
 接收消息:Hello World!

3.2 工作隊列

工作隊列(又稱任務隊列)的主要思想是避免立即執行資源密集型任務,然后必須等待其完成。相反,我們安排任務在以后完成。我們將任務封裝為消息並將其發送到隊列。工作進行在后台運行並不斷的從隊列中取出任務然后執行。當你運行了多個工作進程時,任務隊列中的任務將會被工作進程共享執行。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body);
                Console.WriteLine(" 發送消息:{0}", message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
        }
    }
}

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 接收消息:{0}", message);

                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine(" 接收完成");

                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

循環調度

使用任務隊列的好處是能夠很容易的並行工作。如果我們積壓了很多工作,我們僅僅通過增加更多的工作者就可以解決問題,使系統的伸縮性更加容易。

讓我們來看看輸出結果:

發送端:

\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息1
 發送消息:消息1
 Press [enter] to exit.


\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息2
 發送消息:消息2
 Press [enter] to exit.


\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息3
 發送消息:消息3
 Press [enter] to exit.


\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息4
 發送消息:消息4
 Press [enter] to exit.

接收端1:

 等待消息。
 Press [enter] to exit.
 接收消息:消息1
 接收完成
 接收消息:消息3
 接收完成

接收端2:

 等待消息。
 Press [enter] to exit.
 接收消息:消息2
 接收完成
 接收消息:消息4
 接收完成

默認情況下,RabbitMQ將按順序將每個消息發送給下一個使用者。平均而言,每個消費者都會收到相同數量的消息。這種分發消息的方式稱為循環。與三個或更多的工人一起嘗試。

消息確認

為了確保消息永不丟失,RabbitMQ支持消息確認。消費者發送回一個確認(acknowledgement),以告知RabbitMQ已經接收,處理了特定的消息,並且RabbitMQ可以自由刪除它。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

使用此代碼,我們可以確保,即使您在處理消息時使用CTRL + C殺死工作人員,也不會丟失任何信息。工人死亡后不久,所有未確認的消息將重新發送。

消息持久性

我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是,如果RabbitMQ服務器停止,我們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,除非您告知不要這樣做,否則它將忘記隊列和消息。要確保消息不會丟失,需要做兩件事:我們需要將隊列和消息都標記為持久。

首先,我們需要確保該隊列將在RabbitMQ節點重啟后繼續存在。為此,我們需要將其聲明為持久的:

channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

最后,我們需要將消息標記為持久性-通過將IBasicProperties.SetPersistent設置為true。

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

公平派遣

我們可以將BasicQos方法與 prefetchCount = 1設置一起使用。這告訴RabbitMQ一次不要給工人一個以上的消息。換句話說,在處理並確認上一條消息之前,不要將新消息發送給工作人員。而是將其分派給不忙的下一個工作程序。

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

3.3 發布/訂閱

在上一個教程中,我們創建了一個工作隊列。工作隊列背后的假設是,每個任務都恰好交付給一個工人。在這一部分中,我們將做一些完全不同的事情-我們將消息傳達給多個消費者。這種模式稱為“發布/訂閱”。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body);
                Console.WriteLine(" 發送消息:{0}", message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
        }
    }
}

生產者代碼與上一教程看起來沒有太大不同。最重要的變化是我們現在希望將消息發布到 Example.RabbitMQ.PublishSubscribe 交換器,而不是無名的消息交換器。交換類型有以下幾種:direct,topic,headers 和fanout,在這里我們采用fanout交換類型。

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);

                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "");

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 接收消息:{0}", message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

如果沒有隊列綁定到交換機,則消息將丟失,但這對我們來說是可以的。如果沒有消費者在聽,我們可以安全地丟棄該消息。

3.4 路由

在上一個教程中,我們創建了一個發布/訂閱。我們能夠向許多接收者廣播消息。在本教程中,我們將向其中添加功能-將消息分類指定給具體的訂閱者。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Example.RabbitMQ.Routing.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");

                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body);
                Console.WriteLine(" 發送消息:'{0}':'{1}'", severity, message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
                var queueName = channel.QueueDeclare().QueueName;

                if (args.Length < 1)
                {
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }

                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity);
                }

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

3.5 話題

在上一個教程中,我們改進了消息系統。代替使用僅能進行虛擬廣播的扇出交換機,我們使用直接交換機,並有選擇地接收消息的可能性。

盡管使用直接交換對我們的系統進行了改進,但它仍然存在局限性-它無法基於多個條件進行路由。

*(星號)可以代替一個單詞。
#(哈希)可以替代零個或多個單詞。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Example.RabbitMQ.Topics.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");

                var routingKey = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body);
                Console.WriteLine(" 發送消息:'{0}':'{1}'", routingKey, message);
            }
        }
    }
}

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
                var queueName = channel.QueueDeclare().QueueName;

                if (args.Length < 1)
                {
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }

                foreach (var bindingKey in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey);
                }

                Console.WriteLine(" 等待消息。 To exit press CTRL+C");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM