[超簡潔]EasyQ框架-應對WEB高並發業務(秒殺、抽獎)等業務


背景介紹

    這幾年一直在摸索一種框架,足夠簡單,又能應付很多高並發高性能的需求。研究過一些框架思想如DDD DCI,也實踐過CQRS框架。

但是總覺得復雜度高,門檻也高,自己學都吃力,如果團隊新人更難接受。所以自從寫了最簡單的BaseContext類之后很長一段時間內都沒有加任何代碼。(basecontext只有10行內代碼)

之前有個秒殺業務要做,用了MVC的異步Action隊列處理請求,感覺還是蠻不錯,所以跟另外一位同事一同把這個功能整合進這個baseContext里面,既沒有用第三方的Queue(如 RabbitMQ )也沒有另外開一個宿主進程Exe。

總之“simple is good!”

EasyQ

EasyQ是一個輕量級的專門用來處理高並發HTTP請求的框架。
應用MVC異步Action機制實現了相同業務單線程排隊處理,沒有用任何讀寫鎖。
可以指定某一種業務創建一條隊列,也可以指定某一種業務+某一種數據ID作為一條隊列。
如秒殺商品業務,可以指定所有秒殺業務都使用單線程排隊處理,避免臟讀 臟寫。
但是這樣做的話,所有秒殺商品都會進入排隊,顯然是不科學的。
所以擴展一種方式是: 秒殺業務+商品ID 作為隊列名。
當然不止商品ID,也可以是用戶ID,商品分類等任意字符串作為隊列名的后綴。
  GITHUB地址:https://github.com/BTteam/EasyQ

如能占用您一點時間,提出一點改進的意見,不勝感激!


使用說明

HomeController 是入口頁面,需要繼承AsyncController,使用MVC的異步Action
BT.Contexts項目放置業務代碼,所有Context需要繼承抽象類QueueBaseContext,並且實現3個方法
1,InitData 初始化數據,數據庫獲取數據的方法應該寫在此處
2,Interact 交互操作,數據模型之間的交互,業務代碼的各種計算、判斷等
3,Persist 持久化操作,數據保存到數據庫的操作應當寫在此處。
這3個方法的默認執行步驟非常簡單 1=》2=》3
  這個類是封裝了隊列、線程的操作,是EasyQ的核心類。
在HomeController使用Context時,首先應該分開2個Action 如 TestAsync TestCompleted。這是MVC異步Action的機制決定
TestAsync用來啟動異步,TestCompleted是異步完成后的回調操作。這2個方法必須成對出現。具體原理請參考MSDN

調用是URL為:{host}/home/text 注意Async后綴在路由時會被去掉。
SetAsync方法必須傳入AsyncManager對象,key是可選參數,如上所述是用來細分隊列的。
如果想根據商品ID生成隊列,不同商品的秒殺行為在不同的隊列中排隊,就在此處用SetAsync傳入key是商品ID

public void TestAsync(string key)
{
//GET DATA
TestContext context = new TestContext(1);
context.SetAsync(AsyncManager, key);//參數為產品隊列標識
context.Execute();
}

 

再看回調方法

public ActionResult TestCompleted()
{
var result = AsyncManager.Parameters["response"];

return Content(JsonConvert.SerializeObject( result));
}

 所有Context執行后的結果以Parameters["response"]返回

 

核心解析

QueueBaseContext類

public abstract class  QueueBaseContext:BaseContext
    {
       private ILog log = LogManager.GetLogger(typeof(QueueBaseContext));
       private static ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>> killQueues = new ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>>();
        private static ConcurrentDictionary<string, Task> taskDic = new ConcurrentDictionary<string, Task>();
       //場景使用步驟  編寫好 1.Interact() 2.Persist() 3.在api調用初始場景后,調用QueueContextAsync()
        private AsyncManager AsyncManager;
        private string quenekey;

       public void SetAsync(AsyncManager _AsyncManager)
       {
           SetAsync(_AsyncManager, "");
       }
       public void SetAsync(AsyncManager _AsyncManager, string _quenekey)
       {
           quenekey = _quenekey;
           this.AsyncManager = _AsyncManager;
       }

       public abstract void InitData();
       public override string Execute()
       {
           if (AsyncManager == null)
           {
               throw new Exception("必須調用SetAsync 設置AsyncManager對象");

           }
         var runtimeType=  this.GetType();
           var qKey = runtimeType.FullName + quenekey;
          // typeof().
           AsyncManager.OutstandingOperations.Increment();
           //開一個隊列 判斷是否有隊列 
           if (killQueues.ContainsKey(qKey) == false)
           {
               killQueues.TryAdd(qKey, new ConcurrentQueue<AsyncManager>(new[] {AsyncManager}));
           }
           else
           {
               killQueues[qKey].Enqueue(AsyncManager);

           }
           Action ac = () =>
           {
              
               while (killQueues[qKey].IsEmpty == false)
               {
                 //  Thread.Sleep(15000);
                   log.DebugFormat("while 進來了  killQueueitemCount length:{0} ,Q num{1}", killQueues[qKey].Count, killQueues.Count);
                   AsyncManager item;
                   killQueues[qKey].TryDequeue(out item);//取出隊列的一個進行處理
                   try
                   {

                       InitData();
                       if (Interact())//對應業務邏輯
                           Persist();

                       AsyncManager.Parameters["response"] = new { Code = this.StatusCode};
                       AsyncManager.OutstandingOperations.Decrement();
                   }
                   catch (Exception e)
                   {
                       log.ErrorFormat("出錯,e msg:{0} ,trace:{1}", e.Message, e.StackTrace);
                       AsyncManager.Parameters["response"] = new { Code = ResponseCode.DataError, Description = "服務器錯誤,請重試" };
                       AsyncManager.OutstandingOperations.Decrement();
                   }
               }
               //remove q
           };
           if (taskDic.ContainsKey(qKey) == false)
           {
               taskDic.TryAdd(qKey, Task.Factory.StartNew(ac));
           }
           if (taskDic[qKey].IsCompleted || taskDic[qKey].IsFaulted)
           {
               taskDic[qKey] = Task.Factory.StartNew(ac);
           }
           return "";
       }
     

    }

構建了2個字典 

killQueues :隊列名作為KEY 隊列實例作為Value
taskDic:隊列名作為KEY 隊列指定的執行Task作為Value

處理邏輯是創建一個Task 循環隊列每次取出一個項,執行業務操作。直到隊列為空。
如果在上一個Task運行過程中有新的請求加入,則不需要新建Task,只需要繼續加入隊列尾部。上一個Task會執行對應的業務操作。
隊列中的每一個元素代表一次業務操作,操作完畢之后會調用
AsyncManager.OutstandingOperations.Decrement();
用來返回異步結果給請求線程。這樣HTTP請求結果就返回給用戶了。不需要等到隊列完畢。
qKey可以設置任意字符串,用來細分隊列名稱。

隊列最好用
ConcurrentQueue
因為在入列出列操作時處於多線程共享隊列,必須要用線程安全的隊列類。


存在缺陷

1 目前是單點設計,只能在單機上運行,還在研究橫向擴展。
2 性能還需要優化
3 由於使用異步Action 導致每個Action必須一分為二。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM