RabbitMQ消息隊列實現30分鍾訂單自動取消功能(C#)


目錄:

功能介紹

消息隊列簡介及原理

代碼與實現

消息隊列常見問題

 

功能介紹

一 . 簡單介紹一下要做的功能,用戶前台下單之后,如果用戶未支付,30分鍾后訂單會自動取消,訂單狀態和庫存變回原來狀態和庫存,我們的后台使用asp.net core 2.0開發,而asp.net core后台的定時任務 需要添加服務 services.AddHostedService<DeadListener>(); 實現類直接繼承IHostedService接口,接口會調用 啟動 方法 StartAsync 。好了,不多介紹啟動定時任務,(主要介紹的是消息隊列RabbitMQ)然后 簡單的邏輯就是 用戶下單會把一條消息插入生產隊列中,當然消息隊列的配置是30分鍾,30分鍾之內如果用戶支付,就會調用消費者接口,將消息消費掉,如果30分鍾沒有支付,超時消息會到死信隊列中,然后后台任務會檢查到死信隊列中的消息,將消息消費掉,過程中會改訂單狀態等

消息隊列簡介及原理

二 . 再簡單介紹一下消息隊列吧,我也是第一次用,網上太多帖子了,哈哈

  • ConnectionFactory(連接管理器):應用程序與Rabbit之間建立連接的管理器,程序代碼中使用;
  • Channel(信道):消息推送使用的通道;
  • Exchange(交換器):用於接受、分配消息;
  • Queue(隊列):用於存儲生產者的消息;
  • RoutingKey(路由鍵):用於把生成者的數據分配到交換器上;
  • BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;

 

 

三. 工作機制

生產者、消費者和代理

在了解消息通訊之前首先要了解3個概念:生產者、消費者和代理。

生產者:消息的創建者,負責創建和推送數據到消息服務器;

消費者:消息的接收方,用於處理數據和確認消息;

代理:就是RabbitMQ本身,用於扮演“快遞”的角色,本身不生產消息,只是扮演“快遞”的角色。

消息發送原理

首先你必須連接到Rabbit才能發布和消費消息,那怎么連接和發送消息的呢?

你的應用程序和Rabbit Server之間會創建一個TCP連接,一旦TCP打開,並通過了認證,認證就是你試圖連接Rabbit之前發送的Rabbit服務器連接信息和用戶名和密碼,有點像程序連接數據庫,使用Java有兩種連接認證的方式,后面代碼會詳細介紹,一旦認證通過你的應用程序和Rabbit就創建了一條AMQP信道(Channel)。

信道是創建在“真實”TCP上的虛擬連接,AMQP命令都是通過信道發送出去的,每個信道都會有一個唯一的ID,不論是發布消息,訂閱隊列或者介紹消息都是通過信道完成的。

為什么不通過TCP直接發送命令?

對於操作系統來說創建和銷毀TCP會話是非常昂貴的開銷,假設高峰期每秒有成千上萬條連接,每個連接都要創建一條TCP會話,這就造成了TCP連接的巨大浪費,而且操作系統每秒能創建的TCP也是有限的,因此很快就會遇到系統瓶頸。

如果我們每個請求都使用一條TCP連接,既滿足了性能的需要,又能確保每個連接的私密性,這就是引入信道概念的原因。

 

四.消息持久化

Rabbit隊列和交換器有一個不可告人的秘密,就是默認情況下重啟服務器會導致消息丟失,那么怎么保證Rabbit在重啟的時候不丟失呢?答案就是消息持久化。

當你把消息發送到Rabbit服務器的時候,你需要選擇你是否要進行持久化,但這並不能保證Rabbit能從崩潰中恢復,想要Rabbit消息能恢復必須滿足3個條件:

  1. 投遞消息的時候durable設置為true,消息持久化,代碼:channel.queueDeclare(x, true, false, false, null),參數2設置為true持久化;
  2. 設置投遞模式deliveryMode設置為2(持久),代碼:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),參數3設置為存儲純文本到磁盤;
  3. 消息已經到達持久化交換器上;
  4. 消息已經到達持久化的隊列;

持久化工作原理

Rabbit會將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費之后,Rabbit會把這條消息標識為等待垃圾回收。

持久化的缺點

消息持久化的優點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬盤要比寫入內存性能較低很多,從而降低了服務器的吞吐量,盡管使用SSD硬盤可以使事情得到緩解,但他仍然吸干了Rabbit的性能,當消息成千上萬條要寫入磁盤的時候,性能是很低的。

所以使用者要根據自己的情況,選擇適合自己的方式。

 

代碼與實現

1.聲明隊列的參數說明

1 //聲明隊列
2  channel.QueueDeclare
3  (
4      queue: QueueName, //隊列名稱
5      durable: false, //隊列是否持久化.false:隊列在內存中,服務器掛掉后,隊列就沒了;true:服務器重啟后,隊列將會重新生成.注意:只是隊列持久化,不代表隊列中的消息持久化!!!!
6       exclusive: false, //隊列是否專屬,專屬的范圍針對的是連接,也就是說,一個連接下面的多個信道是可見的.對於其他連接是不可見的.連接斷開后,該隊列會被刪除.注意,不是信道斷開,是連接斷開.並且,就算設置成了持久化,也會刪除.
7      autoDelete: true, //如果所有消費者都斷開連接了,是否自動刪除.如果還沒有消費者從該隊列獲取過消息或者監聽該隊列,那么該隊列不會刪除.只有在有消費者從該隊列獲取過消息后,該隊列才有可能自動刪除(當所有消費者都斷開連接,不管消息是否獲取完)
8      arguments: null //隊列的配置
9  );

 

1 //加載消息隊列(訂單超時)
2 //定時任務觸發器
3  services.AddHostedService<DeadListener>();
4 或者
5  services.AddTransient<IHostedService, DeadListener>();
View Code

 

2.消息隊列(生產者)

 1        /// <summary>
 2         /// 訂單超時未處理消息隊列(生產者)
 3         /// </summary>
 4         /// <param name="routeKey"></param>
 5         /// <returns></returns>
 6         public Task PublisherOrder(string routeKey)
 7         {
 8             const string routingKeyDead = "queue-dead-routing-jd"; //死信隊列路由
 9             var routingKeyDelay = "queue-delay-" + routeKey;//消息隊列路由
10             const string orderQueueName = "zzhelloJd"; //定義消息隊列名
11             const string orderQueueDeadName = "zzhello_dead_Jd"; //定義一個死信消息隊列名
12 
13             var factory = new ConnectionFactory
14             {
15                 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用戶名
16                 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密碼
17                 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip
18             };
19             using (var connection = factory.CreateConnection())
20             {
21                 using (var channel = connection.CreateModel())
22                 {
23                     //定義死信交換機
24                     channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null);
25                     //創建一個名叫"zzhello_dead"的消息隊列
26                     channel.QueueDeclare(orderQueueDeadName, true, false, false, null);
27                     //將死信隊列綁定到死信交換機
28                     channel.QueueBind(orderQueueDeadName, "exchange-D", routingKeyDead);
29                     var dic = new Dictionary<string, object>
30                     {
31                         {"x-message-ttl", 1800000},//隊列上消息過期時間,應小於隊列過期時間 60000 1800000
32                         //{"x-message-ttl", 120000},//隊列上消息過期時間,應小於隊列過期時間 60000 1800000
33                         {"x-dead-letter-exchange", "exchange-D"},//過期消息轉向路由
34                         {"x-dead-letter-routing-key", routingKeyDead}//過期消息轉向路由相匹配routingkey
35                     };
36                     channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定義一個Direct類型交換機
37                     //創建一個名叫"zzhello"的消息隊列
38                     channel.QueueDeclare(orderQueueName, true, false, false, dic);
39                     //將隊列綁定到交換機
40                     channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic);
41                     var body = Encoding.UTF8.GetBytes(routeKey.ToString());
42                     //向該消息隊列發送消息message
43                     channel.BasicPublish("exchange-L",
44                             routingKeyDelay,
45                             null,
46                             body);
47                 }
48             }
49             return Task.CompletedTask;
50         }

 

3.消息隊列(消費者)

 1      /// <summary>
 2         /// 支付成功后處理消費者
 3         /// </summary>
 4         /// <returns></returns>
 5         [Obsolete]
 6         public Task ConsumerOrder(string routeKey)
 7         {
 8             const string orderQueueName = "zzhelloJd"; //定義消息隊列名
 9             var routingKeyDelay = "queue-delay-" + routeKey;//消息隊列路由
10             const string routingKeyDead = "queue-dead-routing-jd"; //死信隊列路由
11             var factory = new ConnectionFactory
12             {
13                 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用戶名
14                 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密碼
15                 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip
16             };
17             using (var connection = factory.CreateConnection())
18             {
19                 using (var channel = connection.CreateModel())
20                 {
21                     var dic = new Dictionary<string, object>
22                     {
23                         {"x-message-ttl", 1800000},//隊列上消息過期時間,應小於隊列過期時間 60000 1800000
24                         {"x-dead-letter-exchange", "exchange-D"},//過期消息轉向路由
25                         {"x-dead-letter-routing-key", routingKeyDead}//過期消息轉向路由相匹配routingkey
26                     };
27                     channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定義一個Direct類型交換機
28                     //創建一個名叫"zzhello"的消息隊列
29                     channel.QueueDeclare(orderQueueName, true, false, false, dic);
30                     //將隊列綁定到交換機
31                     channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic);
32                     //回調,當consumer收到消息后會執行該函數
33                     //var consumer = new EventingBasicConsumer(channel);
34                     //consumer.Received += (model, ea) =>
35                     //{
36                     //    var body = ea.Body;
37                     //    var message = Encoding.UTF8.GetString(body);
38                     //};
39 
40                     ////消費隊列"hello"中的消息
41                     //channel.BasicConsume(queue: name,
42                     //                     autoAck: true,
43                     //                     consumer: consumer);
44 
45                     var consumer = new QueueingBasicConsumer(channel);
46                     //消費隊列,並設置應答模式為程序主動應答
47                     channel.BasicConsume(orderQueueName, false, consumer);
48 
49                     //阻塞函數,獲取隊列中的消息
50                     var ea = consumer.Queue.Dequeue();
51                     var bytes = ea.Body;
52                     var str = Encoding.UTF8.GetString(bytes);
53                     Console.WriteLine("隊列消息:" + str);
54                     //回復確認
55                     channel.BasicAck(ea.DeliveryTag, false);
56 
57                 }
58             }
59             return Task.CompletedTask;
60         }

 

4.消費死信隊列

 1     public class DeadListener : RabbitListener
 2     {
 3 
 4         #region Fileds
 5 
 6         // 因為Process函數是委托回調,直接將其他Service注入的話兩者不在一個scope,
 7         // 這里要調用其他的Service實例只能用IServiceProvider CreateScope后獲取實例對象
 8         private readonly IServiceProvider _services;
 9         private readonly ILogger<RabbitListener> _logger;
10 
11         #endregion
12 
13 
14         #region Ctors
15 
16         public DeadListener(IServiceProvider services, IConfiguration configuration, ILogger<RabbitListener> logger) : base(configuration)
17         {
18             RouteKey = "queue-dead-routing-jd";
19             QueueName = "zzhello_dead_Jd";
20             _logger = logger;
21             _services = services;
22         }
23 
24         #endregion
25 
26 
27         #region Methods
28 
29         protected override bool Process(string message)
30         {
31             var taskMessage = message;
32             if (taskMessage == null)
33             {
34                 // 返回false 的時候回直接駁回此消息,表示處理不了
35                 return false;
36             }
37             try
38             {
39                 using (var scope = _services.CreateScope())
40                 {
41                     var xxxService = scope.ServiceProvider.GetRequiredService<IOrderService>();
42                     //_logger.LogInformation($"開始更新訂單狀態:UpdateOrderCancel,message:{message}");
43                     //LoggerHelper.Write($"開始更新訂單狀態:UpdateOrderCancel,message:{message}");
44                     var re= xxxService.UpdateOrderCancel(Guid.Parse(taskMessage)).Result;
45                     //_logger.LogInformation($"結束更新訂單狀態:UpdateOrderCancel,message:{message},result:{re}");
46                     //LoggerHelper.Write($"結束更新訂單狀態:UpdateOrderCancel,message:{message},result:{re}");
47                     if (re)
48                     {
49                         return true;
50                     }
51                     else
52                     {
53                         return false;
54                     }
55                 }
56             }
57             catch (Exception ex)
58             {
59                 _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
60                 _logger.LogError(-1, ex, "Process fail");
61                 LoggerHelper.Write($"DeadListener 自動更新訂單狀態報錯,錯誤提示  :{ex}");
62                 return false;
63             }
64         }
65         #endregion
66     }
 1     public class RabbitListener : IHostedService
 2     {
 3         private readonly IConnection _connection;
 4         private readonly IModel _channel;
 5 
 6         protected RabbitListener(IConfiguration configuration)
 7         {
 8             try
 9             {
10                 var factory = new ConnectionFactory
11                 {
12                     // 這是我這邊的配置,自己改成自己用就好
13                     UserName = configuration["RabbitMQConfig:RabbitUserName"],//用戶名
14                     Password = configuration["RabbitMQConfig:RabbitPassword"],//密碼
15                     HostName = configuration["RabbitMQConfig:RabbitHost"]//rabbitmq ip
16                     //Port = options.Value.RabbitPort,
17                 };
18                 _connection = factory.CreateConnection();
19                 _channel = _connection.CreateModel();
20             }
21             catch (Exception ex)
22             {
23                 Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
24             }
25         }
26 
27         public Task StartAsync(CancellationToken cancellationToken)
28         {
29             Register();
30             return Task.CompletedTask;
31         }
32         protected string RouteKey;
33         protected string QueueName;
34 
35         // 處理消息的方法
36         protected virtual bool Process(string message)
37         {
38             throw new NotImplementedException();
39         }
40 
41         // 注冊消費者監聽在這里
42         private void Register()
43         {
44             Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
45             // channel.ExchangeDeclare(exchange: "exchange-D", type: "topic");
46             _channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null);
47             _channel.QueueDeclare(QueueName, true, false, false, null);
48             _channel.QueueBind(QueueName, "exchange-D", RouteKey);
49 
50             //啟用QoS,每次預取10條,避免消費不過來導致消息堆積在本地緩存
51             _channel.BasicQos(0, 10, false);
52             var consumer = new EventingBasicConsumer(_channel);
53             consumer.Received += (model, ea) =>
54             {
55                 var body = ea.Body;
56                 var message = Encoding.UTF8.GetString(body);
57                 var result = Process(message);
58                 if (result)
59                 {
60                     _channel.BasicAck(ea.DeliveryTag, false);//啟用手動ack機制后,沒有及時ack導致的隊列異常(Unacked過多)
61                 }
62                 else
63                 {
64                     _channel.BasicNack(ea.DeliveryTag, false, true);// 啟用nack+重入隊 機制后,導致的死循環(Ready過多)
65                 }
66 
67             };
68             _channel.BasicConsume(queue: QueueName, consumer: consumer);
69         }
70 
71         public void DeRegister()
72         {
73             _connection.Close();
74         }
75 
76 
77         public Task StopAsync(CancellationToken cancellationToken)
78         {
79             _connection.Close();
80             return Task.CompletedTask;
81         }
82     }

消息隊列常見問題

  在這里我先說一下我遇到的問題吧!不知道什么原因會產生異常消息,也就是業務失敗產生的unasked消息,這個問題該如何處理

  處理方式是啟用nack+重入隊 機制后,但是這種方式會 導致的死循環(Ready過多),所以要啟用Qos和ack機制后,沒有及時ack導致的隊列堵塞

  啟用QoS,每次預取5條消息,避免消息處理不過來,全部堆積在本地緩存里

  channel.BasicQos(0, 5, false);

  開啟QoS,當RabbitMQ的隊列達到5條Unacked消息時,不會再推送消息給Consumer;

  這樣問題就解決了!!!!!

  其他常見問題參考     https://www.cnblogs.com/sw008/p/11054331.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM