作者:林冠宏 / 指尖下的幽靈
GitHub : https://github.com/af913337456/
目錄
- 前序
- 一般的訂單流程
- 思考瓶頸點
- 訂單隊列
- 第一種訂單隊列
- 第二種訂單隊列
- 總結
- 實現隊列的選擇
- 解答
- 實現隊列的選擇
- 第二種隊列的 Go 版本例子代碼
前序
目前的開發工作主要是將
傳統電商應用
和區塊鏈技術
相結合,區塊鏈平台依然是以太坊
,此外地,這幾天由我編寫,經清華大學出版社出版的書籍,歷經八月,終於出版上架了,名稱是:《區塊鏈以太坊DApp開發實戰》
,現已可以網購。
本文所要分享的思路就是電商應用中常用的
訂單隊列
。
一般的訂單流程
電商應用中,簡單直觀的用戶從下單到付款,最終完成整個流程的步驟可以用下圖表示:
其中,訂單信息持久化
,就是存儲數據到數據庫中。而最終客戶端完成支付后的更新訂單狀態
的操作是由第三方支付平台進行回調設置好的回調鏈接 NotifyUrl
,來進行的。
補全訂單狀態的更新流程,如下圖表示:
思考瓶頸點
服務端的直接瓶頸點
,首先要考慮 TPS
。去除細分點,我們主要看訂單信息持久化
瓶頸點。
在高並發業務場景中,例如 秒殺
、優惠價搶購
等。短時間內的下單請求數會很多,如果訂單信息持久化
部分,不做優化,而是直接對數據庫層進行頻繁的
讀寫操作,數據庫會承受不了,容易成為第一個垮掉的服務,比如下圖的所示的常規寫單流程:
可以看到,每
持久化一個訂單信息,一般要經歷網絡連接操作(鏈接數據庫),以及多個 I/O
操作。
得益於連接池
技術,我們可以在鏈接數據庫的時候,不用每次都重新發起一次完整的HTTP請求,而可以直接從池中獲取已打開了的連接句柄,而直接使用,這點和線程池的原理差不多。
此外,我們還可以在上面的流程中加入更多的優化,例如對於一些需要讀取的信息,可以事先存置到內存緩存層,並加於更新維護,這樣在使用的時候,可以快速讀取。
即使我們都具備了上述的一些優化手段,但是對於寫操作
的I/O
阻塞耗時,在高並發請求
的時候,依然容易導致數據庫承受不住,容易出現鏈接多開異常
,操作超時
等問題。
在該層進行優化的操作,除了上面談到的之外,還有下面一些手段:
- 數據庫集群,采用讀寫分離,減少寫時壓力
- 分庫,不同業務的表放到不同的數據庫,會引入分布式事務問題
- 采用隊列模型削峰
每種方式有各自的特點,因為本文談的是訂單隊列
的架構思想,所以下面我們來看下如何在訂單系統中引入訂單隊列。
訂單隊列
網上有不少文章談到訂單隊列的做法,大部分都漏了說明請求與響應的一致性問題。
第一種訂單隊列
流程圖:
上圖是大多文章提到的隊列模型,有兩個沒有解析的問題:
- 如果訂單存在第三方支付情況,① 和 ② 的一致性如何保證,比如其中一處處理失敗;
- 如果訂單存在第三方支付情況,① 完成了支付,且三方支付平台回調了
notifyUrl
,而此時 ② 還在排隊等待處理,這種情況又如何處理。
首先,要肯定的是,上面的訂單流程圖是沒有問題的。它有下面的優缺點,所提到的兩個問題也是有解決方案的。
優點:
- 用戶無需等待訂單持久化處理,而能直接獲得響應,實現快速下單
- 持久化處理,采用排隊的先來先處理,不會像上面談到的高並發請求一起沖擊數據庫層面的情況。
- 可變性強,
搭配中間件
的組合性強。
缺點:
- 多訂單入隊時,② 步驟的處理速度跟不上。從而導致第二點問題。
- 實現較復雜
上面談及的問題點,我后面都會給出解決方案。下面我們來看下另外一種訂單隊列流程圖。
第二種訂單隊列
流程圖:
第二種訂單隊列的設計模型,注意它的同步等待
持久化處理的結果,解決了持久化與響應的一致性問題,但是有個嚴重的耗時等待問題,它的優缺點如下:
優點:
- 持久化與響應的強一致性。
- 持久化處理,采用排隊的先來先處理,不會像上面談到的高並發請求一起沖擊數據庫層面的情況。
- 實現簡單
缺點:
- 多訂單入隊時,持久化單元處理速度跟不上,造成客戶端同步等待響應。
這類訂單隊列,我下面會放出 Golang
實現的版本代碼。
總結
對比上面兩種常見的訂單模型,如果從用戶體驗的角度
去優先考慮,第一種不需要用戶等待持久化處理
結果的是明顯優於第二種的。如果技術團隊完善,且技術過硬,也應該考慮第一種的實現方式。
如果僅僅想要達到寧願用戶等待到超時
也不願意存儲層服務被沖垮,那么有限考慮第二種。
實現隊列的選擇
在這里,我們進一步細分一下,實現隊列模塊的功能有哪些選擇。
相信很多后端開發經驗比較老道的同志已經想到了,使用現有的中間件,比如知名的 Redis
、RocketMQ
,以及 Kafka
等,它們都是一種選擇。
此外地,我們還可以直接編寫代碼,在當前的服務系統中實現一個消息隊列來達到目的,下面我用圖來分類下隊列類型。
不同的隊列實現方式,能直接導致不同的功能,也有不同的優缺點:
一級緩存優點:
- 一級緩存,最快。無需鏈接,直接從內存層獲取;
- 如果不考慮持久化和集群,那么它實現簡單。
一級緩存缺點:
- 如果考慮持久化和集群,那么它實現比較復雜。
- 不考慮持久化情況下,如果服務器斷電或其它原因導致服務中斷,那么排隊中的訂單信息將丟失
中間件的優點:
- 軟件成熟,一般出名的消息中間件都是經過實踐使用的,文檔豐富;
- 支持多種持久化的策略,比如 Redis 有
增量
持久化,能最大程度減少因不可預料的崩潰導致訂單信息丟失; - 支持集群,主從同步,這對於分布式系統來說,是必不可少的要求。
中間件的缺點:
- 分布式部署時,需要建立鏈接通訊,導致讀寫操作需要走網絡通訊。
解答
回到第一種訂單模型中:
問題1:
如果訂單存在第三方支付情況,① 和 ② 的一致性如何保證?
首先我們看下,不一致性的時候,會產生什么結果:
- ① 失敗,用戶因為網絡原因或返回其它頁面,不能獲取結果。而 ② 成功,那么最終該訂單的狀態是待支付。用戶進入到個人訂單中心完成訂單支付即可;
- ① 和 ② 都失敗,那么下單失敗;
- ① 成功,② 失敗,此時用戶在
響應頁面
完成了支付動作,用戶查看訂單信息為空白。
上述的情況,明顯地,只有 3 是需要恢復訂單信息的,應對的方案有:
- 當服務端支付回調接口被第三方支付平台訪問時,無法找到對應的訂單信息。那么先將這類支付了卻沒訂單信息的數據存儲起來先,比如存儲到
表A
。同時啟動一個定時任務B
專門遍歷表A,然后去訂單列表尋找是否已經有了對應的訂單信息,有則更新,沒則繼續,或跟隨制定的檢測策略走。 - 當 ② 是由於服務端的
非崩潰性原因
而導致失敗時:- 失敗的時候同時將原始訂單數據重新插入到
隊列頭部
,等待下一次的重新持久化處理。
- 失敗的時候同時將原始訂單數據重新插入到
- 當 ② 因服務端的
崩潰性
原因而導致失敗時:定時任務B
在進行了多次檢測無果后,那么根據第三方支付平台在回調時候傳遞過來的訂單附屬信息
對訂單進行恢復。
- 整個過程訂單恢復的過程,用戶查看訂單信息為空白。
定時任務B
所在服務最好
和回調鏈接notifyUrl
所在的接口服務一致,這樣能保證當 B 掛掉的時候,回調服務也跟隨掛掉,然后第三方支付平台在調用回調失敗的情況下,他們會有重試邏輯
,依賴這個,在回調服務重啟時,可以完成訂單信息恢復。
問題2:
如果訂單存在第三方支付情況,① 完成了支付,且三方支付平台回調了 notifyUrl,而此時 ② 還在排隊等待處理,這種情況又如何處理?
應對的方案參考 問題1
的 定時任務B
檢測修改機制。
第二種隊列的 Go 版本例子代碼
定義一些常量
const (
QueueOrderKey = "order_queue"
QueueBufferSize = 1024 // 請求隊列大小
QueueHandleTime = time.Second * 7 // 單個 mission 超時時間
)
定義出入隊接口,方便多種實現
// 定義出入隊接口,方便多種實現
type IQueue interface {
Push(key string,data []byte) error
Pop(key string) ([]byte,error)
}
定義請求與響應實體
// 定義請求與響應實體
type QueueTimeoutResp struct {
Timeout bool // 超時標志位
Response chan interface{}
}
type QueueRequest struct {
ReqId string `json:"req_id"` // 單次請求 id
Order *model.OrderCombine `json:"order"` // 訂單信息 bean
AccessTime int64 `json:"access_time"` // 請求時間
ResponseChan *QueueTimeoutResp `json:"-"`
}
定義隊列實體
// 定義隊列實體
type Queue struct {
mapLock sync.Mutex
RequestChan chan *QueueRequest // 緩存管道,裝載請求
RequestMap map[string]*QueueTimeoutResp
Queue IQueue
}
實例化隊列,接收接口參數
// 實例化隊列,接收接口參數
func NewQueue(queue IQueue) *Queue {
return &Queue{
mapLock: sync.Mutex{},
RequestChan: make(chan *QueueRequest, QueueBufferSize),
RequestMap: make(map[string]*QueueTimeoutResp, QueueBufferSize),
Queue: queue,
}
}
接收請求
// 接收請求
func (q *Queue) AcceptRequest(req *QueueRequest) interface{} {
if req.ResponseChan == nil {
req.ResponseChan = &QueueTimeoutResp{
Timeout: false,
Response: make(chan interface{},1),
}
}
userKey := key(req) // 唯一 key 生成函數
req.ReqId = userKey
q.mapLock.Lock()
q.RequestMap[userKey] = req.ResponseChan // 內存層存儲對應的 req 的 resp 管道指針
q.mapLock.Unlock()
q.RequestChan <- req // 接收請求
log("userKey : ", userKey)
ticker := time.NewTicker(QueueHandleTime) // 以超時時間 QueueHandleTime 啟動一個定時器
defer func() {
ticker.Stop() // 釋放定時器
q.mapLock.Lock()
delete(q.RequestMap,userKey) // 當處理完一個 req,從 map 中移出
q.mapLock.Unlock()
}()
select {
case <-ticker.C: // 超時
req.ResponseChan.Timeout = true
Queue_TimeoutCounter++ // 輔助計數,int 類型
log("timeout: ",userKey)
return lghError.HandleTimeOut // 返回超時錯誤的信息
case result := <-req.ResponseChan.Response: // req 被完整處理
return result
}
}
從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine
中運行
// 從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine 中運行
func (q *Queue) addToQueue() {
for {
req := <-q.RequestChan // 取出一個 req
data, err := json.Marshal(req)
if err != nil {
log("redis queue parse req failed : ", err.Error())
continue
}
if err = q.Queue.Push(QueueOrderKey, data);err != nil { // push 入隊,這里有時間消耗
log("lpush req failed. Error : ", err.Error())
continue
}
log("lpush req success. req time: ", req.AccessTime)
}
}
取出 req 處理,該函數在 gorutine
中運行
// 取出 req 處理,該函數在 gorutine 中運行
func (q *Queue) readFromQueue() {
for {
data, err := q.Queue.Pop(QueueOrderKey) // pop 出隊,這里也有時間消耗
if err != nil {
log("lpop failed. Error : ", err.Error())
continue
}
if data == nil || len(data) == 0 {
time.Sleep(time.Millisecond * 100) // 空數據的 req,停頓下再取
continue
}
req := &QueueRequest{}
if err = json.Unmarshal(data, req);err != nil {
log("Lpop: json.Unmarshal failed. Error : ", err.Error())
continue
}
userKey := req.ReqId
q.mapLock.Lock()
resultChan, ok := q.RequestMap[userKey] // 取出對應的 resp 管道指針
q.mapLock.Unlock()
if !ok {
// 中間件重啟時,比如 redis 重啟而讀取舊 key,會進入這里
Queue_KeyNotFound ++ // 計數 int 類型
log("key not found, rollback: ", userKey)
continue
}
simulationTimeOutReq4(req) // 模擬出來任務的函數,入參為 req
if resultChan.Timeout {
// 處理期間,已經超時,這里做可以拓展回滾操作
Queue_MissionTimeout ++
log("handle mission timeout: ", userKey)
continue
}
log("request result send to chan succeee, userKey : ", userKey)
ret := util.GetCommonSuccess(req.AccessTime)
resultChan.Response <- &ret // 輸入處理成功
}
}
啟動
func (q *Queue) Start() {
go q.addToQueue()
go q.readFromQueue()
}
運行例子
func test(){
...
runtime.GOMAXPROCS(4)
redisQueue := NewQueue(NewFastCacheQueue())
redisQueue.Start()
reqNumber := testReqNumber
wg := sync.WaitGroup{}
wg.Add(reqNumber)
for i :=0;i<reqNumber;i++ {
go func(index int) {
combine := model.OrderCombine{}
ret := AcceptRequest(&QueueRequest{
UserId: int64(index),
Order: &combine,
AccessTime: time.Now().Unix(),
ResponseChan: nil,
})
fmt.Println("ret: ------------- ",ret.String())
wg.Done()
}(i)
}
wg.Wait()
time.Sleep(3*time.Second)
fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout)
}