高並發場景之RabbitMQ
上次我們介紹了在單機、集群下高並發場景可以選擇的一些方案,傳送門:高並發場景之一般解決方案
但是也發現了一些問題,比如集群下使用ConcurrentQueue或加鎖都不能解決問題,后來采用Redis隊列也不能完全解決問題,
因為使用Redis要自己實現分布式鎖
這次我們來了解一下一個專門處理隊列的組件:RabbitMQ,這個東西天生支持分布式隊列。
下面我們來用RabbitMQ來實現上一篇的場景
一、新建RabbitMQ.Receive
private static ConnectionFactory factory = new ConnectionFactory
{ HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" };
1 static void Main(string[] args) 2 { 3 using (var connection = factory.CreateConnection()) 4 { 5 using (var channel = connection.CreateModel()) 6 { 7 var consumer = new EventingBasicConsumer(); 8 consumer.Received += (model, ea) => 9 { 10 var body = ea.Body; 11 var message = Encoding.UTF8.GetString(body); 12 Console.WriteLine(" [x] Received {0}", message); 13 14 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 15 var value = int.Parse(total) + 1; 16 17 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 18 }; 19 20 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 21 channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer); 22 23 Console.WriteLine(" Press [enter] to exit."); 24 Console.ReadLine(); 25 } 26 } 27 }
二、新建RabbitMQ.Send
1 static void Main(string[] args) 2 { 3 for (int i = 1; i <= 500; i++) 4 { 5 Task.Run(async () => 6 { 7 await Produce(); 8 }); 9 10 Console.WriteLine(i); 11 } 12 13 Console.ReadKey(); 14 } 15 16 public static Task Produce() 17 { 18 return Task.Factory.StartNew(() => 19 { 20 using (var connection = factory.CreateConnection()) 21 { 22 using (var channel = connection.CreateModel()) 23 { 24 var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()); 25 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 26 channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body); 27 } 28 } 29 }); 30 }
這里是模擬500個用戶請求,正常的話最后Total就等於500
我們來說試試看,運行程序
2.1、打開接收端
2.2 運行客戶端
2.3、可以看到2邊幾乎是實時的,再去看看數據庫
三、我們在集群里執行
最后數據是1000