性能提升五十倍:消息隊列延時聚合通知的重要性


前言

 

 

這個話題對我而言,是影響很久的事情。從第一次使用消息隊列開始,業務背景是報名系統通知到我們的系統。正常流量下數據都能正常通知過來,但遇到導入報名人時,采用了Task異步通知,數據量一大,隊列就死了。當時是盡量采用同步方式,減少並發量。

 

 后來業務上有了專門的營銷系統,各種數據的增刪改都要進營銷系統,我采用的方式在倉儲層對需要通知的表的任何更新都通知到隊列,這樣的方式幾乎對其他業務無侵犯。

 


好處有,壞處也有。很多批量任務的更新如果采用同步方式頻繁通知是十分浪費速度的,既影響數據的更新速度,也對隊列帶來了挑戰。我曾經專門拉了個分支來優化批量任務,但由於需要涉及很多批量任務最后不了了之。
更合理的推送模型應該是這樣,更新消息先到內存隊列,積累一段時間(5秒或30秒)后,聚合到一起推送到消息隊列,如下圖:

 

挑戰過去

其實也說不上是問題,原因知道,解決方法也知道。只是現狀還能支撐,就沒有去解決,但這些事情總要面對的。挑戰過去的糟糕代碼,優化提升性能,本身就是一個技術成長的過程。

邁出第一步

第一步當然是Demo,先列出代碼。先貼上一個基於Rabbitmq.Client的客戶端幫助代碼,用於推送單條數據和多條數據。

public class RabbitProvider { public const string RABBITMQURL = "amqp://test:test@rabbitmq.login1.com:5672/test"; private static IConnection conn; /// <summary> /// 獲取連接。 /// </summary> /// <param name="url"></param> /// <returns></returns> public static IConnection CreateConnection(string url) { ConnectionFactory factory = new ConnectionFactory(); factory.Uri = new Uri(url); factory.AutomaticRecoveryEnabled = true; IConnection conn = factory.CreateConnection(); return conn; } /// <summary> /// 單個 /// </summary> /// <param name="data"></param> public static void Publish<T>(string exchange, string queue, string route, T data) { if (conn == null || !conn.IsOpen) { conn = CreateConnection(RABBITMQURL); } using (IModel model = conn.CreateModel()) { model.ExchangeDeclare(exchange, ExchangeType.Direct); model.QueueDeclare(queue, false, false, false, null); model.QueueBind(queue, exchange, route, null); //IBasicProperties props = ch.CreateBasicProperties(); //FillInHeaders(props); // or similar // byte[] body = ComputeBody(props); // or similar model.BasicPublish(exchange, route, null, System.Text.Encoding.Default.GetBytes(data.ToString())); } } /// <summary> /// 多條數據 /// </summary> /// <param name="data"></param> public static void Publish<T>(string exchange, string queue, string route, List<T> data) { if (conn == null || !conn.IsOpen) { conn = CreateConnection(RABBITMQURL); } using (IModel model = conn.CreateModel()) { model.ExchangeDeclare(exchange, ExchangeType.Direct); model.QueueDeclare(queue, false, false, false, null); model.QueueBind(queue, exchange, route, null); //IBasicProperties props = ch.CreateBasicProperties(); //FillInHeaders(props); // or similar // byte[] body = ComputeBody(props); // or similar foreach (var item in data) { model.BasicPublish(exchange, route, null, System.Text.Encoding.Default.GetBytes(item.ToString())); } } } } 

也許在部分人眼里能提供支持單條和多條推送的方式已經能解決絕大多數問題,看起來確實如此。但單純的推送批量數據是有業務方發起,是對每個批量任務都有較大侵入的,雖然它很好,但不夠好。接下來我們貼上基於BlockingCollection<T>提供的線程安全集合來完成的隊列代碼。

 public class DANQueue<T> : IDANQueue<T> { private static BlockingCollection<DANMessage<T>> GlobalCollection; static DANQueue() { GlobalCollection = new BlockingCollection<DANMessage<T>>(); } /// <summary> /// 添加 /// </summary> /// <param name="item"></param> /// <returns></returns> public static bool TryAdd(DANMessage<T> item) { return GlobalCollection.TryAdd(item); } /// <summary> /// 獲取一個 /// </summary> /// <param name="item"></param> public static DANMessage<T> TryTake() { var msg = new DANMessage<T>(); if (GlobalCollection.TryTake(out msg)) { return msg; } return null; } /// <summary> /// 獲取所有 /// </summary> /// <returns></returns> public static List<DANMessage<T>> TryTakeAll() { var list = new List<DANMessage<T>>(); while (true) { var q = TryTake(); if (q == null) { return list; } list.Add(q); } } /// <summary> /// 統計 /// </summary> public static int Count() { return GlobalCollection.Count; } } 

測試業務Demo

