第六節:基於Cap框架引入事件總線機制(RabbitMQ+SQLServer) 和 下單成功后的方案剖析


一. 事件總線機制

1. 業務改造

 引入時間總線的概念,采用CAP框架進行業務處理,同時利用RabbitMQ代替Redis隊列,采用SQLServer進行本地消息表的存儲, 采用 推模式 發送消息,我們習慣稱之為 發布-訂閱 模式。

關於基於CAP框架實現事件總線,詳見:

 https://www.cnblogs.com/yaopengfei/p/13763500.html

 https://www.cnblogs.com/yaopengfei/p/13776361.html

2. 分析

 (1) 多件商品參與秒殺,訂閱者如何在一個接口中接收,如何處理這多個隊列?

 答:這里思考的角度不對,因為隊列中存的是下單信息,而不是商品信息,所有用戶對秒殺下單信息都往這一個隊列中存的(不同商品的庫存肯定是存放在多個緩存中),下單信息里肯定有商品信息,訂閱者拿到商品信息,再進行不同的業務處理。

 PS: 當然,也可以定義多個訂閱者接口。

 (2).消息總線 發消息 給訂閱者,訂閱者的並發怎么處理?

 答:CAP框架默認是單線程處理的,下一個消息的發送,需要等待上一個消息執行完才行,當然也可以多線程,但無法保證消費順序且訂閱者中可能會出現並發問題。

 (3).由於是單線程發送,所以執行速度會相對慢,不如項目消費者主動獲取塊級處理那種模式塊。

 答:單條數據的處理肯定不如批量提交效率高,這是一個無法回避的缺陷,這里用CAP框架的優勢在於基於本地消息表的異常處理很友好。

核心代碼:

cap框架注冊代碼:

           services.AddCap(x =>
            {
                //-----------------------------一.聲明存儲類型---------------------------------
                //1. 使用SQLServer存儲
                //還需要配合上面EF上下文的注入 services.AddDbContext
                x.UseEntityFramework<ESHOPContext>();   //EFCore配置

                //-----------------------------二.聲明消息隊列類型---------------------------------//1.使用RabbitMq隊列存儲
                x.UseRabbitMQ(rb =>
                {
                    rb.HostName = "localhost";
                    rb.UserName = "guest";
                    rb.Password = "guest";
                    rb.Port = 5672;
                    rb.VirtualHost = "/";
                    //rb.QueueMessageExpires = 24 * 3600 * 10;  //隊列中消息自動刪除時間(默認10天)
                });

                //-----------------------------三.添加后台監控,用於人工干預---------------------------------


                //-----------------------------四.通用配置---------------------------------
                x.ConsumerThreadCount = 1; //消費者線程並行處理消息的線程數,提高消費速度,但這個值大於1時,將不能保證消息執行的順序,且可能存在並發問題。
            });
View Code

發布訂閱核心代碼:

        /// <summary>
        ///09-引入事件總線RabbitMQ
        /// </summary>
        /// <param name="userId">用戶編號</param>
        /// <param name="arcId">商品編號</param>
        /// <param name="totalPrice">訂單總額</param>
        /// <param name="requestId">請求ID</param>
        /// <param name="goodNum">用戶購買的商品數量</param>
        /// <returns></returns>
        public string POrder9([FromServices] ICapPublisher _capBus, string userId, string arcId, string totalPrice, string requestId = "125643", int goodNum = 1)
        {
            int tLimits = 100;    //限制請求數量
            int tSeconds = 1;     //限制秒數
            string limitKey = $"LimitRequest{arcId}";//受限商品ID
            int tGoodBuyLimits = 3;  //用戶單個商品可以購買的數量
            string userBuyGoodLimitKey = $"userBuyGoodLimitKey-{userId}-{arcId}";  //用戶單個商品的限制key
            string userRequestId = requestId;    //用戶下單頁面的請求ID
            string arcKey = $"{arcId}-sCount";   //該商品庫存key
            try
            {
                //調用lua腳本
                //參數說明:
                var result = RedisHelper.EvalSHA(_cache.Get<string>("SeckillLua1"), "ypf12345", tLimits, tSeconds, limitKey, goodNum, tGoodBuyLimits, userBuyGoodLimitKey, userRequestId, arcKey);
                if (result.ToString() == "1")
                {
                    //2. 將下單信息存到消息隊列中
                    var orderNum = Guid.NewGuid().ToString("N");
                    _capBus.Publish("seckillGoods", $"{userId}-{arcId}-{totalPrice}-{orderNum}");

                    //3. 把部分訂單信息返回給前端
                    return $"下單成功,訂單信息為:userId={userId},arcId={arcId},orderNum={orderNum}";
                }
                else
                {
                    //請求被禁止,或者是商品賣完了
                    throw new Exception($"沒搶到");
                }
            }
            catch (Exception ex)
            {
                //lua回滾
                RedisHelper.EvalSHA(_cache.Get<string>("SeckillLuaCallback1"), "ypf12345", limitKey, userBuyGoodLimitKey, userRequestId, arcKey, goodNum);
                throw new Exception(ex.Message);
            }
        }


        /// <summary>
        /// 訂閱者的方法
        /// //可以改為調用SQL語句,速率會有提升
        /// </summary>
        /// <param name="time"></param>
        [NonAction]
        [CapSubscribe("seckillGoods")]
        public void CreateOrder(string orderInfor)
        {
            try
            {
                Console.WriteLine("下面開始執行訂閱業務");
                //1.扣減庫存
                var sArctile = _baseService.Entities<T_SeckillArticle>().Where(u => u.id == "300001").FirstOrDefault();
                sArctile.articleStockNum = sArctile.articleStockNum - 1;
                //2. 插入訂單信息
                List<string> tempData = orderInfor.Split('-').ToList();
                T_Order tOrder = new T_Order();
                tOrder.id = Guid.NewGuid().ToString("N");
                tOrder.userId = tempData[0];
                tOrder.orderNum = tempData[3];
                tOrder.articleId = tempData[1];
                tOrder.orderTotalPrice = Convert.ToDecimal(tempData[2]);
                tOrder.addTime = DateTime.Now;
                tOrder.orderStatus = 0;
                _baseService.AddNo<T_Order>(tOrder);

                int count = _baseService.SaveChange();
                Console.WriteLine($"執行成功,條數為:{count}");

            }
            catch (Exception ex)
            {
                Console.WriteLine($"訂閱業務執行失敗:{ex.Message}");
            }
        }
View Code

lua核心腳本 1

--[[本腳本主要整合:單品限流、購買的商品數量限制、方法冪等、扣減庫存的業務]]

--[[
    一. 方法聲明
]]--

--[[
--1. 單品限流--存在緩存覆蓋問題
local function seckillLimit()
--(1).獲取相關參數
-- 限制請求數量
local tLimits=tonumber(ARGV[1]);
-- 限制秒數
local tSeconds =tonumber(ARGV[2]);
-- 受限商品key
local limitKey = ARGV[3];
--(2).執行判斷業務
local myLimitCount = redis.call('INCR',limitKey);
if myLimitCount > tLimits 
then
return 0;  --失敗
else
redis.call('expire',limitKey,tSeconds)
return 1;  --成功
end;    --對應的是if的結束
end;    --對應的是整個代碼塊的結束
]]--

--1. 單品限流--解決緩存覆蓋問題
local function seckillLimit()
--(1).獲取相關參數
-- 限制請求數量
local tLimits=tonumber(ARGV[1]);
-- 限制秒數
local tSeconds =tonumber(ARGV[2]);
-- 受限商品key
local limitKey = ARGV[3];
--(2).執行判斷業務
local myLimitCount = redis.call('INCR',limitKey);

-- 僅當第一個請求進來設置過期時間
if (myLimitCount ==1) 
then
redis.call('expire',limitKey,tSeconds) --設置緩存過期
end;   --對應的是if的結束

-- 超過限制數量,返回失敗
if (myLimitCount > tLimits) 
then
return 0;  --失敗
end;   --對應的是if的結束

end;   --對應的是整個代碼塊的結束



--2. 限制一個用戶商品購買數量(這里假設一次購買一件,后續改造)
local function userBuyLimit()
--(1).獲取相關參數
local tGoodBuyLimits = tonumber(ARGV[5]); 
local userBuyGoodLimitKey = ARGV[6]; 

--(2).執行判斷業務
local myLimitCount = redis.call('INCR',userBuyGoodLimitKey);
if (myLimitCount > tGoodBuyLimits)
then
return 0;  --失敗
else
redis.call('expire',userBuyGoodLimitKey,600)  --10min過期
return 1;  --成功
end;
end;    --對應的是整個代碼塊的結束




--3. 方法冪等(防止網絡延遲多次下單)
local function recordOrderSn()
--(1).獲取相關參數
local requestId = ARGV[7];    --請求ID
--(2).執行判斷業務
local requestIdNum = redis.call('INCR',requestId);
--表示第一次請求
if (requestIdNum==1)                            
then
redis.call('expire',requestId,600)  --10min過期
return 1; --成功
end;
--第二次及第二次以后的請求
if (requestIdNum>1)
then
return 0;  --失敗
end;
end;  --對應的是整個代碼塊的結束




--4、扣減庫存
local function subtractSeckillStock()
--(1) 獲取相關參數
--local key =KEYS[1];   --傳過來的是ypf12345沒有什么用處
--local arg1 = tonumber(ARGV[1]);--購買的商品數量
-- (2).扣減庫存
-- local lastNum = redis.call('DECR',"sCount");
local lastNum = redis.call('DECRBY',ARGV[8],tonumber(ARGV[4]));  --string類型的自減
-- (3).判斷庫存是否完成
if lastNum < 0 
then
return 0; --失敗
else
return 1; --成功
end
end



--[[
    二. 方法調用   返回值1代表成功,返回:0,2,3,4 代表不同類型的失敗
]]--

--[[
--1. 單品限流調用
local status1 = seckillLimit();
if status1 == 0 then
return 2;   --失敗
end
]]--

--[[
--2. 限制購買數量
local status2 = userBuyLimit();
if status2 == 0 then
return 3;   --失敗
end
]]--

--3.  方法冪等
--[[
local status3 = recordOrderSn();
if status3 == 0 then
return 4;   --失敗
end
]]--


--4.扣減秒殺庫存
local status4 = subtractSeckillStock();
if status4 == 0 then
return 0;   --失敗
end
return 1;    --成功
View Code

lua回滾腳本

--[[本腳本主要整合:單品限流、購買的商品數量限制、方法冪等、扣減庫存的業務的回滾操作]]

--[[
    一. 方法聲明
]]--

--1.單品限流恢復
local function RecoverSeckillLimit()
local limitKey = ARGV[1];-- 受限商品key
redis.call('INCR',limitKey);
end;

--2.恢復用戶購買數量
local function RecoverUserBuyNum()
local userBuyGoodLimitKey =  ARGV[2]; 
local goodNum = tonumber(ARGV[5]); --商品數量
redis.call("DECRBY",userBuyGoodLimitKey,goodNum);
end

--3.刪除方法冪等存儲的記錄
local function DelRequestId()
local userRequestId = ARGV[3];  --請求ID
redis.call('DEL',userRequestId);
end;

--4. 恢復訂單原庫存
local function RecoverOrderStock()
local stockKey = ARGV[4];  --庫存中的key
local goodNum = tonumber(ARGV[5]); --商品數量
redis.call("INCRBY",stockKey,goodNum);
end;

--[[
    二. 方法調用
]]--
RecoverSeckillLimit();
RecoverUserBuyNum();
DelRequestId();
RecoverOrderStock();
View Code

3.各種異常處理

(1). 發布者發送消息失敗

 A. CAP框架有本地消息存儲發送的消息,會自動重試.

 B. 如果還沒有存儲到本地消息表中失敗了怎么辦?→要回滾之前的業務,寫一個Lua回滾的腳本)

(2).RabbitMq宕機

 本地消息表機制

(3).訂閱者異常

 本地消息表機制+人工干預

 

二. 下單成功后的處理方案

1.技術背景

 在最終的下單接口模式下,當業務出錯或被限制的時候,會直接返回給前端,比如‘搶單失敗等’,但是當下單請求存放到隊列中后也是直接返回給前端, 此時我們並不知道后續創建訂單等操作是否成功。

 面對這種情況,,應該怎么處理? 前端又作何提示呢?

2. 常用方案剖析

方案1:等待推送

 (1).原理:前端頁面提示“請求已提交,請耐心等待”,等服務端消費成功后,主動推送給客戶端,客戶端接受到推送成功請求后,自動跳轉付款頁面進行付款.

 (2).分析:

  A.該方案受客戶端網絡等影響或者用戶退出應用了,很容易推送失敗, 而且還需要引入額外的機制進行離線處理.

  B.高並發下推送也會出現丟失現象.

  C.需要引入新的實時通訊技術,增加了技術成本

方案2:主動刷新

 (1).原理:前端頁面提示“請求已提交,請稍后刷新查看結果”,需要用戶主動刷新,或者可以主動輪訓獲取結果。

 (2).分析:

  A.秒殺期間並發已經很高了,又要增加刷新請求,增加硬件資源成本。

  B.體驗不是很友好,還需要用戶主動刷新一下,看似加了一步操作,有的人可能不知道怎么操作導致直接不付錢了

  C.實際上該方案也經常被運用,在一定場景下,該方案不失為一種辦法

方案3:直接付款下單(推薦!!)

 (1).思考

  用戶既然已經提交成功了,說明該用戶已經搶到購買資格了,只要正常付款就可以, 但是由於商家技術層面的問題,導致該用戶的后續業務失敗,憑啥讓用戶來買單呢??

  所以很多商城都認為:提交成功了,就是成功了!!  可以直接付款即可.

 (2).原理

  A.提交到隊列前,相關訂單信息我們實際上早已組裝好(比如訂單號、總金額、用戶id、商品id等等),所以提交隊列成功后,我們完全可以拿着這些信息直接去付款,也就是在付款表中插入記錄。

  B.另外我們還需要一個定時器:用於輪訓支付記錄表和訂單表,當支付記錄表中有記錄顯示付款成功,但是由於一些原因導致訂單創建失敗,這個時候需要根據支付記錄表中的數據,向訂單表插入數據來創建訂單信息.(如果定時器也失敗了,就人工干預)

 (3).分析

  A.首先業務我們已經很嚴謹了,采用CAP框架后有很多異常處理方案,也就是說插入隊列成功,但訂閱業務失敗的這種情況極為少見,就會有重試機制,也會有人工手動干預處理的。

  B.上面我們還增加了一個定時器,就算真的失敗了沒有創建訂單成功,定時器也會進行同步數據的。

  C.由於是高並發,數據可能非常多,輪訓同步訂單數據 和 訂閱者創建訂單數據 不一定誰先進行,所以都要先判斷一下訂單表中是否有數據,再進行操作。

部分代碼分享(簡版):

支付接口和支付回調接口:

 #region 01-創建支付接口
        /// <summary>
        /// 創建支付接口
        /// </summary>
        /// <param name="userId">用戶編號</param>
        /// <param name="orderNum">訂單號</param>
        /// <param name="arcId">商品編號</param>
        /// <param name="totalPrice">訂單總額</param>
        /// <returns></returns>
        public string CreatePayInfor(string userId, string orderNum, string arcId, string totalPrice)
        {
            //1.組裝數據請求向第三方支付平台下單


            //2.三方支付平台下單成功
            //說明:會返回一個鏈接,用戶跳轉喚起支付應用(微信/支付寶)

            //2.1 創建預訂單信息 (這里錯了!!!,應該創建支付記錄,預定單是在隊列中創建啊!!!)
            T_Order tOrder = new T_Order();
            tOrder.id = Guid.NewGuid().ToString("N");
            tOrder.userId = userId;
            tOrder.orderNum = orderNum;
            tOrder.articleId = arcId;
            tOrder.orderTotalPrice = Convert.ToDecimal(totalPrice);
            tOrder.addTime = DateTime.Now;
            tOrder.orderStatus = 0;
            _baseService.Add(tOrder);
            _baseService.SaveChange();

            //2.2 跳轉支付成功的鏈接進行支付
            return "";
        }
        #endregion

        #region 02-支付成功回調接口
        /// <summary>
        /// 支付成功回調接口
        /// 僅為了演示,實際解析出來的參數很多
        /// </summary>
        /// <param name="orderNum">訂單號</param>
        /// <returns></returns>
        public string PaySuccessCallBack(string orderNum)
        {
            //1. 參數加密規則校驗--用戶判斷是否是支付平台發過來的


            //2. 解析參數,再次請求支付平台,看是否有這條信息(很多情況下這一步是省略的)


            //3. 將自己系統的訂單信息改為已支付
            //3.1 這里要有一層判斷邏輯,假設一個訂單支付平台回調了兩次,我們系統是只執行一次的
            var data = _baseService.Entities<T_Order>().Where(u => u.orderNum == orderNum).FirstOrDefault();
            if (data!=null)
            {
                data.payTime = DateTime.Now;
                data.orderStatus = 1;
            }
            _baseService.SaveChange();


            //4. 返回一個標記,否則支付平台會多次回調
            return "success";

        } 
        #endregion
View Code

輪訓同步數據:

 業務相對簡單,控制好間隔和查詢數量即可。  

 

 

 

 

 

!

  • 作       者 : Yaopengfei(姚鵬飛)
  • 博客地址 : http://www.cnblogs.com/yaopengfei/
  • 聲     明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
  • 聲     明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM