背景介紹
這幾年一直在摸索一種框架,足夠簡單,又能應付很多高並發高性能的需求。研究過一些框架思想如DDD DCI,也實踐過CQRS框架。
但是總覺得復雜度高,門檻也高,自己學都吃力,如果團隊新人更難接受。所以自從寫了最簡單的BaseContext類之后很長一段時間內都沒有加任何代碼。(basecontext只有10行內代碼)
之前有個秒殺業務要做,用了MVC的異步Action隊列處理請求,感覺還是蠻不錯,所以跟另外一位同事一同把這個功能整合進這個baseContext里面,既沒有用第三方的Queue(如 RabbitMQ )也沒有另外開一個宿主進程Exe。
總之“simple is good!”
EasyQ
EasyQ是一個輕量級的專門用來處理高並發HTTP請求的框架。
應用MVC異步Action機制實現了相同業務單線程排隊處理,沒有用任何讀寫鎖。
可以指定某一種業務創建一條隊列,也可以指定某一種業務+某一種數據ID作為一條隊列。
如秒殺商品業務,可以指定所有秒殺業務都使用單線程排隊處理,避免臟讀 臟寫。
但是這樣做的話,所有秒殺商品都會進入排隊,顯然是不科學的。
所以擴展一種方式是: 秒殺業務+商品ID 作為隊列名。
當然不止商品ID,也可以是用戶ID,商品分類等任意字符串作為隊列名的后綴。
GITHUB地址:https://github.com/BTteam/EasyQ
如能占用您一點時間,提出一點改進的意見,不勝感激!
使用說明
HomeController 是入口頁面,需要繼承AsyncController,使用MVC的異步Action
BT.Contexts項目放置業務代碼,所有Context需要繼承抽象類QueueBaseContext,並且實現3個方法
1,InitData 初始化數據,數據庫獲取數據的方法應該寫在此處
2,Interact 交互操作,數據模型之間的交互,業務代碼的各種計算、判斷等
3,Persist 持久化操作,數據保存到數據庫的操作應當寫在此處。
這3個方法的默認執行步驟非常簡單 1=》2=》3
這個類是封裝了隊列、線程的操作,是EasyQ的核心類。
在HomeController使用Context時,首先應該分開2個Action 如 TestAsync TestCompleted。這是MVC異步Action的機制決定
TestAsync用來啟動異步,TestCompleted是異步完成后的回調操作。這2個方法必須成對出現。具體原理請參考MSDN
調用是URL為:{host}/home/text 注意Async后綴在路由時會被去掉。
SetAsync方法必須傳入AsyncManager對象,key是可選參數,如上所述是用來細分隊列的。
如果想根據商品ID生成隊列,不同商品的秒殺行為在不同的隊列中排隊,就在此處用SetAsync傳入key是商品ID
public void TestAsync(string key) { //GET DATA TestContext context = new TestContext(1); context.SetAsync(AsyncManager, key);//參數為產品隊列標識 context.Execute(); }
再看回調方法
public ActionResult TestCompleted() { var result = AsyncManager.Parameters["response"]; return Content(JsonConvert.SerializeObject( result)); }
所有Context執行后的結果以Parameters["response"]返回
核心解析
QueueBaseContext類
public abstract class QueueBaseContext:BaseContext { private ILog log = LogManager.GetLogger(typeof(QueueBaseContext)); private static ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>> killQueues = new ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>>(); private static ConcurrentDictionary<string, Task> taskDic = new ConcurrentDictionary<string, Task>(); //場景使用步驟 編寫好 1.Interact() 2.Persist() 3.在api調用初始場景后,調用QueueContextAsync() private AsyncManager AsyncManager; private string quenekey; public void SetAsync(AsyncManager _AsyncManager) { SetAsync(_AsyncManager, ""); } public void SetAsync(AsyncManager _AsyncManager, string _quenekey) { quenekey = _quenekey; this.AsyncManager = _AsyncManager; } public abstract void InitData(); public override string Execute() { if (AsyncManager == null) { throw new Exception("必須調用SetAsync 設置AsyncManager對象"); } var runtimeType= this.GetType(); var qKey = runtimeType.FullName + quenekey; // typeof(). AsyncManager.OutstandingOperations.Increment(); //開一個隊列 判斷是否有隊列 if (killQueues.ContainsKey(qKey) == false) { killQueues.TryAdd(qKey, new ConcurrentQueue<AsyncManager>(new[] {AsyncManager})); } else { killQueues[qKey].Enqueue(AsyncManager); } Action ac = () => { while (killQueues[qKey].IsEmpty == false) { // Thread.Sleep(15000); log.DebugFormat("while 進來了 killQueueitemCount length:{0} ,Q num{1}", killQueues[qKey].Count, killQueues.Count); AsyncManager item; killQueues[qKey].TryDequeue(out item);//取出隊列的一個進行處理 try { InitData(); if (Interact())//對應業務邏輯 Persist(); AsyncManager.Parameters["response"] = new { Code = this.StatusCode}; AsyncManager.OutstandingOperations.Decrement(); } catch (Exception e) { log.ErrorFormat("出錯,e msg:{0} ,trace:{1}", e.Message, e.StackTrace); AsyncManager.Parameters["response"] = new { Code = ResponseCode.DataError, Description = "服務器錯誤,請重試" }; AsyncManager.OutstandingOperations.Decrement(); } } //remove q }; if (taskDic.ContainsKey(qKey) == false) { taskDic.TryAdd(qKey, Task.Factory.StartNew(ac)); } if (taskDic[qKey].IsCompleted || taskDic[qKey].IsFaulted) { taskDic[qKey] = Task.Factory.StartNew(ac); } return ""; } }
構建了2個字典
killQueues :隊列名作為KEY 隊列實例作為Value
taskDic:隊列名作為KEY 隊列指定的執行Task作為Value
處理邏輯是創建一個Task 循環隊列每次取出一個項,執行業務操作。直到隊列為空。
如果在上一個Task運行過程中有新的請求加入,則不需要新建Task,只需要繼續加入隊列尾部。上一個Task會執行對應的業務操作。
隊列中的每一個元素代表一次業務操作,操作完畢之后會調用
AsyncManager.OutstandingOperations.Decrement();
用來返回異步結果給請求線程。這樣HTTP請求結果就返回給用戶了。不需要等到隊列完畢。
qKey可以設置任意字符串,用來細分隊列名稱。
隊列最好用
ConcurrentQueue
因為在入列出列操作時處於多線程共享隊列,必須要用線程安全的隊列類。
存在缺陷
1 目前是單點設計,只能在單機上運行,還在研究橫向擴展。
2 性能還需要優化
3 由於使用異步Action 導致每個Action必須一分為二。