前言:前面我們都講解了一些基本的RabbitMQ配置及操作,現在我們來試下使用RabbitMQ處理一些簡單的數據並發問題
准備條件:先創建一個表students, 字段有id, count
CREATE TABLE Students
(
id INT IDENTITY PRIMARY KEY NOT NULL,
count INT NULL
)

我們准備通過每一次累加1,總和存儲在count字段上
一、普通程序的處理
//創建數據庫連接 private static BaseDAL dal = new BaseDAL("conName"); static void Main(string[] args) { //開啟線程進行處理 new Thread(Update).Start(); Console.ReadLine(); } private static void Update() { for (int i = 0; i <100; i++) { //獲取當前表中的count數值 object total = dal.ExecComdToObject("select count from Students where id=1", null); int value = int.Parse(total.ToString()) + 1;//將數值累加1 //將累加后的數值更新到表Students的字段count string sql = string.Format("update Students set count={0}", value); dal.ExecComd(sql); Console.WriteLine("寫入成功=" + i); } }
上面代碼,我們就開啟了1個線程進行讀取寫入,結果如下:

數據庫的字段count值為:

發現結果沒有問題,循環100次,結果是100
二、普通程序的並發處理
這次,我們開啟2個線程進行並發讀取,再同時寫入。我們先把數據庫的count置回0,再將代碼修改一下。
class Program { //創建數據庫連接 private static BaseDAL dal = new BaseDAL("conName"); static void Main(string[] args) { //開啟2線程進行處理 new Thread(Update).Start(); new Thread(Update).Start(); Console.ReadLine(); } private static void Update() { for (int i = 1; i <=100; i++) { //獲取當前表中的count數值 object total = dal.ExecComdToObject("select count from Students where id=1", null); int value = int.Parse(total.ToString()) + 1;//將數值累加1 //將累加后的數值更新到表Students的字段count string sql = string.Format("update Students set count={0}", value); dal.ExecComd(sql); Console.WriteLine("寫入成功=" + i); } } }
程序執行結果:

一看這程序我們就知道出問題了。那么這時候,數據庫的count字段值是多少呢?

為什么會出現這個結果。是因為在並發的時候,前面線程執行的結果,會被后面的update進行了覆蓋,所以值不會是200,也不會是剛好100.
那么這時候,我們的RabbitMQ就開始派上用場了(我這里就不介紹lock以及queue的使用,主要講解RabbitMQ)
三、RabbitMQ並發處理
(1)RabbitMQ.Server代碼處理,2線程並發,輸送200次請求
static void Main(string[] args) { //開啟2線程進行處理 new Thread(SendMsg).Start(); new Thread(SendMsg).Start(); Console.ReadLine(); } private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" }; private static void SendMsg() { using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { for (int i = 1; i <= 100; i++) { string guid = Guid.NewGuid().ToString(); var body = Encoding.UTF8.GetBytes(guid); channel.QueueDeclare("AllenLeeQueue", false, false, false, null); channel.BasicPublish("", "AllenLeeQueue", null, body); Console.WriteLine("[Set Msg To AllenLeeQueue] " + guid); } } } }
(2)RabbitMQ.Client代碼處理,對發起的200個請求進行接收
//創建數據庫連接 private static BaseDAL dal = new BaseDAL("conName"); private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" }; static void Main(string[] args) { factory.AutomaticRecoveryEnabled = true;//設置端口后自動恢復連接屬性即可 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [Get Msg from AllenLeeQueue] {0}", message); //獲取當前表中的count數值 object total = dal.ExecComdToObject("select count from Students where id=1", null); int value = int.Parse(total.ToString()) + 1;//將數值累加1 string sql = string.Format("update Students set count={0}", value); dal.ExecComd(sql); }; channel.BasicConsume(queue: "AllenLeeQueue", noAck: true, consumer: consumer); Console.ReadLine(); } } }
(3)如果RabbitMQ如果處理得當,數據庫的字段count值就為200,不多不少,我們來看下數據庫字段值

實踐證明,RabbitMQ能很好的進行並發處理,達到了我們預期的效果。
但是可能有朋友會覺得,這傳的都是guid,如果是我們實際工作中,直接傳的是並發業務數據,又該怎么處理呢?
其實就是在傳輸的body中,傳入業務數據,再在RabbitMQ.Client進行業務數據轉化就可以了。
