本文內容整理自https://blog.csdn.net/by_ron/category_6167618.html
能點進來相信你明白RabbitMQ是干什么的了,這個系列主要是本人根據
-
安裝Erlang
-
安裝RabbitMQ Server
-
下載客戶端dll(亦稱驅動)
一、Erlang安裝
RabbitMQ是用Erlang實現的一個高並發高可靠AMQP消息隊列服務器,分布式處理能力出眾,對程序完全透明。在安裝RabbitMQ服務之前必須先安裝erlang,否則會發生什么呢?嘿嘿,你安裝服務時會遭到提示啦。點擊
二、安裝RabbitMQ Server
三、下載驅動dll
點擊
第一篇博文,比較簡潔,着急寫完,有點像完任務似的。但無論如何,我都相信,事情會越來越好的。 什么?違反墨菲定律,沒錯,就是這么自信!
生產者–消費者模式
上一篇討論了如何搭建我們的開發環境,無論使用哪種語言,服務的部署肯定都是相同的。
既然是生產者-消費者模式,那么顯然意味着我們會有生產者和消費者兩套程序。生產者負責生產message並推送給queue,而消費者從queue中拉取消息進行處理。
生產者
首先我們需要創建一個控制台應用程序,生產者,即消息發送方,我們創建一個類Send.cs,當然,如果你願意,也可以叫Producer.cs或者F**k.cs等等。 還記否,上一篇我們已經下載好了驅動,即RabbitMQ.Client.dll,現在只要在此項目中引用即可。
代碼中這樣引用即可(哎,官網就是這么詳細,稍有常識的猿直接跳過)
using System;
using System.Text;
using RabbitMQ.Client;123
接下來就可以和RabbitMQ Server創建連接了:
class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" };//創建代理服務器實例。注意:HostName為Rabbit Server所在服務器的ip或域名,如果服務裝在本機,則為localhost,默認端口5672 using (var connection = factory.CreateConnection())//創建socket連接 { using (var channel = connection.CreateModel())//channel中包含幾乎所有的api來供我們操作queue { ... } } } }
很簡單的對不對,接下來給出Send.cs的完整代碼,為了方便理解,注釋我會寫在代碼中:
using System; using RabbitMQ.Client; using System.Text; class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { //聲明queue channel.QueueDeclare(queue: "hello",//隊列名 durable: false,//是否持久化 exclusive: false,//true:排他性,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除 autoDelete: false,//true:如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除 arguments: null);//如果安裝了隊列優先級插件則可以設置優先級 string message = "Hello World!";//待發送的消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",//exchange名稱 routingKey: "hello",//如果存在exchange,則消息被發送到名稱為hello的queue的客戶端 basicProperties: null, body: body);//消息體 Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
好的,接下來啟動程序,我們的消息就被推送到了Rabbit Server上的queue中,等待客戶端的連接,也就是等待消費者拉取。如下圖:
消費者
這次重新創建控制台應用程序,類名為Receive.cs,同理,你可以用自己舒服的單詞去命名。
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello",//指定發送消息的queue,和生產者的queue匹配 durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); //注冊接收事件,一旦創建連接就去拉取消息 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", noAck: true,//和tcp協議的ack一樣,為false則服務端必須在收到客戶端的回執(ack)后才能刪除本條消息 consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
這段代碼是不是和Send很相似呢,沒錯,在創建連接和聲明queue上沒有區別! 運行程序,效果如下:
哪怕你的Send程序已關閉,但只要運行過且成功發送了,queue就會一直保存消息,直到客戶端連接,這些消息才會一股腦兒發送給消費者。 你可以這樣實驗,在bin中debug目錄下啟動Send.exe,連續3次,然后再運行客戶端,就會收到3條消息,如下圖:
至此,我們的Hello World已經成功跑起。這個小demo當然不是僅僅用來say hello的,更多的用意是幫助我理解兔子的基本原理,提供一種高並發情形下的解決方案。相信以后公司商城發展壯大時能夠用到!!!
工作隊列
使用場景:Work Queue被用以處理大量耗時任務,與其等待任務處理完畢耗費大量cpu資源,還不如立即返回並交由代理worker隨后處理message。
消息持久化
生產者和消費者的代碼和
using System; using System.Collections.Generic; using System.Text; using RabbitMQ.Client; namespace NewTask { class NewTask { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); //At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting IBasicProperties.SetPersistent to true. var properties = channel.CreateBasicProperties(); //properties.SetPersistent(true);//此方法已經過時:“這種設置方法已經被摒棄,現在使用持久化屬性取代它” properties.Persistent = true;//取代上面的寫法 ... } } }
隊列持久化通過durable: true聲明,服務重啟后隊列依然存在,但如果聲明為排他隊列exclusive: true,則不受持久化影響,連接斷開即移除。 消息持久化通過properties.Persistent = true來設置,前提是隊列持久化,否則服務宕掉后消息肯定丟失,因為消息的載體隊列都沒了。
消息回執
在一些對准確度要求比較高的場景下時,我們可能需要收到從消費者傳回的ack后才從隊列刪除。
收不到ack,消息會一直駐留在隊列中直到連接斷開,此時會發送到集群中下一個消費者去處理;
隊列永遠不斷開呢?那么消息當然永駐隊列中直到內存吃盡,這可不是好事情,所以消費端切記做好異常處理並且finally發送回執;
沒有集群則連接重新打開就會再次發送給原來的消費者;
談回worker
在我看來,worker只是消費者的另外一種叫法,只是它的功能更加特殊,本文開頭也說了,消費端基本上聲明隊列,注冊接受事件這些步驟都一樣,只是配置項的不同罷了,看下worker的配置:
using System; using System.Text; using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Worker { class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//prefetchCount:This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy(Fair dispatch(公平分發):它會選擇空閑的worker去處理消息,適用於對性能敏感的場景;Round-robin dispatching(輪循分發):不解釋) Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//用以處理完畢后發送ack通知服務端 }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
注意這句代碼以及注釋:channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false) 下面是BasicQops方法的說明:
BasicQops用於設置任務分發,只有收到前一條消息的回執才會發給當前worker,否則輪循下一個worker。如果所有worker都是忙碌,那么建議添加worker設備。
說明worker的特點就是單線程處理消息,如果處於忙碌狀態(未發送回執),則不會收到隊列推送的消息。
小結