dotnet core使用IO合並技巧輕松實現千萬級消息推送


之前講述過多路復用實現單服百萬級別RPS吞吐,但在文中有一點是沒有說的就是消息IO合並,如果缺少了消息IO合並即使怎樣多路復用也很難達到百萬級別的請求響畢竟所有應用層面的網絡IO讀寫都是非常損耗性能的(需要硬件配置很高的服務器)。這一章主要講述的是IO合並的應用,並通過這個特性實現普通單服務千萬級別的消息推送測試。

什么是消息IO合並

所謂的消息IO合並即是由原來一個消息對應一個網絡讀寫設計成多個消息共享一個網絡讀寫。那這樣的設計到底會帶來多大的性能提升,最簡單的對比場就是每次執行1條SQL執行1萬次和直接批執行1萬條SQL的差別,相信做過的朋友一定非常清楚其性能提升的幅度。那在網絡通訊中如何設計才能讓多個消息進行IO合並呢?作者在實際實踐中的方式有兩種:1)通過定時器把隊列中的所有消息定期合並發送,2)通過一個狀態機歸遞消息隊列,一旦隊列存在消息一次過合並發送。定時器這種比較損耗性能,在連接量大的情況存在延時間相互影響;對於后者則比較好控制很多也不存在延時性,原理發送消息進隊列后和網絡發送完成再回到狀態機檢測消息隊列狀態即可。 

消息推送相對於請求響應來說相還是簡單很多的,畢竟消息推送是單向並不需要有高效的響應機制。不過對於普通服務器間實現千萬級的消息推送還是需要做些規划,畢竟是需要在有限的IO讀寫量的情況來達到這么大規模的消息處理。還有這么大量的消息序列化和反序列化也是一非常損耗性能的事情,所以這次實踐並沒有使用Protobuf,而是采用自定義序列化。測試的通訊組件選擇Beetlex因為它具備了自動消息合並能力,並配合高效的多復路用機制在服務之間進行千萬級別的消息推變得簡單。

測試簡述

這一次測試主要是向服務端推着一個簡單的訂單信息,由客戶每次生成不同的訂單信息推送給服務端,服務器接收訂單消息后進行統計,並計算每秒接收的訂單數量。

消息結構

    public class Order
    {
        public long ID;
        public string Product;
        public int Quantity;
        public double Price;
        public double Total;
    }

創建訂單

private static long mId;
private static string[] mProducts = new string[] { "Apple", "Orange", "Banana", "Citrus", "Mango" };
private static int[] mQuantity = new int[] { 3, 10, 20, 23, 6, 9, 21 };
private static double[] mPrice = new double[] { 2.3, 1.6, 3.2, 4.6, 20, 4 };
public static Order CreateOrder()
{
      Order order = new Order();
      order.ID = System.Threading.Interlocked.Increment(ref mId);
      order.Product = mProducts[order.ID % mPrice.Length];
      order.Quantity = mQuantity[order.ID % mQuantity.Length];
      order.Price = mPrice[order.ID % mPrice.Length];
      order.Total = order.Quantity * order.Price;
      return order;
 }

基於測試資源有限,這次的測試並沒像之前跑PRS那樣采用Protobuf,因為這量的對象處理量實在太大,測試的硬件環境不變所以采了自定義的序列化方式,具體可以參考源代碼。

接收端代碼

        public override void SessionPacketDecodeCompleted(IServer server, PacketDecodeCompletedEventArgs e)
        {
            PushMessages.Order order = (PushMessages.Order)e.Message;
            if (order.ID > MaxOrerID)
                MaxOrerID = order.ID;
            System.Threading.Interlocked.Increment(ref Count);
        }

由於是接收推送的消息,服務端接收消息后統計相關數量即可完成,對於之前的RPS測試所需處理的東西就少很多了。

推送端壓測代碼

        public void Run()
        {
            foreach (var item in mClients)
                System.Threading.ThreadPool.QueueUserWorkItem(OnRun, item);
        }
        private void OnRun(object state)
        {
            AsyncTcpClient item = (AsyncTcpClient)state;
            while (true)
            {
                Order order = OrderFactory.CreateOrder();
                item.Send(order);
                if (order.ID > MaxOrerID)
                    MaxOrerID = order.ID;
                System.Threading.Interlocked.Increment(ref Count);
                if (item.Count > 2000)
                    System.Threading.Thread.Sleep(1);
            }
        }

為了防止壓爆連接內部的消息隊列,壓測端當連接隊列超過2000個消息的時候停止一下。由於采用了消息合並機制所以並不需要太多連接,在整個測試過程中開啟了三個壓測實例,每個實例使用5個連接,換句話說BeetleX通過15個連接,實現千萬級消息的推送能力。

測試服務器資源

這次測試使用了兩家雲服務器,第一家名字就不說了,開啟了V16核的虛擬服務器,內部帶寬6G和100萬pps,結果實際壓測2G帶寬就壓不上去了,剛開始以為是linux系統要配置問題,換了windows系統試一下還是不行……,最終還是換回了阿里雲測,在v12核的虛擬服務器上順利完成了這一次測試。

服務端: v12核,24G內存,操作系統ubuntu16.04 一台,內網最大帶寬4Gb.

壓測端: v12核,24G內存,操作系統ubuntu16.04,內網最大帶寬4Gb, 兩台(主要測試方式有些暴力一台無法達到壓測目標)

測試結果

二台壓測機共開啟了3個實例,每個實例5個連接,每個連接應用層處理的buffer 32k;整個測結果消息推送量達到了1000萬個/秒。服務端記錄接收IO每秒15000次,平均每次receive得到的消息大概在600個左右。以下是測試情況的截圖:

服務程序統計情況

服務端CPU情況

網絡使用情況

下功測試代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM