一. 事件總線機制
1. 業務改造
引入時間總線的概念,采用CAP框架進行業務處理,同時利用RabbitMQ代替Redis隊列,采用SQLServer進行本地消息表的存儲, 采用 推模式 發送消息,我們習慣稱之為 發布-訂閱 模式。
關於基於CAP框架實現事件總線,詳見:
https://www.cnblogs.com/yaopengfei/p/13763500.html
https://www.cnblogs.com/yaopengfei/p/13776361.html
2. 分析
(1) 多件商品參與秒殺,訂閱者如何在一個接口中接收,如何處理這多個隊列?
答:這里思考的角度不對,因為隊列中存的是下單信息,而不是商品信息,所有用戶對秒殺下單信息都往這一個隊列中存的(不同商品的庫存肯定是存放在多個緩存中),下單信息里肯定有商品信息,訂閱者拿到商品信息,再進行不同的業務處理。
PS: 當然,也可以定義多個訂閱者接口。
(2).消息總線 發消息 給訂閱者,訂閱者的並發怎么處理?
答:CAP框架默認是單線程處理的,下一個消息的發送,需要等待上一個消息執行完才行,當然也可以多線程,但無法保證消費順序且訂閱者中可能會出現並發問題。
(3).由於是單線程發送,所以執行速度會相對慢,不如項目消費者主動獲取塊級處理那種模式塊。
答:單條數據的處理肯定不如批量提交效率高,這是一個無法回避的缺陷,這里用CAP框架的優勢在於基於本地消息表的異常處理很友好。
核心代碼:
cap框架注冊代碼:

services.AddCap(x => { //-----------------------------一.聲明存儲類型--------------------------------- //1. 使用SQLServer存儲 //還需要配合上面EF上下文的注入 services.AddDbContext x.UseEntityFramework<ESHOPContext>(); //EFCore配置 //-----------------------------二.聲明消息隊列類型---------------------------------//1.使用RabbitMq隊列存儲 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; //rb.QueueMessageExpires = 24 * 3600 * 10; //隊列中消息自動刪除時間(默認10天) }); //-----------------------------三.添加后台監控,用於人工干預--------------------------------- //-----------------------------四.通用配置--------------------------------- x.ConsumerThreadCount = 1; //消費者線程並行處理消息的線程數,提高消費速度,但這個值大於1時,將不能保證消息執行的順序,且可能存在並發問題。 });
發布訂閱核心代碼:

/// <summary> ///09-引入事件總線RabbitMQ /// </summary> /// <param name="userId">用戶編號</param> /// <param name="arcId">商品編號</param> /// <param name="totalPrice">訂單總額</param> /// <param name="requestId">請求ID</param> /// <param name="goodNum">用戶購買的商品數量</param> /// <returns></returns> public string POrder9([FromServices] ICapPublisher _capBus, string userId, string arcId, string totalPrice, string requestId = "125643", int goodNum = 1) { int tLimits = 100; //限制請求數量 int tSeconds = 1; //限制秒數 string limitKey = $"LimitRequest{arcId}";//受限商品ID int tGoodBuyLimits = 3; //用戶單個商品可以購買的數量 string userBuyGoodLimitKey = $"userBuyGoodLimitKey-{userId}-{arcId}"; //用戶單個商品的限制key string userRequestId = requestId; //用戶下單頁面的請求ID string arcKey = $"{arcId}-sCount"; //該商品庫存key try { //調用lua腳本 //參數說明: var result = RedisHelper.EvalSHA(_cache.Get<string>("SeckillLua1"), "ypf12345", tLimits, tSeconds, limitKey, goodNum, tGoodBuyLimits, userBuyGoodLimitKey, userRequestId, arcKey); if (result.ToString() == "1") { //2. 將下單信息存到消息隊列中 var orderNum = Guid.NewGuid().ToString("N"); _capBus.Publish("seckillGoods", $"{userId}-{arcId}-{totalPrice}-{orderNum}"); //3. 把部分訂單信息返回給前端 return $"下單成功,訂單信息為:userId={userId},arcId={arcId},orderNum={orderNum}"; } else { //請求被禁止,或者是商品賣完了 throw new Exception($"沒搶到"); } } catch (Exception ex) { //lua回滾 RedisHelper.EvalSHA(_cache.Get<string>("SeckillLuaCallback1"), "ypf12345", limitKey, userBuyGoodLimitKey, userRequestId, arcKey, goodNum); throw new Exception(ex.Message); } } /// <summary> /// 訂閱者的方法 /// //可以改為調用SQL語句,速率會有提升 /// </summary> /// <param name="time"></param> [NonAction] [CapSubscribe("seckillGoods")] public void CreateOrder(string orderInfor) { try { Console.WriteLine("下面開始執行訂閱業務"); //1.扣減庫存 var sArctile = _baseService.Entities<T_SeckillArticle>().Where(u => u.id == "300001").FirstOrDefault(); sArctile.articleStockNum = sArctile.articleStockNum - 1; //2. 插入訂單信息 List<string> tempData = orderInfor.Split('-').ToList(); T_Order tOrder = new T_Order(); tOrder.id = Guid.NewGuid().ToString("N"); tOrder.userId = tempData[0]; tOrder.orderNum = tempData[3]; tOrder.articleId = tempData[1]; tOrder.orderTotalPrice = Convert.ToDecimal(tempData[2]); tOrder.addTime = DateTime.Now; tOrder.orderStatus = 0; _baseService.AddNo<T_Order>(tOrder); int count = _baseService.SaveChange(); Console.WriteLine($"執行成功,條數為:{count}"); } catch (Exception ex) { Console.WriteLine($"訂閱業務執行失敗:{ex.Message}"); } }
lua核心腳本 1

--[[本腳本主要整合:單品限流、購買的商品數量限制、方法冪等、扣減庫存的業務]] --[[ 一. 方法聲明 ]]-- --[[ --1. 單品限流--存在緩存覆蓋問題 local function seckillLimit() --(1).獲取相關參數 -- 限制請求數量 local tLimits=tonumber(ARGV[1]); -- 限制秒數 local tSeconds =tonumber(ARGV[2]); -- 受限商品key local limitKey = ARGV[3]; --(2).執行判斷業務 local myLimitCount = redis.call('INCR',limitKey); if myLimitCount > tLimits then return 0; --失敗 else redis.call('expire',limitKey,tSeconds) return 1; --成功 end; --對應的是if的結束 end; --對應的是整個代碼塊的結束 ]]-- --1. 單品限流--解決緩存覆蓋問題 local function seckillLimit() --(1).獲取相關參數 -- 限制請求數量 local tLimits=tonumber(ARGV[1]); -- 限制秒數 local tSeconds =tonumber(ARGV[2]); -- 受限商品key local limitKey = ARGV[3]; --(2).執行判斷業務 local myLimitCount = redis.call('INCR',limitKey); -- 僅當第一個請求進來設置過期時間 if (myLimitCount ==1) then redis.call('expire',limitKey,tSeconds) --設置緩存過期 end; --對應的是if的結束 -- 超過限制數量,返回失敗 if (myLimitCount > tLimits) then return 0; --失敗 end; --對應的是if的結束 end; --對應的是整個代碼塊的結束 --2. 限制一個用戶商品購買數量(這里假設一次購買一件,后續改造) local function userBuyLimit() --(1).獲取相關參數 local tGoodBuyLimits = tonumber(ARGV[5]); local userBuyGoodLimitKey = ARGV[6]; --(2).執行判斷業務 local myLimitCount = redis.call('INCR',userBuyGoodLimitKey); if (myLimitCount > tGoodBuyLimits) then return 0; --失敗 else redis.call('expire',userBuyGoodLimitKey,600) --10min過期 return 1; --成功 end; end; --對應的是整個代碼塊的結束 --3. 方法冪等(防止網絡延遲多次下單) local function recordOrderSn() --(1).獲取相關參數 local requestId = ARGV[7]; --請求ID --(2).執行判斷業務 local requestIdNum = redis.call('INCR',requestId); --表示第一次請求 if (requestIdNum==1) then redis.call('expire',requestId,600) --10min過期 return 1; --成功 end; --第二次及第二次以后的請求 if (requestIdNum>1) then return 0; --失敗 end; end; --對應的是整個代碼塊的結束 --4、扣減庫存 local function subtractSeckillStock() --(1) 獲取相關參數 --local key =KEYS[1]; --傳過來的是ypf12345沒有什么用處 --local arg1 = tonumber(ARGV[1]);--購買的商品數量 -- (2).扣減庫存 -- local lastNum = redis.call('DECR',"sCount"); local lastNum = redis.call('DECRBY',ARGV[8],tonumber(ARGV[4])); --string類型的自減 -- (3).判斷庫存是否完成 if lastNum < 0 then return 0; --失敗 else return 1; --成功 end end --[[ 二. 方法調用 返回值1代表成功,返回:0,2,3,4 代表不同類型的失敗 ]]-- --[[ --1. 單品限流調用 local status1 = seckillLimit(); if status1 == 0 then return 2; --失敗 end ]]-- --[[ --2. 限制購買數量 local status2 = userBuyLimit(); if status2 == 0 then return 3; --失敗 end ]]-- --3. 方法冪等 --[[ local status3 = recordOrderSn(); if status3 == 0 then return 4; --失敗 end ]]-- --4.扣減秒殺庫存 local status4 = subtractSeckillStock(); if status4 == 0 then return 0; --失敗 end return 1; --成功
lua回滾腳本

--[[本腳本主要整合:單品限流、購買的商品數量限制、方法冪等、扣減庫存的業務的回滾操作]] --[[ 一. 方法聲明 ]]-- --1.單品限流恢復 local function RecoverSeckillLimit() local limitKey = ARGV[1];-- 受限商品key redis.call('INCR',limitKey); end; --2.恢復用戶購買數量 local function RecoverUserBuyNum() local userBuyGoodLimitKey = ARGV[2]; local goodNum = tonumber(ARGV[5]); --商品數量 redis.call("DECRBY",userBuyGoodLimitKey,goodNum); end --3.刪除方法冪等存儲的記錄 local function DelRequestId() local userRequestId = ARGV[3]; --請求ID redis.call('DEL',userRequestId); end; --4. 恢復訂單原庫存 local function RecoverOrderStock() local stockKey = ARGV[4]; --庫存中的key local goodNum = tonumber(ARGV[5]); --商品數量 redis.call("INCRBY",stockKey,goodNum); end; --[[ 二. 方法調用 ]]-- RecoverSeckillLimit(); RecoverUserBuyNum(); DelRequestId(); RecoverOrderStock();
3.各種異常處理
(1). 發布者發送消息失敗
A. CAP框架有本地消息存儲發送的消息,會自動重試.
B. 如果還沒有存儲到本地消息表中失敗了怎么辦?→要回滾之前的業務,寫一個Lua回滾的腳本)
(2).RabbitMq宕機
本地消息表機制
(3).訂閱者異常
本地消息表機制+人工干預
二. 下單成功后的處理方案
1.技術背景
在最終的下單接口模式下,當業務出錯或被限制的時候,會直接返回給前端,比如‘搶單失敗等’,但是當下單請求存放到隊列中后也是直接返回給前端, 此時我們並不知道后續創建訂單等操作是否成功。
面對這種情況,,應該怎么處理? 前端又作何提示呢?
2. 常用方案剖析
方案1:等待推送
(1).原理:前端頁面提示“請求已提交,請耐心等待”,等服務端消費成功后,主動推送給客戶端,客戶端接受到推送成功請求后,自動跳轉付款頁面進行付款.
(2).分析:
A.該方案受客戶端網絡等影響或者用戶退出應用了,很容易推送失敗, 而且還需要引入額外的機制進行離線處理.
B.高並發下推送也會出現丟失現象.
C.需要引入新的實時通訊技術,增加了技術成本
方案2:主動刷新
(1).原理:前端頁面提示“請求已提交,請稍后刷新查看結果”,需要用戶主動刷新,或者可以主動輪訓獲取結果。
(2).分析:
A.秒殺期間並發已經很高了,又要增加刷新請求,增加硬件資源成本。
B.體驗不是很友好,還需要用戶主動刷新一下,看似加了一步操作,有的人可能不知道怎么操作導致直接不付錢了
C.實際上該方案也經常被運用,在一定場景下,該方案不失為一種辦法
方案3:直接付款下單(推薦!!)
(1).思考
用戶既然已經提交成功了,說明該用戶已經搶到購買資格了,只要正常付款就可以, 但是由於商家技術層面的問題,導致該用戶的后續業務失敗,憑啥讓用戶來買單呢??
所以很多商城都認為:提交成功了,就是成功了!! 可以直接付款即可.
(2).原理
A.提交到隊列前,相關訂單信息我們實際上早已組裝好(比如訂單號、總金額、用戶id、商品id等等),所以提交隊列成功后,我們完全可以拿着這些信息直接去付款,也就是在付款表中插入記錄。
B.另外我們還需要一個定時器:用於輪訓支付記錄表和訂單表,當支付記錄表中有記錄顯示付款成功,但是由於一些原因導致訂單創建失敗,這個時候需要根據支付記錄表中的數據,向訂單表插入數據來創建訂單信息.(如果定時器也失敗了,就人工干預)
(3).分析
A.首先業務我們已經很嚴謹了,采用CAP框架后有很多異常處理方案,也就是說插入隊列成功,但訂閱業務失敗的這種情況極為少見,就會有重試機制,也會有人工手動干預處理的。
B.上面我們還增加了一個定時器,就算真的失敗了沒有創建訂單成功,定時器也會進行同步數據的。
C.由於是高並發,數據可能非常多,輪訓同步訂單數據 和 訂閱者創建訂單數據 不一定誰先進行,所以都要先判斷一下訂單表中是否有數據,再進行操作。
部分代碼分享(簡版):
支付接口和支付回調接口:

#region 01-創建支付接口 /// <summary> /// 創建支付接口 /// </summary> /// <param name="userId">用戶編號</param> /// <param name="orderNum">訂單號</param> /// <param name="arcId">商品編號</param> /// <param name="totalPrice">訂單總額</param> /// <returns></returns> public string CreatePayInfor(string userId, string orderNum, string arcId, string totalPrice) { //1.組裝數據請求向第三方支付平台下單 //2.三方支付平台下單成功 //說明:會返回一個鏈接,用戶跳轉喚起支付應用(微信/支付寶) //2.1 創建預訂單信息 (這里錯了!!!,應該創建支付記錄,預定單是在隊列中創建啊!!!) T_Order tOrder = new T_Order(); tOrder.id = Guid.NewGuid().ToString("N"); tOrder.userId = userId; tOrder.orderNum = orderNum; tOrder.articleId = arcId; tOrder.orderTotalPrice = Convert.ToDecimal(totalPrice); tOrder.addTime = DateTime.Now; tOrder.orderStatus = 0; _baseService.Add(tOrder); _baseService.SaveChange(); //2.2 跳轉支付成功的鏈接進行支付 return ""; } #endregion #region 02-支付成功回調接口 /// <summary> /// 支付成功回調接口 /// 僅為了演示,實際解析出來的參數很多 /// </summary> /// <param name="orderNum">訂單號</param> /// <returns></returns> public string PaySuccessCallBack(string orderNum) { //1. 參數加密規則校驗--用戶判斷是否是支付平台發過來的 //2. 解析參數,再次請求支付平台,看是否有這條信息(很多情況下這一步是省略的) //3. 將自己系統的訂單信息改為已支付 //3.1 這里要有一層判斷邏輯,假設一個訂單支付平台回調了兩次,我們系統是只執行一次的 var data = _baseService.Entities<T_Order>().Where(u => u.orderNum == orderNum).FirstOrDefault(); if (data!=null) { data.payTime = DateTime.Now; data.orderStatus = 1; } _baseService.SaveChange(); //4. 返回一個標記,否則支付平台會多次回調 return "success"; } #endregion
輪訓同步數據:
業務相對簡單,控制好間隔和查詢數量即可。
!
- 作 者 : Yaopengfei(姚鵬飛)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 聲 明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
- 聲 明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。