RabbitMQ 教程(四)RabbitMQ並發處理


前言:前面我們都講解了一些基本的RabbitMQ配置及操作,現在我們來試下使用RabbitMQ處理一些簡單的數據並發問題

准備條件:先創建一個表students, 字段有id, count

CREATE TABLE Students
(
id INT IDENTITY PRIMARY KEY NOT NULL,
count INT NULL
)

我們准備通過每一次累加1,總和存儲在count字段上

 

一、普通程序的處理

//創建數據庫連接
        private static BaseDAL dal = new BaseDAL("conName");
        static void Main(string[] args)
        {
            //開啟線程進行處理
            new Thread(Update).Start();

            Console.ReadLine();
        }

        private static void Update()
        {
            for (int i = 0; i <100; i++)
            {
                //獲取當前表中的count數值
                object total = dal.ExecComdToObject("select count from Students where id=1", null);
                int value = int.Parse(total.ToString()) + 1;//將數值累加1

                //將累加后的數值更新到表Students的字段count
                string sql = string.Format("update Students set count={0}", value);
                dal.ExecComd(sql);

                Console.WriteLine("寫入成功=" + i);
            }
        }

 

上面代碼,我們就開啟了1個線程進行讀取寫入,結果如下:

 

數據庫的字段count值為:

 

發現結果沒有問題,循環100次,結果是100

 

二、普通程序的並發處理

這次,我們開啟2個線程進行並發讀取,再同時寫入。我們先把數據庫的count置回0,再將代碼修改一下。

class Program
    {
        //創建數據庫連接
        private static BaseDAL dal = new BaseDAL("conName");
        static void Main(string[] args)
        {
            //開啟2線程進行處理
            new Thread(Update).Start();
            new Thread(Update).Start();

            Console.ReadLine();
        }

        private static void Update()
        {
            for (int i = 1; i <=100; i++)
            {
                //獲取當前表中的count數值
                object total = dal.ExecComdToObject("select count from Students where id=1", null);
                int value = int.Parse(total.ToString()) + 1;//將數值累加1

                //將累加后的數值更新到表Students的字段count
                string sql = string.Format("update Students set count={0}", value);
                dal.ExecComd(sql);

                Console.WriteLine("寫入成功=" + i);
            }
        }
    }

 

程序執行結果:

 

一看這程序我們就知道出問題了。那么這時候,數據庫的count字段值是多少呢?

 

為什么會出現這個結果。是因為在並發的時候,前面線程執行的結果,會被后面的update進行了覆蓋,所以值不會是200,也不會是剛好100.

那么這時候,我們的RabbitMQ就開始派上用場了(我這里就不介紹lock以及queue的使用,主要講解RabbitMQ)

 

三、RabbitMQ並發處理

(1)RabbitMQ.Server代碼處理,2線程並發,輸送200次請求

static void Main(string[] args)
        {
            //開啟2線程進行處理
            new Thread(SendMsg).Start();
            new Thread(SendMsg).Start();

            Console.ReadLine();
        }

        private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" };
        private static void SendMsg() 
        { 
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    for (int i = 1; i <= 100; i++)
                    {
                        string guid = Guid.NewGuid().ToString();
                        var body = Encoding.UTF8.GetBytes(guid);
                        channel.QueueDeclare("AllenLeeQueue", false, false, false, null);
                        channel.BasicPublish("", "AllenLeeQueue", null, body);

                        Console.WriteLine("[Set Msg To AllenLeeQueue] " + guid);
                    }
                }
            }
        }

 

(2)RabbitMQ.Client代碼處理,對發起的200個請求進行接收

//創建數據庫連接
        private static BaseDAL dal = new BaseDAL("conName");
        private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" };
        static void Main(string[] args)
        {
            factory.AutomaticRecoveryEnabled = true;//設置端口后自動恢復連接屬性即可
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [Get Msg from AllenLeeQueue] {0}", message);

                        //獲取當前表中的count數值
                        object total = dal.ExecComdToObject("select count from Students where id=1", null);
                        int value = int.Parse(total.ToString()) + 1;//將數值累加1
                        string sql = string.Format("update Students set count={0}", value);
                        dal.ExecComd(sql);
                    };

                    channel.BasicConsume(queue: "AllenLeeQueue", noAck: true, consumer: consumer);
                    Console.ReadLine();
                }
            }
        } 

 

(3)如果RabbitMQ如果處理得當,數據庫的字段count值就為200,不多不少,我們來看下數據庫字段值

 

實踐證明,RabbitMQ能很好的進行並發處理,達到了我們預期的效果。

但是可能有朋友會覺得,這傳的都是guid,如果是我們實際工作中,直接傳的是並發業務數據,又該怎么處理呢?

其實就是在傳輸的body中,傳入業務數據,再在RabbitMQ.Client進行業務數據轉化就可以了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM