消息隊列基礎知識以及在 C# 中使用 RabbitMQ


消息隊列基礎知識以及在 C# 中使用 RabbitMQ

我在這里只記錄點基本內容,不會寫太深入的內容

同步與異步對比

這是使用消息隊列之前的服務器架構

這是使用消息隊列之后的服務器架構

異步化架構流程

異步,不是同步,也就是不立即處理,而是延遲處理。時間換性能

舉例:
一個請求,需要一秒才能處理完,請求多了,服務器就處理不過來
如果能優化到一毫秒處理一個請求,那就提升了 1000 倍處理能力,但是這是不可能的,因為數據庫有瓶頸

解決方案:

  1. 應用服務器(Web API)數據庫之間加一層消息隊列
  2. 應用服務器消息隊列只是保存一下操作的信息(在內存),直接返回結果給用戶,但是並未完成業務。這是可以達到毫秒級響應,因為沒有處理業務邏輯
  3. 消息隊列數據庫之間再加一層處理器,處理器就是從消息隊列里面拿數據的,然后處理器再去與數據庫交互,處理業務邏輯

消息應該使用 JSON

應用服務器是不能主動去查詢數據庫再返回結果給用戶的,因為成本比較高,一般都是客戶端輪詢WebSocket之類的,去問應用服務器業務邏輯是否完成,比如掃碼之類的功能

異步化優劣勢解讀

優勢

1. 流量削峰

流量削峰,消息隊列的最重要特性,把流量高峰的業務延遲到后面再處理

流量高峰期,請求特別多,但是應用服務器不處理業務邏輯,所以無所謂,吞吐量高
業務堆積在隊列里面,晚點再處理

2. 高可用

可用性,就是對外不間斷的提供服務

簡化應用服務,處理器異常也不影響我們對外提供服務

使用消息隊列,應用服務器可能也就做一下數據驗證什么的工作,具體的業務邏輯在處理器中處理,即使所有的處理器都宕機,但是我們的消息隊列依然可以接收消息,這樣,我們的系統依然能對外提供服務

3. 擴展性

消息隊列的消息來自應用服務器,如果我們需要做擴展功能和手機業務邏輯,只需要給處理器這一層升級就可以了。用戶提交的信息不需要改變,也完全不影響應用服務器,耦合度也降低了

直接從物理層面隔離,擴展互不影響

4. 重試機制

以前我們的應用服務器是直接連接數據庫,對數據進行操作,操作失敗就返回失敗。
消息隊列如果失敗了,並不會把消息扔掉,之后再重新嘗試操作

缺陷

1. 降低用戶體驗

因為不能快速拿到結果,會降低用戶體驗,需要業務妥協一下

2. 代碼的復雜度

原來我們只有客戶端、應用服務器、數據庫這三層
現在我們有客戶端、應用服務器、消息隊列、處理器、數據庫

應用服務器層,除了做驗證和消息序列化操作,還要支持用戶查詢業務處理結果
處理器層,需要寫業務邏輯

3. 重放攻擊

頁面連點幾次,數據庫有多條相同數據
但是在消息隊列里面就比較難處理了,會有多條重復消息

可能因為網絡抖動,操作信息已經保存到消息隊列中,但是應用服務器沒有返回結果。用戶就以為請求沒有成功,很可能會再次發起請求,消息隊列會出現多條相同的數據

4. 冪等性設計

處理器處理完業務邏輯會返回結果,並刪除消息隊列中的信息,但是在消息隊列里,也有可能處理器對數據庫處理完成,但是沒有移除消息隊列的信息

冪等性設計就是說,處理器處理重復的消息,不會在數據庫產生新的數據,也不會影響結果

消息隊列

消息隊列,是一個獨立進程,(一般)使用內存保存(速度快,有丟失),支持網絡讀寫

生產者消費者模式:

  • 生產者負責往消息隊列里寫入數據,生產者可以有多個
  • 消費者負責使用消息隊列中的數據
  • 一條消息消費一次

生產者消費者模式長這樣

發布訂閱模式:

  • 發布者負責往消息隊列里寫入數據,發布者可以有多個
  • 訂閱者負責使用消息隊列中的數據,訂閱者可以有多個
  • 一條消息訂閱多次

發布訂閱模式長這樣

安裝 RabbitMQ

安裝過程就沒必要寫了,教程到處都是,也有 Windows 版一鍵安裝

但是注意:guest 不支持遠程訪問,所以要新增一個用戶

RabbitMQ 集群
因為整幾台 Linux 有點麻煩,而且我也不會用 Docker,所以我就不寫了

RabbitMQ 集群搭建參考:https://blog.csdn.net/qq_28533563/article/details/107932737

RabbitMQ

RabbitMQ 相對於一般的消息隊列來說,有一個比較獨特的設計,就是Exchangeds(交換機)Queues(隊列)

所以系統結構就變成了,生產者先連接交換機,交換機再去連接消息隊列。

多了交換機這一層,就可以有很多功能:

  • 路由:由交換機去轉發到消息隊列
  • 實現了一條消息多個隊列使用:因為消息由交換機轉發,所以可以轉發到多個隊列中

在 C# 中使用 RabbitMQ

首先,使用 NuGet 安裝一個包:RabbitMQ.Client

會涉及到以下幾個函數
這些參數我也不太懂,所以我把源碼的函數聲明給拿出來了,自行翻譯

定義隊列

/// <summary>
/// Declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
/// <param name="queue">The name of the queue. Pass an empty string to make the server generate a name.</param>
/// <param name="durable">Should this queue will survive a broker restart?</param>
/// <param name="exclusive">Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.</param>
/// <param name="autoDelete">Should this queue be auto-deleted when its last consumer (if any) unsubscribes?</param>
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

定義交換機

/// <summary>Declare an exchange.</summary>
/// <remarks>
/// The exchange is declared non-passive and non-internal.
/// The "nowait" option is not exercised.
/// </remarks>
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);

將隊列綁定到交換機上

/// <summary>
/// Bind a queue to an exchange.
/// </summary>
/// <remarks>
///   <para>
///     Routing key must be shorter than 255 bytes.
///   </para>
/// </remarks>
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);

發布消息

/// <summary>
/// (Extension method) Convenience overload of BasicPublish.
/// </summary>
/// <remarks>
/// The publication occurs with mandatory=false
/// </remarks>
public static void BasicPublish(this IModel model, string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
    model.BasicPublish(exchange, routingKey, false, basicProperties, body);
}

/// <summary>
/// Publishes a message.
/// </summary>
/// <remarks>
///   <para>
///     Routing key must be shorter than 255 bytes.
///   </para>
/// </remarks>
void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);

啟動消費者,接收消息

/// <summary>Start a Basic content-class consumer.</summary>
public static string BasicConsume(this IModel model, string queue, bool autoAck, IBasicConsumer consumer)
{
    return model.BasicConsume(queue, autoAck, "", false, false, null, consumer);
}

/// <summary>Start a Basic content-class consumer.</summary>
string BasicConsume(
    string queue,
    bool autoAck,
    string consumerTag,
    bool noLocal,
    bool exclusive,
    IDictionary<string, object> arguments,
    IBasicConsumer consumer);

Received 事件,消費者接收到消息事件,即,消費者處理消息

///<summary>
/// Event fired when a delivery arrives for the consumer.
/// </summary>
/// <remarks>
/// Handlers must copy or fully use delivery body before returning.
/// Accessing the body at a later point is unsafe as its memory can
/// be already released.
/// </remarks>
public event EventHandler<BasicDeliverEventArgs> Received;

生產者消費者模式

生產者部分

生產者項目就用 RESTful

[HttpGet("one/{count}")]
public async Task<ActionResult> One(int count)
{
    string queueName = "queue_demo_one";
    string exchangeName = "exchange_demo_one";

    //先創建連接
    var factory = new ConnectionFactory()
    {
        HostName = "192.168.0.102",//ip
        Port = 5672,//端口,15672 是 web 端管理用的,5672 是用於客戶端與消息中間件之間可以傳遞消息
        UserName = "admin",//用戶名
        Password = "123456"//密碼
    };

    //打開連接
    using var connection = factory.CreateConnection();
    using IModel channel = connection.CreateModel();

    //定義隊列
    channel.QueueDeclare(queue: queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    //定義交換機
    channel.ExchangeDeclare(exchange: exchangeName,
        type: ExchangeType.Direct,
        durable: true,
        autoDelete: false,
        arguments: null);

    //將隊列綁定到交換機上
    channel.QueueBind(queue: queueName,
        exchange: exchangeName,
        routingKey: string.Empty,
        arguments: null);

    //發送隊列
    for (int i = 0; i < count; i++)
    {
        string message = $"Task {i}";
        byte[] body = Encoding.UTF8.GetBytes(message);

        //發送消息
        channel.BasicPublish(exchange: exchangeName,
            routingKey: string.Empty,
            basicProperties: null,
            body: body);

        Console.WriteLine($"消息:{message} 已發送");
    }

    return Ok();
}

消費者部分

處理器這里用控制台

static void Foo_One()
{
    string queueName = "queue_demo_one";
    string exchangeName = "exchange_demo_one";

    var factory = new ConnectionFactory()
    {
        HostName = "192.168.0.102",
        Port = 5672,
        UserName = "admin",
        Password = "123456"
    };

    using var connection = factory.CreateConnection();
    using IModel channel = connection.CreateModel();

    //定義消費者
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, args) =>
        {
            var body = args.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine($"消費者接收消息 {message}");
        };

    //啟動消費者
    channel.BasicConsume(queue: queueName,
        autoAck: true,//自動確認
        consumer: consumer);

    //處理完消息后,保持程序繼續運行,可以繼續接收消息
    Console.ReadLine();
}

效果

先啟動 RESTful 項目,訪問一次,會給隊列增加消息,我在這里添加 10 條消息

可以看到 RabbitMQ 服務器這里多了一個 exchange

消息隊列里也多了 10 條消息

啟動控制台項目,消費者會處理數據

再看 RabbitMQ 服務器,消息被處理

不關閉控制台,再次訪問 webapi,這樣會自動執行

發布訂閱模式

發布者

發布者也是用 RESTful

[HttpGet("multi/{count}")]
public async Task<ActionResult> Multi(int count)
{
    string queueName = "queue_demo_multi";
    string smsQueueName = "queue_demo_multi_sms";
    string emailQueueName = "queue_demo_multi_eamil";
    string exchangeName = "exchange_demo_multi";

    //先創建連接
    var factory = new ConnectionFactory()
    {
        HostName = "192.168.0.102",
        Port = 5672,
        UserName = "admin",
        Password = "123456"
    };

    using var connection = factory.CreateConnection();
    using IModel channel = connection.CreateModel();

    channel.ExchangeDeclare(exchange: exchangeName,
        type: ExchangeType.Fanout,
        durable: true,
        autoDelete: false,
        arguments: null);

    //這里聲明三個隊列,並且綁定同一個交換機
    channel.QueueDeclare(queue: queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    channel.QueueBind(queue: queueName,
        exchange: exchangeName,
        routingKey: string.Empty,
        arguments: null);

    channel.QueueDeclare(queue: smsQueueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    channel.QueueBind(queue: smsQueueName,
        exchange: exchangeName,
        routingKey: string.Empty,
        arguments: null);

    channel.QueueDeclare(queue: emailQueueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    channel.QueueBind(queue: emailQueueName,
        exchange: exchangeName,
        routingKey: string.Empty,
        arguments: null);

    for (int i = 0; i < count; i++)
    {
        string message = $"Task {i}";
        byte[] body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: exchangeName,
            routingKey: string.Empty,
            basicProperties: null,
            body: body);

        Console.WriteLine($"消息:{message} 已發送");
    }

    return Ok();
}

訂閱者

訂閱者這里也是控制台,輸入索引,表示啟動不同的消費者(處理器)

static void Foo_Multi()
{
    string queueName = "queue_demo_multi";
    string smsQueueName = "queue_demo_multi_sms";
    string emailQueueName = "queue_demo_multi_eamil";
    string exchangeName = "exchange_demo_multi";

    string[] strs = new string[3];
    strs[0] = queueName;
    strs[1] = smsQueueName;
    strs[2] = emailQueueName;

    Console.Write("輸入索引 0 ~ 2 :");
    int index = Convert.ToInt32(Console.ReadLine());

    var factory = new ConnectionFactory()
    {
        HostName = "192.168.0.102",
        Port = 5672,
        UserName = "admin",
        Password = "123456"
    };

    using var connection = factory.CreateConnection();
    using IModel channel = connection.CreateModel();

    channel.ExchangeDeclare(exchange: exchangeName,
        type: ExchangeType.Fanout,
        durable: true,
        autoDelete: false,
        arguments: null);

    channel.QueueDeclare(queue: strs[index],
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    channel.QueueBind(queue: strs[index],
        exchange: exchangeName,
        routingKey: string.Empty,
        arguments: null);

    //定義消費者
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, args) =>
    {
        var body = args.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine($"消費者 {strs[index]} 接收消息 {message}");
    };

    //啟動消費者
    channel.BasicConsume(queue: strs[index],
        autoAck: true,//自動確認
        consumer: consumer);


    //處理完消息后,保持程序繼續運行,可以繼續接收消息
    Console.ReadLine();
}

效果

先啟動 RESTful 項目,訪問一次,會給隊列增加消息,這里添加 10 條消息

查看 exchange,沒有問題

查看 Queue,三個隊列都有消息

啟動三個控制台項目,消費者會處理數據

都處理完了

不關閉控制台,再次訪問 webapi,這樣會自動執行,這里就不演示了

注意

Console.ReadLine()一定要跟啟動消費者channel.BasicConsume()寫在同一個函數中

消息隊列基礎知識以及在 C# 中使用 RabbitMQ 結束


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM