消息隊列基礎知識以及在 C# 中使用 RabbitMQ
我在這里只記錄點基本內容,不會寫太深入的內容
同步與異步對比
這是使用消息隊列之前的服務器架構
這是使用消息隊列之后的服務器架構
異步化架構流程
異步,不是同步,也就是不立即處理,而是延遲處理。時間換性能
舉例:
一個請求,需要一秒才能處理完,請求多了,服務器就處理不過來
如果能優化到一毫秒處理一個請求,那就提升了 1000 倍處理能力,但是這是不可能的,因為數據庫有瓶頸
解決方案:
- 在應用服務器(Web API)和數據庫之間加一層消息隊列
- 從應用服務器到消息隊列只是保存一下操作的信息(在內存),直接返回結果給用戶,但是並未完成業務。這是可以達到毫秒級響應,因為沒有處理業務邏輯
- 在消息隊列和數據庫之間再加一層處理器,處理器就是從消息隊列里面拿數據的,然后處理器再去與數據庫交互,處理業務邏輯
消息應該使用 JSON
應用服務器是不能主動去查詢數據庫再返回結果給用戶的,因為成本比較高,一般都是客戶端輪詢或WebSocket之類的,去問應用服務器業務邏輯是否完成,比如掃碼之類的功能
異步化優劣勢解讀
優勢
1. 流量削峰
流量削峰,消息隊列的最重要特性,把流量高峰的業務延遲到后面再處理
流量高峰期,請求特別多,但是應用服務器不處理業務邏輯,所以無所謂,吞吐量高
業務堆積在隊列里面,晚點再處理
2. 高可用
可用性,就是對外不間斷的提供服務
簡化應用服務,處理器異常也不影響我們對外提供服務
使用消息隊列,應用服務器可能也就做一下數據驗證什么的工作,具體的業務邏輯在處理器中處理,即使所有的處理器都宕機,但是我們的消息隊列依然可以接收消息,這樣,我們的系統依然能對外提供服務
3. 擴展性
消息隊列的消息來自應用服務器,如果我們需要做擴展功能和手機業務邏輯,只需要給處理器這一層升級就可以了。用戶提交的信息不需要改變,也完全不影響應用服務器,耦合度也降低了
直接從物理層面隔離,擴展互不影響
4. 重試機制
以前我們的應用服務器是直接連接數據庫,對數據進行操作,操作失敗就返回失敗。
消息隊列如果失敗了,並不會把消息扔掉,之后再重新嘗試操作
缺陷
1. 降低用戶體驗
因為不能快速拿到結果,會降低用戶體驗,需要業務妥協一下
2. 代碼的復雜度
原來我們只有客戶端、應用服務器、數據庫這三層
現在我們有客戶端、應用服務器、消息隊列、處理器、數據庫
應用服務器層,除了做驗證和消息序列化操作,還要支持用戶查詢業務處理結果
處理器層,需要寫業務邏輯
3. 重放攻擊
頁面連點幾次,數據庫有多條相同數據
但是在消息隊列里面就比較難處理了,會有多條重復消息
可能因為網絡抖動,操作信息已經保存到消息隊列中,但是應用服務器沒有返回結果。用戶就以為請求沒有成功,很可能會再次發起請求,消息隊列會出現多條相同的數據
4. 冪等性設計
處理器處理完業務邏輯會返回結果,並刪除消息隊列中的信息,但是在消息隊列里,也有可能處理器對數據庫處理完成,但是沒有移除消息隊列的信息
冪等性設計就是說,處理器處理重復的消息,不會在數據庫產生新的數據,也不會影響結果
消息隊列
消息隊列,是一個獨立進程,(一般)使用內存保存(速度快,有丟失),支持網絡讀寫
生產者消費者模式:
- 生產者負責往消息隊列里寫入數據,生產者可以有多個
- 消費者負責使用消息隊列中的數據
- 一條消息消費一次
生產者消費者模式長這樣
發布訂閱模式:
- 發布者負責往消息隊列里寫入數據,發布者可以有多個
- 訂閱者負責使用消息隊列中的數據,訂閱者可以有多個
- 一條消息訂閱多次
發布訂閱模式長這樣
安裝 RabbitMQ
安裝過程就沒必要寫了,教程到處都是,也有 Windows 版一鍵安裝
但是注意:guest 不支持遠程訪問,所以要新增一個用戶
RabbitMQ 集群
因為整幾台 Linux 有點麻煩,而且我也不會用 Docker,所以我就不寫了
RabbitMQ 集群搭建參考:https://blog.csdn.net/qq_28533563/article/details/107932737
RabbitMQ
RabbitMQ 相對於一般的消息隊列來說,有一個比較獨特的設計,就是Exchangeds(交換機)和Queues(隊列)
所以系統結構就變成了,生產者先連接交換機,交換機再去連接消息隊列。
多了交換機這一層,就可以有很多功能:
- 路由:由交換機去轉發到消息隊列
- 實現了一條消息多個隊列使用:因為消息由交換機轉發,所以可以轉發到多個隊列中
在 C# 中使用 RabbitMQ
首先,使用 NuGet 安裝一個包:RabbitMQ.Client
會涉及到以下幾個函數
這些參數我也不太懂,所以我把源碼的函數聲明給拿出來了,自行翻譯
定義隊列
/// <summary>
/// Declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
/// <param name="queue">The name of the queue. Pass an empty string to make the server generate a name.</param>
/// <param name="durable">Should this queue will survive a broker restart?</param>
/// <param name="exclusive">Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.</param>
/// <param name="autoDelete">Should this queue be auto-deleted when its last consumer (if any) unsubscribes?</param>
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
定義交換機
/// <summary>Declare an exchange.</summary>
/// <remarks>
/// The exchange is declared non-passive and non-internal.
/// The "nowait" option is not exercised.
/// </remarks>
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
將隊列綁定到交換機上
/// <summary>
/// Bind a queue to an exchange.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
發布消息
/// <summary>
/// (Extension method) Convenience overload of BasicPublish.
/// </summary>
/// <remarks>
/// The publication occurs with mandatory=false
/// </remarks>
public static void BasicPublish(this IModel model, string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
model.BasicPublish(exchange, routingKey, false, basicProperties, body);
}
/// <summary>
/// Publishes a message.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);
啟動消費者,接收消息
/// <summary>Start a Basic content-class consumer.</summary>
public static string BasicConsume(this IModel model, string queue, bool autoAck, IBasicConsumer consumer)
{
return model.BasicConsume(queue, autoAck, "", false, false, null, consumer);
}
/// <summary>Start a Basic content-class consumer.</summary>
string BasicConsume(
string queue,
bool autoAck,
string consumerTag,
bool noLocal,
bool exclusive,
IDictionary<string, object> arguments,
IBasicConsumer consumer);
Received 事件,消費者接收到消息事件,即,消費者處理消息
///<summary>
/// Event fired when a delivery arrives for the consumer.
/// </summary>
/// <remarks>
/// Handlers must copy or fully use delivery body before returning.
/// Accessing the body at a later point is unsafe as its memory can
/// be already released.
/// </remarks>
public event EventHandler<BasicDeliverEventArgs> Received;
生產者消費者模式
生產者部分
生產者項目就用 RESTful
[HttpGet("one/{count}")]
public async Task<ActionResult> One(int count)
{
string queueName = "queue_demo_one";
string exchangeName = "exchange_demo_one";
//先創建連接
var factory = new ConnectionFactory()
{
HostName = "192.168.0.102",//ip
Port = 5672,//端口,15672 是 web 端管理用的,5672 是用於客戶端與消息中間件之間可以傳遞消息
UserName = "admin",//用戶名
Password = "123456"//密碼
};
//打開連接
using var connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
//定義隊列
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//定義交換機
channel.ExchangeDeclare(exchange: exchangeName,
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
//將隊列綁定到交換機上
channel.QueueBind(queue: queueName,
exchange: exchangeName,
routingKey: string.Empty,
arguments: null);
//發送隊列
for (int i = 0; i < count; i++)
{
string message = $"Task {i}";
byte[] body = Encoding.UTF8.GetBytes(message);
//發送消息
channel.BasicPublish(exchange: exchangeName,
routingKey: string.Empty,
basicProperties: null,
body: body);
Console.WriteLine($"消息:{message} 已發送");
}
return Ok();
}
消費者部分
處理器這里用控制台
static void Foo_One()
{
string queueName = "queue_demo_one";
string exchangeName = "exchange_demo_one";
var factory = new ConnectionFactory()
{
HostName = "192.168.0.102",
Port = 5672,
UserName = "admin",
Password = "123456"
};
using var connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
//定義消費者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, args) =>
{
var body = args.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"消費者接收消息 {message}");
};
//啟動消費者
channel.BasicConsume(queue: queueName,
autoAck: true,//自動確認
consumer: consumer);
//處理完消息后,保持程序繼續運行,可以繼續接收消息
Console.ReadLine();
}
效果
先啟動 RESTful 項目,訪問一次,會給隊列增加消息,我在這里添加 10 條消息
可以看到 RabbitMQ 服務器這里多了一個 exchange
消息隊列里也多了 10 條消息
啟動控制台項目,消費者會處理數據
再看 RabbitMQ 服務器,消息被處理
不關閉控制台,再次訪問 webapi,這樣會自動執行
發布訂閱模式
發布者
發布者也是用 RESTful
[HttpGet("multi/{count}")]
public async Task<ActionResult> Multi(int count)
{
string queueName = "queue_demo_multi";
string smsQueueName = "queue_demo_multi_sms";
string emailQueueName = "queue_demo_multi_eamil";
string exchangeName = "exchange_demo_multi";
//先創建連接
var factory = new ConnectionFactory()
{
HostName = "192.168.0.102",
Port = 5672,
UserName = "admin",
Password = "123456"
};
using var connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: exchangeName,
type: ExchangeType.Fanout,
durable: true,
autoDelete: false,
arguments: null);
//這里聲明三個隊列,並且綁定同一個交換機
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: queueName,
exchange: exchangeName,
routingKey: string.Empty,
arguments: null);
channel.QueueDeclare(queue: smsQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: smsQueueName,
exchange: exchangeName,
routingKey: string.Empty,
arguments: null);
channel.QueueDeclare(queue: emailQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: emailQueueName,
exchange: exchangeName,
routingKey: string.Empty,
arguments: null);
for (int i = 0; i < count; i++)
{
string message = $"Task {i}";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName,
routingKey: string.Empty,
basicProperties: null,
body: body);
Console.WriteLine($"消息:{message} 已發送");
}
return Ok();
}
訂閱者
訂閱者這里也是控制台,輸入索引,表示啟動不同的消費者(處理器)
static void Foo_Multi()
{
string queueName = "queue_demo_multi";
string smsQueueName = "queue_demo_multi_sms";
string emailQueueName = "queue_demo_multi_eamil";
string exchangeName = "exchange_demo_multi";
string[] strs = new string[3];
strs[0] = queueName;
strs[1] = smsQueueName;
strs[2] = emailQueueName;
Console.Write("輸入索引 0 ~ 2 :");
int index = Convert.ToInt32(Console.ReadLine());
var factory = new ConnectionFactory()
{
HostName = "192.168.0.102",
Port = 5672,
UserName = "admin",
Password = "123456"
};
using var connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: exchangeName,
type: ExchangeType.Fanout,
durable: true,
autoDelete: false,
arguments: null);
channel.QueueDeclare(queue: strs[index],
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue: strs[index],
exchange: exchangeName,
routingKey: string.Empty,
arguments: null);
//定義消費者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, args) =>
{
var body = args.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"消費者 {strs[index]} 接收消息 {message}");
};
//啟動消費者
channel.BasicConsume(queue: strs[index],
autoAck: true,//自動確認
consumer: consumer);
//處理完消息后,保持程序繼續運行,可以繼續接收消息
Console.ReadLine();
}
效果
先啟動 RESTful 項目,訪問一次,會給隊列增加消息,這里添加 10 條消息
查看 exchange,沒有問題
查看 Queue,三個隊列都有消息
啟動三個控制台項目,消費者會處理數據
都處理完了
不關閉控制台,再次訪問 webapi,這樣會自動執行,這里就不演示了
注意
Console.ReadLine()
一定要跟啟動消費者
即channel.BasicConsume()
寫在同一個函數中