C#之RabbitMQ


本文內容整理自https://blog.csdn.net/by_ron/category_6167618.html

RabbitMQ–環境搭建

能點進來相信你明白RabbitMQ是干什么的了,這個系列主要是本人根據RabbitMQ官網C#的Tutorials總結出來,旨在為新手提供手把手教學。接受大牛批評指正,共同學習,共同促進。

  • 安裝Erlang

  • 安裝RabbitMQ Server

  • 下載客戶端dll(亦稱驅動)

一、Erlang安裝

RabbitMQ是用Erlang實現的一個高並發高可靠AMQP消息隊列服務器,分布式處理能力出眾,對程序完全透明。在安裝RabbitMQ服務之前必須先安裝erlang,否則會發生什么呢?嘿嘿,你安裝服務時會遭到提示啦。點擊Erlang下載地址,進入下載頁面Erlang 根據自己的Windows版本選擇32位或者64位下載安裝即可。安裝沒什么需要注意的(安裝目錄默認C盤),下一步到finish即可。


二、安裝RabbitMQ Server

RabbitMQ Server下載地址 img 不用多說,安裝點擊下一步至finish即可。 實際應用中,肯定是要安裝在專門服務器上。本文為演示,安裝在自己電腦上指定端口,模擬分布式環境(呃,有點牽強,呵呵!)。 你一定還想知道這個服務是用來干嘛的,問得好。這個服務才是兔子的大腦,我們所有的操作都是通過服務來調度的。簡單點理解,他就是一個大隊列,我們product產生消息之后由它推送給customer


三、下載驅動dll

點擊dll下載地址進入下載頁面

img

下載后

這里寫圖片描述

RabbitMQ.client.dll封裝了訪問服務端所需的API,重要的事情說三遍, 只支持dotnet-4.5以上的項目! 只支持dotnet-4.5以上的項目! 只支持dotnet-4.5以上的項目!

第一篇博文,比較簡潔,着急寫完,有點像完任務似的。但無論如何,我都相信,事情會越來越好的。 什么?違反墨菲定律,沒錯,就是這么自信!

生產者–消費者模式

上一篇討論了如何搭建我們的開發環境,無論使用哪種語言,服務的部署肯定都是相同的。

摘自官網:RabbitMQ is a message broker. In essence, it accepts messages from producers, and delivers them to consumers. In-between, it can route, buffer, and persist the messages according to rules you give it.(譯文:兔子就是消息代理,本質上,它接受來自producers的消息然后分發給consumers,在此過程中,它能夠根據你定制的規則來路由緩存持久化消息)。 本文開始將基於生產者-消費者模式創建第一個項目,hello world,是否感覺很親切呢!

既然是生產者-消費者模式,那么顯然意味着我們會有生產者和消費者兩套程序。生產者負責生產message並推送給queue,而消費者從queue中拉取消息進行處理。


生產者

首先我們需要創建一個控制台應用程序,生產者,即消息發送方,我們創建一個類Send.cs,當然,如果你願意,也可以叫Producer.cs或者F**k.cs等等。 還記否,上一篇我們已經下載好了驅動,即RabbitMQ.Client.dll,現在只要在此項目中引用即可。

代碼中這樣引用即可(哎,官網就是這么詳細,稍有常識的猿直接跳過)

using System;
using System.Text;
using RabbitMQ.Client;123

接下來就可以和RabbitMQ Server創建連接了:

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };//創建代理服務器實例。注意:HostName為Rabbit Server所在服務器的ip或域名,如果服務裝在本機,則為localhost,默認端口5672
        using (var connection = factory.CreateConnection())//創建socket連接
        {
            using (var channel = connection.CreateModel())//channel中包含幾乎所有的api來供我們操作queue
            {
                ...
            }
        }
    }
}

很簡單的對不對,接下來給出Send.cs的完整代碼,為了方便理解,注釋我會寫在代碼中:

using System;
using RabbitMQ.Client;
using System.Text;
​
class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            //聲明queue
            channel.QueueDeclare(queue: "hello",//隊列名
                                 durable: false,//是否持久化
                                 exclusive: false,//true:排他性,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除
                                 autoDelete: false,//true:如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除
                                 arguments: null);//如果安裝了隊列優先級插件則可以設置優先級
string message = "Hello World!";//待發送的消息
            var body = Encoding.UTF8.GetBytes(message);
​
            channel.BasicPublish(exchange: "",//exchange名稱
                                 routingKey: "hello",//如果存在exchange,則消息被發送到名稱為hello的queue的客戶端
                                 basicProperties: null,
                                 body: body);//消息體
            Console.WriteLine(" [x] Sent {0}", message);
        }
​
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

 

好的,接下來啟動程序,我們的消息就被推送到了Rabbit Server上的queue中,等待客戶端的連接,也就是等待消費者拉取。如下圖:

 

消費者

這次重新創建控制台應用程序,類名為Receive.cs,同理,你可以用自己舒服的單詞去命名。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
​
class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",//指定發送消息的queue,和生產者的queue匹配
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
​
            var consumer = new EventingBasicConsumer(channel);
            //注冊接收事件,一旦創建連接就去拉取消息
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello",
                                 noAck: true,//和tcp協議的ack一樣,為false則服務端必須在收到客戶端的回執(ack)后才能刪除本條消息
                                 consumer: consumer);
​
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

 

這段代碼是不是和Send很相似呢,沒錯,在創建連接和聲明queue上沒有區別! 運行程序,效果如下:

img

哪怕你的Send程序已關閉,但只要運行過且成功發送了,queue就會一直保存消息,直到客戶端連接,這些消息才會一股腦兒發送給消費者。 你可以這樣實驗,在bin中debug目錄下啟動Send.exe,連續3次,然后再運行客戶端,就會收到3條消息,如下圖:

img

至此,我們的Hello World已經成功跑起。這個小demo當然不是僅僅用來say hello的,更多的用意是幫助我理解兔子的基本原理,提供一種高並發情形下的解決方案。相信以后公司商城發展壯大時能夠用到!!!

 

 

工作隊列

使用場景:Work Queue被用以處理大量耗時任務,與其等待任務處理完畢耗費大量cpu資源,還不如立即返回並交由代理worker隨后處理message。

消息持久化

生產者和消費者的代碼和上一節Publish-Consumer基本相同,唯一不同的是配置項的參數調整。代碼如下:

using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
​
namespace NewTask
{
    class NewTask
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                //At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting IBasicProperties.SetPersistent to true.
                var properties = channel.CreateBasicProperties();
                //properties.SetPersistent(true);//此方法已經過時:“這種設置方法已經被摒棄,現在使用持久化屬性取代它”
                properties.Persistent = true;//取代上面的寫法
​
                ...
        }
    }
}

 

隊列持久化通過durable: true聲明,服務重啟后隊列依然存在,但如果聲明為排他隊列exclusive: true,則不受持久化影響,連接斷開即移除。 消息持久化通過properties.Persistent = true來設置,前提是隊列持久化,否則服務宕掉后消息肯定丟失,因為消息的載體隊列都沒了。

消息回執

在一些對准確度要求比較高的場景下時,我們可能需要收到從消費者傳回的ack后才從隊列刪除。

  • 收不到ack,消息會一直駐留在隊列中直到連接斷開,此時會發送到集群中下一個消費者去處理;

  • 隊列永遠不斷開呢?那么消息當然永駐隊列中直到內存吃盡,這可不是好事情,所以消費端切記做好異常處理並且finally發送回執;

  • 沒有集群則連接重新打開就會再次發送給原來的消費者;

談回worker

在我看來,worker只是消費者的另外一種叫法,只是它的功能更加特殊,本文開頭也說了,消費端基本上聲明隊列,注冊接受事件這些步驟都一樣,只是配置項的不同罷了,看下worker的配置:

 
using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
​
namespace Worker
{
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
​
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//prefetchCount:This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy(Fair dispatch(公平分發):它會選擇空閑的worker去處理消息,適用於對性能敏感的場景;Round-robin dispatching(輪循分發):不解釋)
​
                Console.WriteLine(" [*] Waiting for messages.");
​
                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "task_queue",
                                     noAck: false,
                                     consumer: consumer);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
​
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);
​
                    Console.WriteLine(" [x] Done");
​
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//用以處理完畢后發送ack通知服務端
                };
​
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

注意這句代碼以及注釋:channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false) 下面是BasicQops方法的說明:

BasicQops用於設置任務分發,只有收到前一條消息的回執才會發給當前worker,否則輪循下一個worker。如果所有worker都是忙碌,那么建議添加worker設備。

說明worker的特點就是單線程處理消息,如果處於忙碌狀態(未發送回執),則不會收到隊列推送的消息。

 

 

小結:通過隊列聲明durable: true設置隊列持久化,並無多大意義,只是為消息持久化做鋪墊;消息持久化通過properties.Persistent = true或者properties.DeliveryMode = 2進行設置;消息回執需要注意處理消費端的代碼保證ack總是返回給隊列;worker實際上就是一種單線程機制的消費者


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM