目錄:
功能介紹
消息隊列簡介及原理
代碼與實現
消息隊列常見問題
功能介紹
一 . 簡單介紹一下要做的功能,用戶前台下單之后,如果用戶未支付,30分鍾后訂單會自動取消,訂單狀態和庫存變回原來狀態和庫存,我們的后台使用asp.net core 2.0開發,而asp.net core后台的定時任務 需要添加服務 services.AddHostedService<DeadListener>(); 實現類直接繼承IHostedService接口,接口會調用 啟動 方法 StartAsync 。好了,不多介紹啟動定時任務,(主要介紹的是消息隊列RabbitMQ)然后 簡單的邏輯就是 用戶下單會把一條消息插入生產隊列中,當然消息隊列的配置是30分鍾,30分鍾之內如果用戶支付,就會調用消費者接口,將消息消費掉,如果30分鍾沒有支付,超時消息會到死信隊列中,然后后台任務會檢查到死信隊列中的消息,將消息消費掉,過程中會改訂單狀態等
消息隊列簡介及原理
二 . 再簡單介紹一下消息隊列吧,我也是第一次用,網上太多帖子了,哈哈
- ConnectionFactory(連接管理器):應用程序與Rabbit之間建立連接的管理器,程序代碼中使用;
- Channel(信道):消息推送使用的通道;
- Exchange(交換器):用於接受、分配消息;
- Queue(隊列):用於存儲生產者的消息;
- RoutingKey(路由鍵):用於把生成者的數據分配到交換器上;
- BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;
三. 工作機制
生產者、消費者和代理
在了解消息通訊之前首先要了解3個概念:生產者、消費者和代理。
生產者:消息的創建者,負責創建和推送數據到消息服務器;
消費者:消息的接收方,用於處理數據和確認消息;
代理:就是RabbitMQ本身,用於扮演“快遞”的角色,本身不生產消息,只是扮演“快遞”的角色。
消息發送原理
首先你必須連接到Rabbit才能發布和消費消息,那怎么連接和發送消息的呢?
你的應用程序和Rabbit Server之間會創建一個TCP連接,一旦TCP打開,並通過了認證,認證就是你試圖連接Rabbit之前發送的Rabbit服務器連接信息和用戶名和密碼,有點像程序連接數據庫,使用Java有兩種連接認證的方式,后面代碼會詳細介紹,一旦認證通過你的應用程序和Rabbit就創建了一條AMQP信道(Channel)。
信道是創建在“真實”TCP上的虛擬連接,AMQP命令都是通過信道發送出去的,每個信道都會有一個唯一的ID,不論是發布消息,訂閱隊列或者介紹消息都是通過信道完成的。
為什么不通過TCP直接發送命令?
對於操作系統來說創建和銷毀TCP會話是非常昂貴的開銷,假設高峰期每秒有成千上萬條連接,每個連接都要創建一條TCP會話,這就造成了TCP連接的巨大浪費,而且操作系統每秒能創建的TCP也是有限的,因此很快就會遇到系統瓶頸。
如果我們每個請求都使用一條TCP連接,既滿足了性能的需要,又能確保每個連接的私密性,這就是引入信道概念的原因。
四.消息持久化
Rabbit隊列和交換器有一個不可告人的秘密,就是默認情況下重啟服務器會導致消息丟失,那么怎么保證Rabbit在重啟的時候不丟失呢?答案就是消息持久化。
當你把消息發送到Rabbit服務器的時候,你需要選擇你是否要進行持久化,但這並不能保證Rabbit能從崩潰中恢復,想要Rabbit消息能恢復必須滿足3個條件:
- 投遞消息的時候durable設置為true,消息持久化,代碼:channel.queueDeclare(x, true, false, false, null),參數2設置為true持久化;
- 設置投遞模式deliveryMode設置為2(持久),代碼:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),參數3設置為存儲純文本到磁盤;
- 消息已經到達持久化交換器上;
- 消息已經到達持久化的隊列;
持久化工作原理
Rabbit會將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費之后,Rabbit會把這條消息標識為等待垃圾回收。
持久化的缺點
消息持久化的優點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬盤要比寫入內存性能較低很多,從而降低了服務器的吞吐量,盡管使用SSD硬盤可以使事情得到緩解,但他仍然吸干了Rabbit的性能,當消息成千上萬條要寫入磁盤的時候,性能是很低的。
所以使用者要根據自己的情況,選擇適合自己的方式。
代碼與實現
1.聲明隊列的參數說明
1 //聲明隊列 2 channel.QueueDeclare 3 ( 4 queue: QueueName, //隊列名稱 5 durable: false, //隊列是否持久化.false:隊列在內存中,服務器掛掉后,隊列就沒了;true:服務器重啟后,隊列將會重新生成.注意:只是隊列持久化,不代表隊列中的消息持久化!!!! 6 exclusive: false, //隊列是否專屬,專屬的范圍針對的是連接,也就是說,一個連接下面的多個信道是可見的.對於其他連接是不可見的.連接斷開后,該隊列會被刪除.注意,不是信道斷開,是連接斷開.並且,就算設置成了持久化,也會刪除. 7 autoDelete: true, //如果所有消費者都斷開連接了,是否自動刪除.如果還沒有消費者從該隊列獲取過消息或者監聽該隊列,那么該隊列不會刪除.只有在有消費者從該隊列獲取過消息后,該隊列才有可能自動刪除(當所有消費者都斷開連接,不管消息是否獲取完) 8 arguments: null //隊列的配置 9 );

1 //加載消息隊列(訂單超時) 2 //定時任務觸發器 3 services.AddHostedService<DeadListener>(); 4 或者 5 services.AddTransient<IHostedService, DeadListener>();
2.消息隊列(生產者)
1 /// <summary> 2 /// 訂單超時未處理消息隊列(生產者) 3 /// </summary> 4 /// <param name="routeKey"></param> 5 /// <returns></returns> 6 public Task PublisherOrder(string routeKey) 7 { 8 const string routingKeyDead = "queue-dead-routing-jd"; //死信隊列路由 9 var routingKeyDelay = "queue-delay-" + routeKey;//消息隊列路由 10 const string orderQueueName = "zzhelloJd"; //定義消息隊列名 11 const string orderQueueDeadName = "zzhello_dead_Jd"; //定義一個死信消息隊列名 12 13 var factory = new ConnectionFactory 14 { 15 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用戶名 16 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密碼 17 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip 18 }; 19 using (var connection = factory.CreateConnection()) 20 { 21 using (var channel = connection.CreateModel()) 22 { 23 //定義死信交換機 24 channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null); 25 //創建一個名叫"zzhello_dead"的消息隊列 26 channel.QueueDeclare(orderQueueDeadName, true, false, false, null); 27 //將死信隊列綁定到死信交換機 28 channel.QueueBind(orderQueueDeadName, "exchange-D", routingKeyDead); 29 var dic = new Dictionary<string, object> 30 { 31 {"x-message-ttl", 1800000},//隊列上消息過期時間,應小於隊列過期時間 60000 1800000 32 //{"x-message-ttl", 120000},//隊列上消息過期時間,應小於隊列過期時間 60000 1800000 33 {"x-dead-letter-exchange", "exchange-D"},//過期消息轉向路由 34 {"x-dead-letter-routing-key", routingKeyDead}//過期消息轉向路由相匹配routingkey 35 }; 36 channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定義一個Direct類型交換機 37 //創建一個名叫"zzhello"的消息隊列 38 channel.QueueDeclare(orderQueueName, true, false, false, dic); 39 //將隊列綁定到交換機 40 channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic); 41 var body = Encoding.UTF8.GetBytes(routeKey.ToString()); 42 //向該消息隊列發送消息message 43 channel.BasicPublish("exchange-L", 44 routingKeyDelay, 45 null, 46 body); 47 } 48 } 49 return Task.CompletedTask; 50 }
3.消息隊列(消費者)
1 /// <summary> 2 /// 支付成功后處理消費者 3 /// </summary> 4 /// <returns></returns> 5 [Obsolete] 6 public Task ConsumerOrder(string routeKey) 7 { 8 const string orderQueueName = "zzhelloJd"; //定義消息隊列名 9 var routingKeyDelay = "queue-delay-" + routeKey;//消息隊列路由 10 const string routingKeyDead = "queue-dead-routing-jd"; //死信隊列路由 11 var factory = new ConnectionFactory 12 { 13 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用戶名 14 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密碼 15 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip 16 }; 17 using (var connection = factory.CreateConnection()) 18 { 19 using (var channel = connection.CreateModel()) 20 { 21 var dic = new Dictionary<string, object> 22 { 23 {"x-message-ttl", 1800000},//隊列上消息過期時間,應小於隊列過期時間 60000 1800000 24 {"x-dead-letter-exchange", "exchange-D"},//過期消息轉向路由 25 {"x-dead-letter-routing-key", routingKeyDead}//過期消息轉向路由相匹配routingkey 26 }; 27 channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定義一個Direct類型交換機 28 //創建一個名叫"zzhello"的消息隊列 29 channel.QueueDeclare(orderQueueName, true, false, false, dic); 30 //將隊列綁定到交換機 31 channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic); 32 //回調,當consumer收到消息后會執行該函數 33 //var consumer = new EventingBasicConsumer(channel); 34 //consumer.Received += (model, ea) => 35 //{ 36 // var body = ea.Body; 37 // var message = Encoding.UTF8.GetString(body); 38 //}; 39 40 ////消費隊列"hello"中的消息 41 //channel.BasicConsume(queue: name, 42 // autoAck: true, 43 // consumer: consumer); 44 45 var consumer = new QueueingBasicConsumer(channel); 46 //消費隊列,並設置應答模式為程序主動應答 47 channel.BasicConsume(orderQueueName, false, consumer); 48 49 //阻塞函數,獲取隊列中的消息 50 var ea = consumer.Queue.Dequeue(); 51 var bytes = ea.Body; 52 var str = Encoding.UTF8.GetString(bytes); 53 Console.WriteLine("隊列消息:" + str); 54 //回復確認 55 channel.BasicAck(ea.DeliveryTag, false); 56 57 } 58 } 59 return Task.CompletedTask; 60 }
4.消費死信隊列
1 public class DeadListener : RabbitListener 2 { 3 4 #region Fileds 5 6 // 因為Process函數是委托回調,直接將其他Service注入的話兩者不在一個scope, 7 // 這里要調用其他的Service實例只能用IServiceProvider CreateScope后獲取實例對象 8 private readonly IServiceProvider _services; 9 private readonly ILogger<RabbitListener> _logger; 10 11 #endregion 12 13 14 #region Ctors 15 16 public DeadListener(IServiceProvider services, IConfiguration configuration, ILogger<RabbitListener> logger) : base(configuration) 17 { 18 RouteKey = "queue-dead-routing-jd"; 19 QueueName = "zzhello_dead_Jd"; 20 _logger = logger; 21 _services = services; 22 } 23 24 #endregion 25 26 27 #region Methods 28 29 protected override bool Process(string message) 30 { 31 var taskMessage = message; 32 if (taskMessage == null) 33 { 34 // 返回false 的時候回直接駁回此消息,表示處理不了 35 return false; 36 } 37 try 38 { 39 using (var scope = _services.CreateScope()) 40 { 41 var xxxService = scope.ServiceProvider.GetRequiredService<IOrderService>(); 42 //_logger.LogInformation($"開始更新訂單狀態:UpdateOrderCancel,message:{message}"); 43 //LoggerHelper.Write($"開始更新訂單狀態:UpdateOrderCancel,message:{message}"); 44 var re= xxxService.UpdateOrderCancel(Guid.Parse(taskMessage)).Result; 45 //_logger.LogInformation($"結束更新訂單狀態:UpdateOrderCancel,message:{message},result:{re}"); 46 //LoggerHelper.Write($"結束更新訂單狀態:UpdateOrderCancel,message:{message},result:{re}"); 47 if (re) 48 { 49 return true; 50 } 51 else 52 { 53 return false; 54 } 55 } 56 } 57 catch (Exception ex) 58 { 59 _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}"); 60 _logger.LogError(-1, ex, "Process fail"); 61 LoggerHelper.Write($"DeadListener 自動更新訂單狀態報錯,錯誤提示 :{ex}"); 62 return false; 63 } 64 } 65 #endregion 66 }
1 public class RabbitListener : IHostedService 2 { 3 private readonly IConnection _connection; 4 private readonly IModel _channel; 5 6 protected RabbitListener(IConfiguration configuration) 7 { 8 try 9 { 10 var factory = new ConnectionFactory 11 { 12 // 這是我這邊的配置,自己改成自己用就好 13 UserName = configuration["RabbitMQConfig:RabbitUserName"],//用戶名 14 Password = configuration["RabbitMQConfig:RabbitPassword"],//密碼 15 HostName = configuration["RabbitMQConfig:RabbitHost"]//rabbitmq ip 16 //Port = options.Value.RabbitPort, 17 }; 18 _connection = factory.CreateConnection(); 19 _channel = _connection.CreateModel(); 20 } 21 catch (Exception ex) 22 { 23 Console.WriteLine($"RabbitListener init error,ex:{ex.Message}"); 24 } 25 } 26 27 public Task StartAsync(CancellationToken cancellationToken) 28 { 29 Register(); 30 return Task.CompletedTask; 31 } 32 protected string RouteKey; 33 protected string QueueName; 34 35 // 處理消息的方法 36 protected virtual bool Process(string message) 37 { 38 throw new NotImplementedException(); 39 } 40 41 // 注冊消費者監聽在這里 42 private void Register() 43 { 44 Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}"); 45 // channel.ExchangeDeclare(exchange: "exchange-D", type: "topic"); 46 _channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null); 47 _channel.QueueDeclare(QueueName, true, false, false, null); 48 _channel.QueueBind(QueueName, "exchange-D", RouteKey); 49 50 //啟用QoS,每次預取10條,避免消費不過來導致消息堆積在本地緩存 51 _channel.BasicQos(0, 10, false); 52 var consumer = new EventingBasicConsumer(_channel); 53 consumer.Received += (model, ea) => 54 { 55 var body = ea.Body; 56 var message = Encoding.UTF8.GetString(body); 57 var result = Process(message); 58 if (result) 59 { 60 _channel.BasicAck(ea.DeliveryTag, false);//啟用手動ack機制后,沒有及時ack導致的隊列異常(Unacked過多) 61 } 62 else 63 { 64 _channel.BasicNack(ea.DeliveryTag, false, true);// 啟用nack+重入隊 機制后,導致的死循環(Ready過多) 65 } 66 67 }; 68 _channel.BasicConsume(queue: QueueName, consumer: consumer); 69 } 70 71 public void DeRegister() 72 { 73 _connection.Close(); 74 } 75 76 77 public Task StopAsync(CancellationToken cancellationToken) 78 { 79 _connection.Close(); 80 return Task.CompletedTask; 81 } 82 }
消息隊列常見問題
在這里我先說一下我遇到的問題吧!不知道什么原因會產生異常消息,也就是業務失敗產生的unasked消息,這個問題該如何處理
處理方式是啟用nack+重入隊 機制后,但是這種方式會 導致的死循環(Ready過多),所以要啟用Qos和ack機制后,沒有及時ack導致的隊列堵塞
啟用QoS,每次預取5條消息,避免消息處理不過來,全部堆積在本地緩存里
channel.BasicQos(0, 5, false);
開啟QoS,當RabbitMQ的隊列達到5條Unacked消息時,不會再推送消息給Consumer;
這樣問題就解決了!!!!!