RabbitMQ原理與相關操作(一)


小編是菜鳥一枚,最近想試試MQ相關的技術,所以自己看了下RabbitMQ官網,試着寫下自己的理解與操作的過程。

剛開始的第一篇,原理只介紹 生產者、消費者、隊列,至於其他的內容,會在后續中陸續補齊。

引入MQ話題

什么時候會用到MQ

可能很多人有疑惑:MQ到底是什么?哪些場景下要使用MQ?
前段時間安裝了RabbitMQ,現在就記錄下自己的學習心得吧。
首先看段程序:

class Program
    {
        static void Main(string[] args)
        {
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
        }
        
        public static void WriteLog(int i)
        {
            using (FileStream f = new FileStream(@"d:\\test.txt", FileMode.Append))
            {
                using (StreamWriter sw = new StreamWriter(f, Encoding.Default))
                {
                    sw.Write(i);
                }
            }
        }

        public static void Write()
        {
            for (int i = 0; i < 10000; i++)
            {
                WriteLog(i);
            }
        }
    }
View Code

僅僅從代碼上看,沒有覺得任何問題對吧?編譯也是通過的,但是執行時,出現一個問題:

 當然,這僅僅是一個小的案例,類似這種多線程寫文件造成的問題, 就應該使用MQ了。

MQ的使用場景大概包括解耦,提高峰值處理能力,送達和排序保證,緩沖等。

MQ概述

消息隊列技術是分布式應用間交換信息的一種技術。

消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。

通過消息隊列,應用程序可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程序接收此消息。

MQ主要作用是接受和轉發消息。你可以想想在生活中的一種場景:當你把信件的投進郵筒,郵遞員肯定最終會將信件送給收件人。我們可以把MQ比作 郵局和郵遞員

MQ和郵局的主要區別是,它不處理消息,但是,它會接受數據、存儲消息數據、轉發消息。

RabbitMQ術語

生產者

消息發送者,在MQ中被稱為生產者(producer),一個發送消息的應用也被叫做生產者,用P表示

消費者:

生產者“生產”出消息后,最終由誰消費呢?等待接受消息的應用程序,我們稱之為消費者(Consuming ),用C表示

隊列:

消息只能存儲在隊列(queue )中。盡管消息在rabbitMQ和應用程序間流通,但是隊列卻是存在於RabbitMQ內部。

一個隊列不受任何限制,它可以存儲你想要存儲的消息量,它本質上是一個無限的緩沖區。

多個生產者可以向同一個隊列發送消息,多個消費者可以嘗試從同一個消息隊列中接收數據。

一個隊列像下面這樣(上面是它的隊列名稱)

注意:

生產者、消費者、中間件不必在一台機器上,實際應用中也是絕大多數不在一起的。我們可以用一張圖表示RabbitMQ的構造:

 

注:此圖片摘自於百度百科RabbitMQ

使用RabbitMQ解決多線程寫入文件問題

分析

多線程寫入,產生消息的也就是一個程序(一個生產者P),消費消息的也是一個消息,它的模型應該是:

編寫代碼

引入RabbitMQ client DLL

程序包管理控制台命令:

PM> Install-Package RabbitMQ.Client

生產者

首先,創建一個 connection 通過socket連接 去和服務器連接起來(需要傳目的服務器的IP、用戶名、密碼等)。

接着 創建一個 channel ,這是大部分的要做的事情所在。

要發送消息,我們必須聲明一個隊列,,然后我們可以向隊列發布消息。

執行一次BasicPublish方法,推送一個消息。

class Program
    {
        static void Main(string[] args)
        {
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
        }

        public static void Write()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);
                for (int i = 0; i < 8000; i++)
                {
                    string message = i.ToString();
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);
                    Console.WriteLine("Program Sent {0}", message);
                }
            }
        }
    }
View Code

聲明的隊列,在服務器中如果不存在了,會自動創建。而消息的內容是字節數組,在使用時,注意編碼問題。

消費者

當隊列里有消息時,消費者要隨時能夠從隊列里獲取消息,所以我需要一直運行它,讓它監聽消息。

就像我們打籃球進行傳球,需要事先確認要傳給的那個隊友位置一樣,生產者要發送消息,一定要事先知道消費消息的程序的對列是哪個。所以,在運行生產者程序前,需要先啟動消費者程序。

由此,聲明對列,就應該在消費者程序中完成。

class Program
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost ="/"};
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "writeLog",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    ExcuateWriteFile(message);
                    Console.WriteLine(" Receiver Received {0}", message);
                };
                channel.BasicConsume(queue: "writeLog",
                                     noAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
        public static void ExcuateWriteFile(string i)
        {
            using (FileStream fs = new FileStream(@"d:\\test.txt", FileMode.Append))
            {
                using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode))
                {
                    sw.Write(i);
                }
            }
        }
    }
View Code

執行程序

先執行 消費者程序,讓它一直保持監聽。

錯誤解決

執行時VS報錯:

“RabbitMQ.Client.Exceptions.BrokerUnreachableException”類型的未經處理的異常在 RabbitMQ.Client.dll 中發生 其他信息: None of the specified endpoints were reachable。

進入查看詳細的內部異常:

innerEception:{"The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=\"NOT_ALLOWED - access to vhost '/' refused for user 'eric'\", classId=10, methodId=40, cause="}

此時,我們打開在http://localhost:15672/#/users 可以看到eric 下 的Can access virtual hosts 為 NoAccess

解決辦法:

rabbitmqctl控制台輸入

rabbitmqctl set_permissions -p / userName "." "." ".*"

再次執行時,可以看到:

 

 然后運行 生產者程序。

 我們先開着 Receive ,當生產者運行時

 

消費者的自動觸發執行 :

直到所有的 指定的 queue 里面的消息完全消費完為止。(此時消費者程序仍然在監聽中)

 

對於需要安裝和設置用戶的同學,請參考 windows下 安裝 rabbitMQ 及操作常用命令

 

本文參考:

rabbitMq外文網

百度百科

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM