RabbitMQ一個簡單可靠的方案(.Net Core實現)


前言

  最近需要使用到消息隊列相關技術,於是重新接觸RabbitMQ。其中遇到了不少可靠性方面的問題,歸納了一下,大概有以下幾種:

  1. 臨時異常,如數據庫網絡閃斷、http請求臨時失效等;

  2. 時序異常,如A任務依賴於B任務,但可能由於調度或消費者分配的原因,導致A任務先於B任務執行;

  3. 業務異常,由於系統測試不充分,上線后發現某幾個或某幾種消息無法正常處理;

  4. 系統異常,業務中間件無法正常操作,如網絡中斷、數據庫宕機等;

  5. 非法異常,一些偽造、攻擊類型的消息。

 

  針對這些異常,我采用了一種基於消息審計、消息重試、消息檢索、消息重發的方案。

 

方案

 

 

  1. 消息均使用Exchange進行通訊,方式可以是direct或topic,不建議fanout。

  2. 根據業務在Exchange下分配一個或多個Queue,同時設置一個審計線程(Audit)監聽所有Queue,用於記錄消息到MongoDB,同時又不阻塞正常業務處理

  3. 生產者(Publisher)在發布消息時,基於AMQP協議,生成消息標識MessageId和時間戳Timestamp,根據消息業務添加頭信息Headers便於跟蹤。

  

  4. 消費者(Comsumer)消息處理失敗時,則把消息發送到重試交換機(Retry Exchange),並設置過期(重試)時間及更新重試次數;如果超過重試次數則刪除消息。

  5. 重試交換機Exchange設置死信交換機(Dead Letter Exchange),消息過期后自動轉發到業務交換機(Exchange)。

  6. WebApi可以根據消息標識MessageId、時間戳Timestamp以及頭信息Headers在MongoDB中對消息進行檢索或重試。

   

  注:選擇MongoDB作為存儲介質的主要原因是其對頭信息(headers)的動態查詢支持較好,同等的替代產品還可以是Elastic Search這些。

 

生產者(Publisher)

  1. 設置斷線自動恢復

  var factory = new ConnectionFactory
  {
      Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),
      AutomaticRecoveryEnabled = true
  };

 

  2. 定義Exchange,模式為direct

  channel.ExchangeDeclare("Exchange", "direct");

 

  3. 根據業務定義QueueA和QueueB

  channel.QueueDeclare("QueueA", true, false, false);
  channel.QueueBind("QueueA", "Exchange", "RouteA");

  channel.QueueDeclare("QueueB", true, false, false);
  channel.QueueBind("QueueB", "Exchange", "RouteB");

 

  4. 啟動消息發送確認機制,即需要收到RabbitMQ服務端的確認消息

  channel.ConfirmSelect();

 

  5. 設置消息持久化

  var properties = channel.CreateBasicProperties();
  properties.Persistent = true;

 

  6. 生成消息標識MessageId、時間戳Timestamp以及頭信息Headers

  properties.MessageId = Guid.NewGuid().ToString("N");
  properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
  properties.Headers = new Dictionary<string, object>
  {
      { "key", "value" + i}
  };

 

  7. 發送消息,偶數序列發送到QueueA(RouteA),奇數序列發送到QueueB(RouteB)

  channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

 

  8. 確定收到RabbitMQ服務端的確認消息

  var isOk = channel.WaitForConfirms();
  if (!isOk)
  {
      throw new Exception("The message is not reached to the server!");
  }

 

  完整代碼

var factory = new ConnectionFactory
{
    Uri = new Uri("amqp://guest:guest@localhost:5672"),
    AutomaticRecoveryEnabled = true
};

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("Exchange", "direct");

        channel.QueueDeclare("QueueA", true, false, false);
        channel.QueueBind("QueueA", "Exchange", "RouteA");

        channel.QueueDeclare("QueueB", true, false, false);
        channel.QueueBind("QueueB", "Exchange", "RouteB");

        channel.ConfirmSelect();

        for (var i = 0; i < 2; i++)
        {
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.MessageId = Guid.NewGuid().ToString("N");
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            properties.Headers = new Dictionary<string, object>
            {
                { "key", "value" + i}
            };

            var message = "Hello " + i;
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);
            var isOk = channel.WaitForConfirms();
            if (!isOk)
            {
                throw new Exception("The message is not reached to the server!");
            }
        }
    }
}
View Code

 

  效果:QueueA和QueueB各一條消息,QueueAudit兩條消息

 

   注:Exchange下必須先聲明Queue才能接收到消息,上述代碼並沒有QueueAudit的聲明;需要手動聲明,或者先執行下面的消費者程序進行聲明。

 

正常消費者(ComsumerA)

  1. 設置預取消息,避免公平輪訓問題,可以根據需要設置預取消息數,這里是1

  _channel.BasicQos(0, 1, false);

  

 

  2. 聲明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");
  _channel.QueueDeclare("QueueA", true, false, false);
  _channel.QueueBind("QueueA", "Exchange", "RouteA");

 

  3. 編寫回調函數

  var consumer = new EventingBasicConsumer(_channel);
  consumer.Received += (model, ea) =>
  {
      //The QueueA is always successful.
      try
      {
          _channel.BasicAck(ea.DeliveryTag, false);
      }
      catch (AlreadyClosedException ex)
      {
          _logger.LogCritical(ex, "RabbitMQ is closed!");
      }
  };

  _channel.BasicConsume("QueueA", false, consumer);

  注:設置了RabbitMQ的斷線恢復機制,當RabbitMQ連接不可用時,與MQ通訊的操作會拋出AlreadyClosedException的異常,導致主線程退出,哪怕連接恢復了,程序也無法恢復,因此,需要捕獲處理該異常。

 

異常消費者(ComsumerB)

  1. 設置預取消息

  _channel.BasicQos(0, 1, false);

 

  2. 聲明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");
  _channel.QueueDeclare("QueueB", true, false, false);   _channel.QueueBind("QueueB", "Exchange", "RouteB");

 

  3.  設置死信交換機(Dead Letter Exchange)

  var retryDic = new Dictionary<string, object>
  {
      {"x-dead-letter-exchange", "Exchange"},
      {"x-dead-letter-routing-key", "RouteB"}
  };

  _channel.ExchangeDeclare("Exchange_Retry", "direct");
  _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
  _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");

 

  4. 重試設置,3次重試;第一次1秒,第二次10秒,第三次30秒

  _retryTime = new List<int>
  {
      1 * 1000,
      10 * 1000,
      30 * 1000
  };

 

  5. 獲取當前重試次數

  var retryCount = 0;
  if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
  {
      retryCount = (int)ea.BasicProperties.Headers["retryCount"];
      _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...");
  }

 

  6. 發生異常,判斷是否可以重試

  private bool CanRetry(int retryCount)
  {
      return retryCount <= _retryTime.Count - 1;
  }

 

  7. 可以重試,則啟動重試機制

  private void SetupRetry(int retryCount, string retryExchange, string retryRoute, BasicDeliverEventArgs ea)
  {
      var body = ea.Body;
      var properties = ea.BasicProperties;
      properties.Headers = properties.Headers ?? new Dictionary<string, object>();
      properties.Headers["retryCount"] = retryCount;
      properties.Expiration = _retryTime[retryCount].ToString();

      try
      {
          _channel.BasicPublish(retryExchange, retryRoute, properties, body);
      }
      catch (AlreadyClosedException ex)
      {
          _logger.LogCritical(ex, "RabbitMQ is closed!");
      }
  }

 

  完整代碼

    _channel.BasicQos(0, 1, false);
    
    _channel.ExchangeDeclare("Exchange", "direct");
    _channel.QueueDeclare("QueueB", true, false, false);
    _channel.QueueBind("QueueB", "Exchange", "RouteB");
    
    var retryDic = new Dictionary<string, object>
    {
        {"x-dead-letter-exchange", "Exchange"},
        {"x-dead-letter-routing-key", "RouteB"}
    };
    
    _channel.ExchangeDeclare("Exchange_Retry", "direct");
    _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
    _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");
    
    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        //The QueueB is always failed.
        bool canAck;
        var retryCount = 0;
        if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
        {
            retryCount = (int)ea.BasicProperties.Headers["retryCount"];
            _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...");
        }
    
        try
        {
            Handle();
            canAck = true;
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Error!");
            if (CanRetry(retryCount))
            {
                SetupRetry(retryCount, "Exchange_Retry", "RouteB_Retry", ea);
                canAck = true;
            }
            else
            {
                canAck = false;
            }
        }
    
        try
        {
            if (canAck)
            {
                _channel.BasicAck(ea.DeliveryTag, false);
            }
            else
            {
                _channel.BasicNack(ea.DeliveryTag, false, false);
            }
        }
        catch (AlreadyClosedException ex)
        {
            _logger.LogCritical(ex, "RabbitMQ is closed!");
        }
    };
    
    _channel.BasicConsume("QueueB", false, consumer);
View Code

 

審計消費者(Audit Comsumer)

  1. 聲明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");

  _channel.QueueDeclare("QueueAudit", true, false, false);
  _channel.QueueBind("QueueAudit", "Exchange", "RouteA");
  _channel.QueueBind("QueueAudit", "Exchange", "RouteB");

 

  2. 排除死信Exchange轉發過來的重復消息

  if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death"))
  {
      ...
  }

 

  3. 生成消息實體

  var message = new Message
  {
      MessageId = ea.BasicProperties.MessageId,
      Body = ea.Body,
      Exchange = ea.Exchange,
      Route = ea.RoutingKey
  };

 

  4. RabbitMQ會用bytes來存儲字符串,因此,要把頭中bytes轉回字符串

  if (ea.BasicProperties.Headers != null)
  {
      var headers = new Dictionary<string, object>();

      foreach (var header in ea.BasicProperties.Headers)
      {
          if (header.Value is byte[] bytes)
          {
              headers[header.Key] = Encoding.UTF8.GetString(bytes);
          }
          else
          {
              headers[header.Key] = header.Value;
          }
      }

      message.Headers = headers;
  }

 

  5. 把Unix格式的Timestamp轉成UTC時間

  if (ea.BasicProperties.Timestamp.UnixTime > 0)
  {
      message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;
      var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
      message.Timestamp = offset.UtcDateTime;
  }

 

  6. 消息存入MongoDB

  _mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);

 

  MongoDB記錄:

  

 

  重試記錄:

  

 

消息檢索及重發(WebApi)

  1. 通過消息Id檢索消息

  

 

  2. 通過頭消息檢索消息

  

  

 

  3. 消息重發,會重新生成MessageId

  

  

 

Ack,Nack,Reject的關系

  1. 消息處理成功,執行Ack,RabbitMQ會把消息從隊列中刪除。

  2. 消息處理失敗,執行Nack或者Reject:

  a) 當requeue=true時,消息會重新回到隊列,然后當前消費者會馬上再取回這條消息;

  b) 當requeue=false時,如果Exchange有設置Dead Letter Exchange,則消息會去到Dead Letter Exchange;

  c) 當requeue=false時,如果Exchange沒設置Dead Letter Exchange,則消息從隊列中刪除,效果與Ack相同。

 

  3. Nack與Reject的區別在於:Nack可以批量操作,Reject只能單條操作。

  

RabbitMQ自動恢復

連接(Connection)恢復

  1. 重連(Reconnect)

  2. 恢復連接監聽(Listeners)

  3. 重新打開通道(Channels)

  4. 恢復通道監聽(Listeners)

  5. 恢復basic.qos,publisher confirms以及transaction設置

   

拓撲(Topology)恢復

  1. 重新聲明交換機(Exchanges)

  2. 重新聲明隊列(Queues)

  3. 恢復所有綁定(Bindings)

  4. 恢復所有消費者(Consumers)

 

異常處理機制

  1. 臨時異常,如數據庫網絡閃斷、http請求臨時失效等

  通過短時間重試(如1秒后)的方式處理,也可以考慮Nack/Reject來實現重試(時效性更高)。

 

  2. 時序異常,如A任務依賴於B任務,但可能由於調度或消費者分配的原因,導致A任務先於B任務執行

  通過長時間重試(如1分鍾、30分鍾、1小時、1天等),等待B任務先執行完的方式處理。

  

  3. 業務異常,由於系統測試不充分,上線后發現某幾個或某幾種消息無法正常處理

  等系統修正后,通過消息重發的方式處理。

 

  4. 系統異常,業務中間件無法正常操作,如網絡中斷、數據庫宕機等

  等系統恢復后,通過消息重發的方式處理。

 

  5. 非法異常,一些偽造、攻擊類型的消息

  多次重試失敗后,消息從隊列中被刪除,也可以針對此業務做進一步處理。

 

源碼地址

https://github.com/ErikXu/RabbitMesage


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM