RabbitMQ的簡單使用


RabbitMQ的關鍵對象概念介紹

  生產者(producer):負責生產消息,可以有多個生產者,可以理解為生成消息的那部分邏輯

  消費者(consumer):從隊列中獲取消息,對消息處理的那部分邏輯

  隊列(queue):用於存放消息,可以理解為先進先出的一個對象

  交換機(exchange):顧名思義,就是個中介的角色,將接收到的消息按不同的規則轉發到其他交換機或者隊列中

  路由(route):就是交換機分發消息的規則,交換機可以指定路由規則,生產者在發布消息時也可以指定消息路由,比如交換機中設置A路由表示將消息轉發到隊列1,B路由表示將消息轉發到隊列2,那么當交換機接收到消息時,如果消息的路由滿足A路由,則將消息轉發到隊列1,如果滿足B路由則將消息轉發到隊列2

  虛擬主機(virtual host):虛擬地址,用於進行邏輯隔離,一個虛擬主機里面可以有若干個 exchange 和 queue,但是里面不能有相同名稱的 exchange 或 queue

 

RabbitMQ的工作模式

 

簡單的來說,就是生產者將消息發布到rabbitmq上,然后消費者連接rabbitmq,獲取到消息就消費,但是有幾點說明一下

1、rabbitmq中的消息是可被多次消費的,因為rabbitmq提供了ack機制,當消費者在消費消息時,如果將自動ack設置成false,那么需要手動提交ack才能告訴rabbitmq消息已被使用,否則當通道關閉時,消息會繼續呆在隊列中等待消費

2、當存在多個消費者時,默認情況下,一個消費者獲取一個消息,處理完成后再獲取下一個,但是rabbitmq消費一次性獲取多個,當然后當這些消息消費完成后,再獲取下一批,這也就是rabbitmq的Qos機制

Qos機制:當生產者將消息發布到rabbitmq之后,如果在未配置QOS的情況下,rabbitmq盡可能快速地發送隊列中的所有消息到消費者端,如果消息比較多,消費者來不及處理,就會緩存這些消息,當消息堆積過多,可能導致服務器內存不足而影響其他進程,rabbitmq的QOS可以很好的解決這類問題,QOS就是限制消費者一次性從rabbitmq中獲取消息的個數,而不是獲取所有消息。比如設置rabbitmq的QOS為10,也就是prefetch=10,就是說,哪怕rabbitmq中有100條消息,消費者也只是一次性獲取10條,然后消費者消費這10條消息,剩下的交給其他消費者,當10條消息中的unacked個數少於prefetch * 消費者數目時,會繼續從rabbitmq獲取消息,如果在工作模式中,不使用QOS,你會發現,所有的消息都被一個消費者消費了

 

C#中使用RabbitMQ

  首先,我們創建了兩個Demo項目:RabbitMQReceived(消費者)和RabbitMQSend(生產者),分別使用使用nuget安裝RabbitMQ.Client:

 

 

 

 RabbitMQSend Program腳本

如下:

class Program
    {
        static void Main(string[] args)
        {
            //string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            string hosts = "localhost";
            int port = 5672;
            string userName = "guest";
            string password = "guest";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            #region 普通模式、工作模式
            //string queue = "queue1";//隊列名稱

            ////1、創建一個通道
            ////此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            //var channel = connection.CreateModel();
            ////給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
            //var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            ////2、聲明(創建)對列
            //// 第一個參數,queueName:對列名稱。數據類型:String
            //// 第二個參數,durable:是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫。數據類型:boolean
            //// 第三個參數,exclusive:是否排外的。數據類型:boolean
            //// 第四個參數,autoDelete:是否自動刪除。數據類型:boolean
            //// 第五個參數,arguments:參數。數據類型:Map<String, Object>
            ////channel.queueDeclare(queueName, true, false, false, null);
            //channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
            #endregion

            //3、創建交換機
            // 第一個參數,exchange:交換機名稱。數據類型:String
            // 第二個參數,type:交換機的類型(direct/topic/fanout)。數據類型:String
            //channel.exchangeDeclare(exchange, type);


            //4、綁定交換機和隊列
            //第一個參數,queueName:對列名稱。數據類型:String
            //第二個參數,exchange:交換機名稱。數據類型:String 
            //第三個參數,routingKey:隊列跟交換機綁定的鍵值。數據類型:String
            //channel.queueBind(queueName, exchange, routingKey);

            #region 訂閱模式
            ////創建一個通道
            ////此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            //var channel = connection.CreateModel();
            //string exchange = "demo.fanout";//交換機名稱
            //string exchangeType = "fanout";//交換機類型

            ////給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列
            //var arguments = new Dictionary<string, object>() { };
            //channel.QueueDeclare(queue: "queue2", durable: true, exclusive: false, autoDelete: false, arguments: arguments);
            //channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments);
            //channel.QueueBind("queue1", exchange, "");
            //channel.QueueBind("queue2", exchange, "");

            #endregion

            #region 路由模式

            ////創建一個通道
            ////此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            //var channel = connection.CreateModel();
            //string exchange_dir = "demo.direct";//交換機名稱
            //string exchangeType_dir = "direct";//交換機類型

            ////給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列
            //var arguments_dir = new Dictionary<string, object>() { };
            //channel.QueueDeclare(queue: "queue3", durable: true, exclusive: false, autoDelete: false, arguments: arguments_dir);
            //channel.QueueDeclare(queue: "queue4", durable: true, exclusive: false, autoDelete: false, arguments: arguments_dir);
            //channel.ExchangeDeclare(exchange: exchange_dir, type: exchangeType_dir, durable: true, autoDelete: false, arguments: arguments_dir);

            //string[] routes = new string[] { "apple", "banana" };
            //channel.QueueBind("queue3", exchange_dir, routes[0]);
            //channel.QueueBind("queue4", exchange_dir, routes[1]);

            #endregion

            #region 主題模式
            //創建一個通道
            //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            var channel = connection.CreateModel();
            string exchange_top = "demo.topic";//交換機名稱
            string exchangeType_top = "topic";//交換機類型

            //給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列
            var arguments_dir = new Dictionary<string, object>() { };
            channel.QueueDeclare(queue: "queue5", durable: true, exclusive: false, autoDelete: false, arguments: arguments_dir);
            channel.QueueDeclare(queue: "queue6", durable: true, exclusive: false, autoDelete: false, arguments: arguments_dir);
            channel.ExchangeDeclare(exchange: exchange_top, type: exchangeType_top, durable: true, autoDelete: false, arguments: arguments_dir);

            string[] routes = new string[] { "apple.", "banana." };
            channel.QueueBind("queue5", exchange_top, routes[0]);
            channel.QueueBind("queue6", exchange_top, routes[1]);
            #endregion
            //發布10條消息
            for (var i = 0; i < 10; i++)
            {
                var buffer = Encoding.UTF8.GetBytes("is my send msg " + i.ToString());
                //普通模式、工作模式
                //channel.BasicPublish("", queue, null, buffer);

                //訂閱模式
                //channel.BasicPublish(exchange, "", null, buffer);

                //路由模式
                //channel.BasicPublish(exchange_dir, routes[i % 2], null, buffer);

                //主題模式
                channel.BasicPublish(exchange_top, routes[i % 2], null, buffer);

            }
            channel.Close();

            Console.ReadKey();
        }
    }

簡要說明:

1、分別包含普通模式、工作模式非使用交換機情景,另外使用交換機情景包含訂閱模式、路由模式和主題模式,創建通道、隊列、綁定通道隊列,發送消息等等;

direct:路由模式,就是將消息的路由和交換機的綁定路由作比較,當兩者一致時,則匹配成功,然后消息就會被轉發到這個綁定路由后的隊列或者交換機

fanout:訂閱模式,這種類型的交換機是不需要指定路由的,當交換機接收到消息時,會將消息廣播到所有綁定到它的所有隊列或交換機中

topic:主題類型,類似direct類型,只不過在將消息的路由和綁定路由做比較時,是通過特定表達式去比較的,其中# 匹配一個或多個,* 匹配一個

2、普通模式、工作模式、訂閱模式、路由模式和主題模式都帶有說明,包括發布消息,每次只需要釋放其中一塊注釋代碼即可,當前腳本目前釋放是主題模式;

3、普通模式和工作模式使用的queue1(10條),訂閱模式queue1和queue2(各10條),路由模式queue3/key為apple和queue4/key為banana(各5條),主題模式queue5/key為apple.和queue6/key為banana.(各5條);

 

 RabbitMQRecevied Program腳本

無論是使用交換機場景還是不使用交換機場景各種模式就是發布消息方式的不一樣,消費者當然還是從隊列獲取消息消費的如下:

class Program
    {
        static void Main(string[] args)
        {
            //string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            string hosts = "localhost";
            int port = 5672;
            string userName = "guest";
            string password = "guest";
            string virtualHost = "/";


            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            #region 單消費者
            //string queue = "queue1";//隊列名稱

            ////創建一個通道
            ////此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            //var channel = connection.CreateModel();
            ////給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
            //var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            //channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
            ////channel.BasicQos(2, 2, false);//設置QOS

            ////在通道中定義一個事件消費者
            //EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            //consumer.Received += (sender, e) =>
            //{
            //    var body = e.Body.ToArray();
            //    string message = Encoding.UTF8.GetString(body);
            //    Console.WriteLine($"接收到消息:{message}");

            //    Thread.Sleep(500);//暫停一下

            //    //通知消息已被處理,如果沒有,那么消息將會被重復消費
            //    channel.BasicAck(e.DeliveryTag, false);
            //};
            ////ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息
            //channel.BasicConsume(queue, false, consumer);
            #endregion

            #region 多消費者
            //Consumer(connection, 1);
            //Consumer(connection, 2);
            #endregion

            #region 訂閱模式
            //Consumer(connection, "queue1");//消費者1
            //Consumer(connection, "queue2");//消費者2
            #endregion

            #region 路由模式
            //Consumer(connection, "queue3");//消費者1
            //Consumer(connection, "queue4");//消費者2
            #endregion


            #region 主題模式
            Consumer(connection, "queue5");//消費者1
            Consumer(connection, "queue6");//消費者2
            #endregion

            Console.ReadKey();
        }

        static void Consumer(IConnection connection, ushort perfetch)
        {
            new Thread(() =>
            {
                int threadId = Thread.CurrentThread.ManagedThreadId;//線程Id,用於區分消費者
                string queue = "queue1";//隊列名稱

                //創建一個通道
                //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
                var channel = connection.CreateModel();
                //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

                //設置消費者每次獲取的消息數量,可以用來設置消費者消費的權重
                //必須等獲[取的消息都消費完成后才能重新獲取
                channel.BasicQos(0, perfetch, true);//設置QOS

                //在通道中定義一個事件消費者
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    var body = e.Body.ToArray();
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"ThreadId:【{threadId}】——>接收到消息:{message}");

                    Thread.Sleep(500);//暫停一下

                    //通知消息已被處理,如果沒有,那么消息將會被重復消費
                    channel.BasicAck(e.DeliveryTag, false);
                };
                //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息
                channel.BasicConsume(queue, false, consumer);
            }).Start();
        }

        static void Consumer(IConnection connection, string queue)
        {
            new Thread(() =>
            {
                int threadId = Thread.CurrentThread.ManagedThreadId;//線程Id,用於區分消費者

                //創建一個通道
                //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
                var channel = connection.CreateModel();
                //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

                //設置消費者每次獲取的消息數量,可以用來設置消費者消費的權重
                //必須等獲[取的消息都消費完成后才能重新獲取
                //channel.BasicQos(0, perfetch, true);//設置QOS

                //在通道中定義一個事件消費者
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    var body = e.Body.ToArray();
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($"ThreadId:【{threadId}】——>接收到消息:{message}");

                    Thread.Sleep(500);//暫停一下

                    //通知消息已被處理,如果沒有,那么消息將會被重復消費
                    channel.BasicAck(e.DeliveryTag, false);
                };
                //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息
                channel.BasicConsume(queue, false, consumer);
            }).Start();

        }
    }

其中也是包含了各種模式的消費隊列,每次只需要釋放一種類型腳本即可,當前腳本目前釋放是主題模式


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM