ASP.NET Core 6.0 使用RabbitMQ


1. 簡介

RabbitMQ是一個開源的,基於AMQP(Advanced Message Queuing Protocol)協議的完整的可復用的企業級消息隊,RabbitMQ可以實現點對點,發布訂閱等消息處理模式。

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持Linux,windows,macOS,FreeBSD等操作系統,同時也支持很多語言,如:Python,Java,Ruby,PHP,C#,JavaScript,Go,Elixir,Objective-C,Swift等。

當今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等。

不同MQ特點

  1. ActiveMQ
    是Apache出品,最流行的,能力強勁的開源消息總線。它是一個完全支持JMS規范的的消息中間件。豐富的API,多種集群架構模式讓ActiveMQ在業界成為老牌的消息中間件,在中小型企業頗受歡迎!
  2. Kafka
    是LinkedIn開源的分布式發布-訂閱消息系統,目前歸屬於Apache頂級項目。Kafka主要特點是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日志收集和傳輸。0.8版本開始支持復制,不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。
  3. RocketMQ
    是阿里開源的消息中間件,它是純Java開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。RocketMQ思路起源於Kafka,但並不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用於交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。
  4. RabbitMQ
    RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。

RabbitMQ比Kafka可靠,Kafka更適合IO高吞吐的處理,一般應用在大數據日志處理或對實時性(少量延遲),可靠性(少量丟數據)要求稍低的場景使用,比如ELK日志收集。

RabbitMQ的工作機制:

首先要知道RabbitMQ的三種角色:生產者、消費者、消息服務器

  • 生產者:消息的創建者,負責創建和推送消息到消息服務器
  • 消費者:消息的接收方,接受消息並處理消息
  • 消息服務器:其實RabbitMQ本身,不會產生和消費消息,相當於一個中轉站,將生產者的消息路由給消費者

RabbitMQ的一些角色

  • ConnectionFactory:連接管理,應用程序或消費方與RabbitMQ建立連接的管理器
  • Channel:信道,推送消息的通道
  • Exchange:交換機,用於接收分配消息到隊列中
  • Queue:保存消息
  • Routingkey:消息會攜帶routingKey,決定消息最終的隊列
  • BindingKey:Queue通過bindingKey與交換機綁定

2. 安裝

網上有許多RabbitMQ的安裝博客,所以在此不介紹。可以安裝在 windows、linux、docker

web管理界面介紹

2.1. overview概覽

2.2. Admin用戶和虛擬主機管理

2.2.1. 添加用戶


上面的Tags選項,其實是指定用戶的角色,可選的有以下幾個:

  • 超級管理員(administrator)
    可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
  • 監控者(monitoring)
    可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)
  • 策略制定者(policymaker)
    可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
  • 普通管理者(management)
    僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
  • 其他
    無法登陸管理控制台,通常就是普通的生產者和消費者。

2.2.2. 創建虛擬主機


為了讓各個用戶可以互不干擾的工作,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。其實就是一個獨立的訪問路徑,不同用戶使用不同路徑,各自有自己的隊列、交換機,互相不會影響。

2.2.3. 支持的消息模型


3. NET Core中使用RabbitMQ

RabbitMQ 從信息接收者角度可以看做三種模式,一對一,一對多(此一對多並不是發布訂閱,而是每條信息只有一個接收者)和發布訂閱。其中一對一是簡單隊列模式,一對多是Worker模式,而發布訂閱包括發布訂閱模式,路由模式和通配符模式,為什么說發布訂閱模式包含三種模式呢,其實發布訂閱,路由,通配符三種模式都是使用只是交換機(Exchange)類型不一致

3.1 簡單隊列

首先,我們需要創建兩個控制台項目.Send(發送者)和Receive(接收者),然后為兩個項目安裝RabbitMQ.Client驅動

install-package rabbitmq.client

然后在Send和Receive項目中編寫我們的消息隊列代碼

生產者代碼

show code
using RabbitMQ.Client;
using System.Text;

Console.WriteLine("Hello, World! 生產者");

var factory = new ConnectionFactory()       // 創建連接工廠對象
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 創建連接對象
var channel = connection.CreateModel();         // 創建連接會話對象

string queueName = "queue1";

// 聲明一個隊列
channel.QueueDeclare(
    queue: queueName,   // 隊列名稱
    durable: false,     // 是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息
    exclusive: false,   // 是否排他,如果一個隊列聲明為排他隊列,該隊列僅對時候次聲明它的連接可見,並在連接斷開時自動刪除
    autoDelete: false,  // 是否自動刪除,自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除
    arguments: null     // 設置隊列的其他參數
);

string str = string.Empty;

do {
    Console.WriteLine("發送內容:");
    str = Console.ReadLine()!;

    // 消息內容
    byte[] body = Encoding.UTF8.GetBytes(str);

    // 發送消息
    channel.BasicPublish("", queueName, null, body);

    // Console.WriteLine("成功發送消息:" + str);
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

code describe

  • 可以看到 RabbitMQ 使用了 IConnectionFactory, IConnection和IModel 來創建鏈接和通信管道, IConnection 實例對象只負責與 Rabbit 的連接,而發送接收這些實際操作全部由會話通道進行。
  • 而后使用 QueneDeclare 方法進行創建消息隊列,創建完成后可以在 RabbitMQ 的管理工具中看到此隊列,QueneDelare 方法需要一個消息隊列名稱的必須參數.后面那些參數則代表緩存,參數等信息。
  • 最后使用 BasicPublish 來發送消息,在一對一中 routingKey 必須和 queueName 一致。

消費者代碼

show code
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

Console.WriteLine("Hello, World! 消費者1");

var factory = new ConnectionFactory()       // 創建連接工廠對象
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 創建連接對象
IModel channel = connection.CreateModel();         // 創建連接會話對象

string queueName = "queue1";
//聲明一個隊列
channel.QueueDeclare(
  queue: queueName,//消息隊列名稱
  durable: false,//是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息。
  exclusive: false,//是否排他,true排他的,如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除.
  arguments: null ////設置隊列的一些其它參數
);

// 創建消費者對象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的消息為:" + Encoding.UTF8.GetString(message));
};

// 消費者開啟監聽
channel.BasicConsume(queueName, true, consumer);

Console.ReadKey();
channel.Dispose();
connection.Close();

code describe

  • 在接收者中是定義一個EventingBasicConsumer對象的消費者(接收者),這個消費者與會話對象關聯,
  • 然后定義接收事件,輸出從消息隊列中接收的數據,
  • 最后使用會話對象的BasicConsume方法來啟動消費者監聽.消費者的定義也是如此簡單.
  • 不過注意一點,可以看到在接收者代碼中也有聲明隊列的方法,其實這句代碼可以去掉,但是如果去掉的話接收者在程序啟動時監聽隊列,而此時這個隊列還未存在,所以會出異常,所以往往會在消費者中也添加一個聲明隊列方法

此時,簡單消息隊列傳輸就算寫好了,我們可以運行代碼就行測試

3.2 Worker模式

Worker模式其實是一對多的模式,但是這個一對多並不是像發布訂閱那種,而是信息以順序的傳輸給每個接收者,我們可以使用上個例子來運行worker模式甚至,只需要運行多個接收者即可

默認情況下,RabbitMQ會順序的將message發給下一個消費者。每個消費者會得到平均數量的message。這種方式稱之為round-robin(輪詢).
但是很多情況下並不希望消息平均分配,而是要消費快的多消費,消費少的少消費。還有很多情況下一旦其中一個宕機,那么另外接收者的無法接收原本這個接收者所要接收的數據。

下面針對上面的兩個問題進行處理
首先我們先來看一下所說的宕機丟失數據一說,我們在上個例子Receive接收事件中添加線程等待

consumer.Received += (model, ea) => {
    Thread.Sleep(3000);
    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的消息為:" + Encoding.UTF8.GetString(message));
};

然后再次啟動兩個接收者進行測試

可以看到發送者發送了1-9的數字,第二個接收者在接收數據途中宕機,第一個接收者也並沒有去接收第二個接收者宕機后的數據,有的時候我們會有當接收者宕機后,其余數據交給其它接收者進行消費,那么該怎么進行處理呢,解決這個問題得方法就是改變其消息確認模式

Rabbit中存在兩種消息確認模式

  • 自動模式 - 只要消息從隊列獲取,無論消費者獲取到消息后是否成功消費,都認為是消息成功消費.
  • 手動模式 - 消費從隊列中獲取消息后,服務器會將該消息處於不可用狀態,等待消費者反饋。如果消費者在消費過程中出現異常,斷開連接切沒有發送應答,那么RabbitMQ會將這個消息重新投遞。

修改兩個消費者代碼,並在其中一個中延遲確認。

consumer.Received += (model, ea) => {
    Thread.Sleep(3000);
    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的消息為:" + Encoding.UTF8.GetString(message));
    
    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回消息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設置false 關閉自動確認.

如果在延遲中消費者斷開連接,那么RabbitMQ會重新投遞未確認的消息

‘能者多勞’模式
能者多勞是給消費速度快的消費更多的消息.少的責消費少的消息.能者多勞是建立在手動確認基礎上實現。
在延遲確認的消費中添加BasicQos

3.3 Exchange模式(發布訂閱模式,路由模式,通配符模式)

前面說過發布,路由,通配符這三種模式其實可以算為一種模式,區別僅僅是交互機類型不同.在這里出現了一個交換機的東西,發送者將消息發送發送到交換機,接收者創建各自的消息隊列綁定到交換機,

通過上面三幅圖可以看出這三種模式本質就是一種訂閱模式,路由,通配符模式只是訂閱模式的變種模式。使其可以選擇發送訂閱者中的接收者。
注意:交換機本身並不存儲數據,數據存儲在消息隊列中,所以如果向沒有綁定消息隊列的交換機中發送信息,那么信息將會丟失

3.3.1 發布訂閱模式(Fanout)

生產者代碼

show code
using RabbitMQ.Client;
using System.Text;

Console.WriteLine("Hello, World! 生產者");

var factory = new ConnectionFactory()       // 創建連接工廠對象
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 創建連接對象
var channel = connection.CreateModel();         // 創建連接會話對象

#region 定義交換機
string exchangeName = "exchange1";

channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout); // 把交換機設置為 fanout 發布訂閱模式
#endregion

string str;
do {
    Console.WriteLine("發送內容:");
    str = Console.ReadLine()!;

    byte[] body = Encoding.UTF8.GetBytes(str); // 消息內容

    channel.BasicPublish(exchangeName, "", null, body); // 發送消息
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

code describe

  • 代碼與上面沒有什么差異,只是由上面的消息隊列聲明變成了交換機聲明(交換機類型為fanout),也就說發送者發送消息從原來的直接發送消息隊列變成了發送到交換機

消費者代碼

show code
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

Console.WriteLine("Hello, World! 消費者1");

var factory = new ConnectionFactory()       // 創建連接工廠對象
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 創建連接對象
IModel channel = connection.CreateModel();         // 創建連接會話對象

#region 聲明交換機
string exchangeName = "exchange1";
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
#endregion

#region 聲明隊列
string queueName = exchangeName + "_" + new Random().Next(1, 1000);
Console.WriteLine("隊列名稱:" + queueName);

channel.QueueDeclare(
  queue: queueName,//消息隊列名稱
  durable: false,//是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息。
  exclusive: false,//是否排他,true排他的,如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除.
  arguments: null ////設置隊列的一些其它參數
);
#endregion


channel.QueueBind(queueName, exchangeName, ""); // 將隊列與交換機綁定

channel.BasicQos(0, 1, false);  // 告訴Rabbit每次只能向消費者發送一條信息,再消費者未確認之前,不再向他發送信息

// 創建消費者對象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的消息為:" + Encoding.UTF8.GetString(message));

    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回消息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設置false 關閉自動確認.

Console.ReadKey();
channel.Dispose();
connection.Close();

code describe

  • 可以看到消費者代碼與上面有些差異
  • 首先是聲明交換機(同上面一樣,為了防止異常)
  • 然后聲明消息隊列並對交換機進行綁定,在這里使用了隨機數,目的是聲明不重復的消息隊列,如果是同一個消息隊列,則就變成worker模式,也就是說對於發布訂閱模式有多少接收者就有多少個消息隊列,而這些消息隊列共同從一個交換機中獲取數據

然后同時開兩個接收者,結果就如下

3.3.2 路由模式(Direct)

路由模式下,在發布消息時指定不同的routeKey,交換機會根據不同的routeKey分發消息到不同的隊列中

生產者代碼

show code
Console.WriteLine("Hello, World! 生產者");

Console.WriteLine($"輸入 routingKey:");
string routingKey = Console.ReadLine()!;

// 創建連接工廠對象
var factory = new ConnectionFactory() {
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 創建連接對象
var channel = connection.CreateModel();         // 創建連接會話對象

#region 定義交換機
string exchangeName = "exchange2";

channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
#endregion

string str;
do {
    Console.WriteLine("發送內容:");
    str = Console.ReadLine()!;

    byte[] body = Encoding.UTF8.GetBytes(str); // 消息內容

    channel.BasicPublish(exchangeName, routingKey, null, body); // 發送消息
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

申明一個routeKey值為key1,並在發布消息的時候告訴了RabbitMQ,消息傳遞時routeKey必須匹配,才會被隊列接收否則消息會被拋棄。

消費者代碼

show code
Console.WriteLine("Hello, World! 消費者1");

Console.WriteLine($"輸入接受key名稱:");
string routeKey = Console.ReadLine()!;

var factory = new ConnectionFactory()       // 創建連接工廠對象
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 創建連接對象
IModel channel = connection.CreateModel();         // 創建連接會話對象

#region 聲明交換機
string exchangeName = "exchange2";
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
#endregion

#region 聲明隊列
string queueName = exchangeName + "_" + new Random().Next(1, 1000);
Console.WriteLine("隊列名稱:" + queueName);

channel.QueueDeclare(
  queue: queueName,//消息隊列名稱
  durable: false,//是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息。
  exclusive: false,//是否排他,true排他的,如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除.
  arguments: null ////設置隊列的一些其它參數
);
#endregion

channel.QueueBind(queueName, exchangeName, routeKey); // 將隊列與交換機綁定
channel.QueueBind(queueName, exchangeName, "key2"); 
channel.QueueBind(queueName, exchangeName, "key3"); // 可以通過綁定多個,來匹配多個路由 

// channel.BasicQos(0, 1, false);  // 告訴Rabbit每次只能向消費者發送一條信息,再消費者未確認之前,不再向他發送信息

// 創建消費者對象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的消息為:" + Encoding.UTF8.GetString(message));

    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回消息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設置false 關閉自動確認.

Console.ReadKey();
channel.Dispose();
connection.Close();

code describe

  • 一個接收者消息隊列可以聲明多個路由與交換機進行綁定

運行結果如下

3.3.3 通配符模式(Topic)

通配符模式與路由模式一致,只不過通配符模式中的路由可以聲明為模糊查詢,RabbitMQ擁有兩個通配符

  • #:匹配0-n個字符語句
  • *:匹配一個字符語句
  • 注意:RabbitMQ中通配符並不像正則中的單個字符,而是一個以“.”分割的字符串,如 ”topic1.*“匹配的規則以topic1開始並且"."后只有一段語句的路由 例:“topic1.aaa”,“topic1.bb”
  • 而“#”可以匹配到 “topic1.aaa.bb”,“topic1.bb.cc”.

生產者代碼

show code
Console.WriteLine("Hello, World! 生產者");

Console.WriteLine($"輸入 routingKey:");
string routingKey = Console.ReadLine()!;


// 創建連接工廠對象
var factory = new ConnectionFactory() {
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 創建連接對象
var channel = connection.CreateModel();         // 創建連接會話對象

#region 定義交換機
string exchangeName = "exchange3";

channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic);
#endregion

string str;
do {
    Console.WriteLine("發送內容:");
    str = Console.ReadLine()!;

    byte[] body = Encoding.UTF8.GetBytes(str); // 消息內容

    channel.BasicPublish(exchangeName, routingKey, null, body); // 發送消息
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

消費者代碼

show code
Console.WriteLine("Hello, World! 消費者1");

Console.WriteLine($"輸入接受key名稱:");  // key.* 或者 key.#
string routeKey = Console.ReadLine()!;

var factory = new ConnectionFactory()       // 創建連接工廠對象
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 創建連接對象
IModel channel = connection.CreateModel();         // 創建連接會話對象

#region 聲明交換機
string exchangeName = "exchange3";
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
#endregion

#region 聲明隊列
string queueName = exchangeName + "_" + new Random().Next(1, 1000);
Console.WriteLine("隊列名稱:" + queueName);

channel.QueueDeclare(
  queue: queueName,//消息隊列名稱
  durable: false,//是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息。
  exclusive: false,//是否排他,true排他的,如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除.
  arguments: null ////設置隊列的一些其它參數
);
#endregion

channel.QueueBind(queueName, exchangeName, routeKey); // 將隊列與交換機綁定

// 創建消費者對象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的消息為:" + Encoding.UTF8.GetString(message));
    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回消息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設置false 關閉自動確認.

Console.ReadKey();
channel.Dispose();
connection.Close();

只有在通配符匹配通過的情況下才會接收消息

這里引用兩個鏈接是對RabbitMQ中參數和方法的說明:
https://blog.csdn.net/fly_leopard/article/details/102821776
https://www.cnblogs.com/cuijl/p/8075130.html

部分內容來自:
https://blog.csdn.net/qq_44845339/article/details/114848670
https://www.cnblogs.com/yan7/p/9498685.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM