一、未做消息隊列
缺陷:用戶秒殺,請求到了上游服務秒殺服務,然后上游服務調用下游服務下訂單,減去庫存,更新余額。
上游服務秒殺服務的並發量能力有10000,下游服務的並發量能力有1000,當真的客戶端並發量是10000,上游服務秒殺服務能接收10000個請求,但是下游服務只能接收1000個請求,那么下游服務就宕機了。
二、配合消息隊列
上游服務並發來了10000個請求,只把1000個請求寫入消息隊列。
三、封裝Redis消息隊列,優化流量請求
namespace MyRedisUnitty { /// <summary> /// 封裝Redis消息隊列 /// </summary> public class RedisMsgQueueHelper : IDisposable { /// <summary> /// Redis客戶端 /// </summary> public RedisClient redisClient { get; } public RedisMsgQueueHelper(string redisHost) { redisClient = new RedisClient(redisHost); } /// <summary> /// 入隊 /// </summary> /// <param name="qKeys">入隊key</param> /// <param name="qMsg">入隊消息</param> /// <returns></returns> public long EnQueue(string qKey, string qMsg) { //1、編碼字符串 byte[] bytes = System.Text.Encoding.UTF8.GetBytes(qMsg); //2、Redis消息隊列入隊 long count = redisClient.LPush(qKey, bytes); return count; } /// <summary> /// 出隊(非阻塞) === 拉 /// </summary> /// <param name="qKey">出隊key</param> /// <returns></returns> public string DeQueue(string qKey) { //1、redis消息出隊 byte[] bytes = redisClient.RPop(qKey); string qMsg = null; //2、字節轉string if (bytes == null) { Console.WriteLine($"{qKey}隊列中數據為空"); } else { qMsg = System.Text.Encoding.UTF8.GetString(bytes); } return qMsg; } /// <summary> /// 出隊(阻塞) === 推 /// </summary> /// <param name="qKey">出隊key</param> /// <param name="timespan">阻塞超時時間</param> /// <returns></returns> public string DeQueueBlock(string qKey, TimeSpan? timespan) { // 1、Redis消息出隊 string qMsg = redisClient.BlockingPopItemFromList(qKey, timespan); return qMsg; } /// <summary> /// 獲取隊列數量 /// </summary> /// <param name="qKey">隊列key</param> /// <returns></returns> public long GetQueueCount(string qKey) { return redisClient.GetListCount(qKey); } /// <summary> /// 關閉Redis /// </summary> public void Dispose() { redisClient.Dispose(); } } }
四、上游服務--使用Redis消息隊列優化流量請求
發送消息,模擬下游可以接受請求量100,隊列中數量超出100則拋出異常,否則寫入隊列。
namespace MyRedisFlowPeak.FlowLimit { /// <summary> /// 秒殺上游服務:客戶接受請求量100 /// </summary> class SecKillUpstream { /// <summary> /// 處理的最大請求數 /// </summary> private int HandlerRequestCounts = 100; /// <summary> /// 秒殺方法 /// </summary> /// <param name="RequestCounts">請求數量</param> public void CreateSkillOrder(int requestCounts) { //1、創建秒殺訂單 Console.WriteLine($"秒殺請求數量:{requestCounts}"); //如何使用Redis消息隊列優化流量請求? //Redis優化 using (var msgQueue = new RedisMsgQueueHelper("localhost:6379")) { //1、循環寫入隊列 for (int i = 0; i < requestCounts; i++) { //1.1、獲取消息隊列數量 long count = msgQueue.GetQueueCount("My_Order"); //1.2、判斷是否已滿 if (count >= HandlerRequestCounts) { Console.WriteLine($"系統正常繁忙,請稍后..."); } else { //1.3、寫入隊列訂單編號 Console.WriteLine($"入隊成功"); msgQueue.EnQueue("My_Order", "OrderNo:" + i + ""); } } } } }
五、下游服務
消費上游消息,做下游服務的業務邏輯。
namespace MyRedisSecKillDownStream { class Program { static void Main(string[] args) { //Redis優化 using (var msgQueue = new RedisMsgQueueHelper("localhost:6379")) { Console.WriteLine("上游消息......"); //1、獲取上游消息 while (true) { string rm_sms = msgQueue.DeQueueBlock("My_Order", TimeSpan.FromSeconds(60)); Console.WriteLine($"*****************開始秒殺下游業務調用**********************"); //2、下游業務邏輯,秒殺業務處理 SecKillDownstream secKillDownstream = new SecKillDownstream(); secKillDownstream.HandlerRequest(rm_sms); Console.WriteLine($"*****************秒殺下游業務調用完成**********************"); } } } } }
namespace MyRedisSecKillDownStream.FlowLimit { /// <summary> /// 下游服務:最大請求處理量為100 /// </summary> class SecKillDownstream { /// <summary> /// 處理請求 /// </summary> /// <param name="requestCount"></param> public void HandlerRequest(string requestCount) { Thread.Sleep(10); //1、創建訂單 Console.WriteLine($"秒殺訂單創建成功"); //2、扣減庫存 Console.WriteLine($"秒殺訂單庫存扣減生成"); //3、扣減余額 Console.WriteLine($"用戶余額扣減成功"); Console.WriteLine($"處理的請求數:{requestCount}"); } } }
六、調用客戶端
namespace MyRedisFlowPeak { class Program { static void Main(string[] args) { SecKillUpstream secKillUpstream = new SecKillUpstream(); secKillUpstream.CreateSkillOrder(1000); } } }
七、運行效果
八、項目截圖