1 簡介
RabbitMQ有成千上萬的用戶,是最受歡迎的開源消息代理之一。
1.1 AMQP是什么
AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware broker)之間進行通信。
1.2 消息隊列是什么
MQ 全稱為Message Queue, 消息隊列。是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。
2 安裝
通過docker進行安裝
首先,進入RabbitMQ官網 http://www.rabbitmq.com/download.html
然后,找到 Docker image 並進入
找到你需要安裝的版本, -management 表示有管理界面的,可以瀏覽器訪問。
接着,接來下docker安裝,我這里裝的 3-management:
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
最后,瀏覽器訪問看下:http://localhost:15672/ ,用戶名/密碼: guest/guest
3 使用
3.1 “ Hello World!”
RabbitMQ是消息代理:它接受並轉發消息。您可以將其視為郵局:將您要發布的郵件放在郵箱中時,可以確保郵遞員先生或女士最終將郵件傳遞給收件人。
在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。中間的框是一個隊列
生產者代碼:
using RabbitMQ.Client; //1. 使用名稱空間
using System;
using System.Text;
namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection()) //2. 創建到服務器的連接
using (var channel = connection.CreateModel()) //3. 創建一個通道
{
channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); //4. 聲明要發送到的隊列
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);//5. 將消息發布到隊列
Console.WriteLine(" 發送消息:{0}", message);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
消費者代碼:使用命名空間,創建服務器連接,創建通道,聲明隊列都與生產者代碼一致,增加了將隊列中的消息傳遞給我們。由於它將異步地向我們發送消息,因此我們提供了回調。這就是EventingBasicConsumer.Received事件處理程序所做的。
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine(" 等待消息。");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" 接收消息:{0}", message);
};
channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
讓我們來看看輸出結果:
發送端:
發送消息:Hello World!
Press [enter] to exit.
接收端:
等待消息。
Press [enter] to exit.
接收消息:Hello World!
3.2 工作隊列
工作隊列(又稱任務隊列)的主要思想是避免立即執行資源密集型任務,然后必須等待其完成。相反,我們安排任務在以后完成。我們將任務封裝為消息並將其發送到隊列。工作進行在后台運行並不斷的從隊列中取出任務然后執行。當你運行了多個工作進程時,任務隊列中的任務將會被工作進程共享執行。
生產者代碼:
using RabbitMQ.Client;
using System;
using System.Text;
namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body);
Console.WriteLine(" 發送消息:{0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
}
}
}
消費者代碼:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" 等待消息。");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" 接收消息:{0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" 接收完成");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
循環調度
使用任務隊列的好處是能夠很容易的並行工作。如果我們積壓了很多工作,我們僅僅通過增加更多的工作者就可以解決問題,使系統的伸縮性更加容易。
讓我們來看看輸出結果:
發送端:
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息1
發送消息:消息1
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息2
發送消息:消息2
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息3
發送消息:消息3
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息4
發送消息:消息4
Press [enter] to exit.
接收端1:
等待消息。
Press [enter] to exit.
接收消息:消息1
接收完成
接收消息:消息3
接收完成
接收端2:
等待消息。
Press [enter] to exit.
接收消息:消息2
接收完成
接收消息:消息4
接收完成
默認情況下,RabbitMQ將按順序將每個消息發送給下一個使用者。平均而言,每個消費者都會收到相同數量的消息。這種分發消息的方式稱為循環。與三個或更多的工人一起嘗試。
消息確認
為了確保消息永不丟失,RabbitMQ支持消息確認。消費者發送回一個確認(acknowledgement),以告知RabbitMQ已經接收,處理了特定的消息,並且RabbitMQ可以自由刪除它。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
使用此代碼,我們可以確保,即使您在處理消息時使用CTRL + C殺死工作人員,也不會丟失任何信息。工人死亡后不久,所有未確認的消息將重新發送。
消息持久性
我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是,如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,除非您告知不要這樣做,否則它將忘記隊列和消息。要確保消息不會丟失,需要做兩件事:我們需要將隊列和消息都標記為持久。
首先,我們需要確保該隊列將在RabbitMQ節點重啟后繼續存在。為此,我們需要將其聲明為持久的:
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
最后,我們需要將消息標記為持久性-通過將IBasicProperties.SetPersistent設置為true。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
公平派遣
我們可以將BasicQos方法與 prefetchCount = 1設置一起使用。這告訴RabbitMQ一次不要給工人一個以上的消息。換句話說,在處理並確認上一條消息之前,不要將新消息發送給工作人員。而是將其分派給不忙的下一個工作程序。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
3.3 發布/訂閱
在上一個教程中,我們創建了一個工作隊列。工作隊列背后的假設是,每個任務都恰好交付給一個工人。在這一部分中,我們將做一些完全不同的事情-我們將消息傳達給多個消費者。這種模式稱為“發布/訂閱”。
生產者代碼:
using RabbitMQ.Client;
using System;
using System.Text;
namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body);
Console.WriteLine(" 發送消息:{0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
}
}
}
生產者代碼與上一教程看起來沒有太大不同。最重要的變化是我們現在希望將消息發布到 Example.RabbitMQ.PublishSubscribe 交換器,而不是無名的消息交換器。交換類型有以下幾種:direct,topic,headers 和fanout,在這里我們采用fanout交換類型。
消費者代碼:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "");
Console.WriteLine(" 等待消息。");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" 接收消息:{0}", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
如果沒有隊列綁定到交換機,則消息將丟失,但這對我們來說是可以的。如果沒有消費者在聽,我們可以安全地丟棄該消息。
3.4 路由
在上一個教程中,我們創建了一個發布/訂閱。我們能夠向許多接收者廣播消息。在本教程中,我們將向其中添加功能-將消息分類指定給具體的訂閱者。
生產者代碼:
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
namespace Example.RabbitMQ.Routing.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body);
Console.WriteLine(" 發送消息:'{0}':'{1}'", severity, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
消費者代碼:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach (var severity in args)
{
channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity);
}
Console.WriteLine(" 等待消息。");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
3.5 話題
在上一個教程中,我們改進了消息系統。代替使用僅能進行虛擬廣播的扇出交換機,我們使用直接交換機,並有選擇地接收消息的可能性。
盡管使用直接交換對我們的系統進行了改進,但它仍然存在局限性-它無法基於多個條件進行路由。
*(星號)可以代替一個單詞。
#(哈希)可以替代零個或多個單詞。
生產者代碼:
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
namespace Example.RabbitMQ.Topics.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
var routingKey = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine(" 發送消息:'{0}':'{1}'", routingKey, message);
}
}
}
}
消費者代碼:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach (var bindingKey in args)
{
channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey);
}
Console.WriteLine(" 等待消息。 To exit press CTRL+C");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}