    /// <summary> /// 用戶 /// </summary> public class User { public string Mobile { get; set; } public long CompanyId { get; set; } } /// <summary> /// 倉儲 /// </summary> public class Repository<TDocument> : IRepository<TDocument> { public bool Update(User user) { DANQueue<User>.TryAdd(new DANMessage<User>() { Body = user, Key = user.CompanyId + user.Mobile, Type = typeof(User).Name, TimeStamp = DateTime.Now.Ticks }); return true; } } 

分別測試批量更新數據下循環通知和只通知一次耗時,代碼如下:

  public const string ExchangeStr = "fanTest"; public const string QueueStr = "fanQueueTest"; private static string TypeUserName = typeof(User).Name; static void Main(string[] args) { //這里就不引入依賴注入了。 Repository<User> repository = new Repository<User>(); Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); for (var i = 0; i <= 1000; i++) { var user = new User() { CompanyId = 13232, Mobile = "11111" + i }; repository.Update(user); RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, DANQueue<User>.TryTake()); } stopwatch.Stop(); Console.WriteLine($"100000UpdateWithPush-Time:" + stopwatch.ElapsedMilliseconds); //批量測試。 stopwatch.Restart(); for (var i = 0; i <= 1000; i++) { var user = new User() { CompanyId = 13232, Mobile = "11111" + i }; repository.Update(user); } RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, DANQueue<User>.TryTakeAll()); stopwatch.Stop(); Console.WriteLine($"100000UpdateDelayPush-Time:" + stopwatch.ElapsedMilliseconds); Console.ReadLine(); } } 

結果如下:

UpdateWithPush-Time:4103 UpdateDelayPush-Time:73 

這里列舉的只是1000條,當我改成1萬條的時候,隊列掛了!這充分說明了延時聚合通知的重要性。相同的環境下,循環通知支撐不了1萬,但聚合后只通知一次的情況下,10萬數據也花了9秒。雙方性能對比結果是指數級的。

UpdateDelayPush-Time:9671 

引入定時機制

上面已經對比了循環通知和聚合通知的性能,但普通的聚合十分侵入業務。每種類型的業務都需要引入代碼,使用不方便,而且維護起來也麻煩。這時候可以考慮引入定時任務來處理聚合通知。先來個1百萬的更新。

 System.Timers.Timer timer = new System.Timers.Timer(5000); timer.Elapsed += Timer_Elapsed; timer.Start(); //批量測試大量數據 for (var i = 0; i <= 1000000; i++) { var user = new User() { CompanyId = 13232, Mobile = "11111" + i }; repository.Update(user); } 

定時觸發的方法如下:

private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { var list = DANQueue<User>.TryTakeAll(); if (list.Count > 0) { RabbitProvider.Publish(ExchangeStr, QueueStr, TypeUserName, list); } } 

運行Debug測試,為方便顯示,我減少了一些列,只顯示queue名和發布速度,能達到每秒1萬左右的量。

| test | [fanQueueTest](/#/queues/test/fanQueueTest) | 10,569/s | | test | [fanQueueTest](/#/queues/test/fanQueueTest) | 12,336/s | 

談到此時的推送速度,再來回顧下剛開始循環通知的速度,每秒250左右,可見速度提升了50倍!

| test | [fanQueueTest](/#/queues/test/fanQueueTest) | 249/s | 

源碼

DAN : DelayAggregationNotice 延時聚合通知組件

總結

經過以上對比,性能從幾千就掛到支撐到每秒上萬的推送量,並且支撐百萬(更高級別沒測試)以上級更新依然健壯運行。
結果如此明顯,如果還沒有動力改變,那還有什么能拯救你呢?
這里的Timer以后可以替換成hangfire,因為hangfire有UI監控,可以查看狀態。hangfire貌似不推薦大數據量的參數,這些細節問題以后可以根據測試情況去取舍。

以上僅為了測試,如果要變成通用可復用,還有更長的路需要走,但比起分布式追蹤簡單多了,一步一步來,用目標約束自己慢慢實現。

本篇完畢,謝謝觀看。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM