前言
2020年即將進入尾聲,分享一下在現公司業務處理流程,一起討論在分布式場景下,如何通過消息流的方式處理各種復雜的業務場景,這里涉及到一些常用組件,后面結合場景與代碼來具體說明
場景說明
這里就拿我負責的短信應用來舉例,它由3個核心模塊組成
- 短信網關(接收客戶提交短信,接收通道短信回執,轉發回執給客戶...)
- 計費服務(短信計費,購買套餐...)
- 短信通道(短信提交到通道...)
痛點
- 批量提交營銷類短信,如客戶一次提交10w-20w條短信
- 短信實時計費,延遲問題
- 通道限流控制
- 通知回執延時問題
組件
mysql
redis
阿里雲日志
流程如下
痛點解決方案
- 批量提交短信
批量提交這里主要以下幾種情況
- 短信內容一致,手機號多個
- 短信內容不一致(例如內容攜帶用戶信息等... 通常采用excel上傳)
- 循環調用接口,提交短信
這里僅針對情況3來說明,首先將用戶信息存入redis(先讀緩存,再讀庫),減少在驗證與鑒權時對數據庫的查詢壓力,校驗通過的消息開始寫入收單隊列,並記錄日志(注意日志一定要異步寫入),隊列使用的redis隊列,以目前的業務承載能力,是完全沒有問題的,收單接口的qps可以通過分布式來提升,這個得益於k8s容器伸縮,平時我們一般是5個pod在運行(相當於負載5個應用),在節假日高峰期可以起10個pod,這里性能的瓶頸主要集中在redis隊列,如果有更高要求可以嘗試換成rabbitmq,kafka等
- mysql批量持久化
這也一直是我比較迷的一個地方,數據庫使用的阿里雲mysql,單條循環插入速度在200/s左右,我這里采用的dapper,通過拼接values來批量插入,速度大概能達到3000/s,后面看看有沒有更好的方案
- 短信實時計費,延遲問題
在分布式情況下,實時計費又是一個比較大的性能瓶頸,直接讀庫,並修改用戶條數顯然是不行的,這樣會出現臟讀,導致最終數據不准確而出損(程序員就要背鍋),而且效率低下,使用redis分布式鎖等同於將分布式改成單應用,需要頻繁更新緩存里的用戶短信余量,速度大概在150/s - 180/s,消費速度過慢會導致隊列里數據堆積,短信延遲過高,特別是通知類短信(如獲取驗證碼)對延遲有較高的要求,這里使用redis的計數器來實現,通過計數器遞減的方式,如果短信余量<0則用戶短信余量不足,再單獨起一個任務,每分鍾同步一下用戶短信余量,用戶充值與短信失敗回退,也是對計數器的操作
- 通道限流控制
通道限流主要是供貨商對通道進行流量限制,超頻的短信會直接失敗,一般出現在營銷類短信,因為每條通道的價格(與地域,三網,到達率...有關)都不一樣,每個用戶都會分配通道,為了讓短信盡量成功,程序需要進行限流,這里也是使用redis計數器實現,設置一個1分鍾失效的緩存,超過頻次后,會嘗試其它通道,沒有可用通道則再次寫入隊列
- 通知回執延時問題
系統拿到回執后,需要將回執通知給客戶配置的http地址,並得到客戶的響應,未響應的回執會再次入列,直到客戶響應,或者超過推送策略(推送3次或超過多長時間),有些客戶配置的http地址響應非常慢,或者干脆是一些訪問超時的地址,這樣會導致通知延遲,通知類發卡密業務的短信對回執有較高要求,這里通過多線程來實現,通過創建多個線程從隊列里獲取回執並轉發
List<Task> tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
tasks.Add(Task.Run(async () =>{...}));
}
await Task.WhenAll(tasks);
- 注意事項
- 因為操作redis的地方非常多,為了便於管理,所有redis的key建議統一寫在常量類里,並寫上注釋,並制定對應的規范,方便維護
/// 項目名_類型_操作_參數 有效期 project_queue_action_params
- 業務日志,業務日志方便排查問題,建議不要偷懶,在業務的入口跟出口都寫上對應的日志信息
- 推薦我自己寫的一個Redis消息隊列中間件InitQ,操作簡單可以下載的玩玩
https://github.com/wmowm/initq