本文版權歸博客園和作者吳雙本人共同所有。歡迎轉載,轉載和爬蟲請注明原文地址:http://www.cnblogs.com/tdws/p/5860668.html
想必MQ這兩個字母對於各位前輩們和老司機們並不陌生。本文初探RabbitMQ的簡單分享可能值得學習之處不怎么多,本人對於RabbitMQ的研究目前也很初級,這個月打算按照好的學習線路提高一下,歡迎新老司機留下你們的見解。
首先提到第一個簡單的場景,文件並發。我先手動實現一下文件並發,引發異常,請看如下代碼。
1 static void Main(string[] args) 2 { 3 new Thread(write1).Start(); 4 new Thread(write1).Start(); 5 new Thread(write1).Start(); 6 Console.WriteLine("等待"); 7 Console.ReadKey(); 8 } 9 public static void write1() 10 { 11 for (int i = 0; i < 10000; i++) 12 { 13 WriteLog(i); 14 } 15 //Console.ReadKey(); 16 } 17 public static void WriteLog(int i) 18 { 19 using (FileStream f = new FileStream(@"d:\\A.txt", FileMode.Append)) 20 { 21 using (StreamWriter sw = new StreamWriter(f, Encoding.Default)) 22 { 23 sw.Write(i); 24 } 25 } 26 }
我使用多線程並發向同一個append數據。相信你應該知道接下來運行起來會發生什么!
是的,正如你所料,該文件正由另一個進程使用,因此該進程無法訪問此文件。也許這個場景,就像你寫應用程序運行日志,異常日志。如果你沒使用任何插件或者組件,就只能直接向文件中append。這樣的問題,是你不得不解決的。
這個時候就是隊列出場的時候了。當然隊列的組件有很多,.NET框架下也有自帶的隊列Queue,微軟也有獨立的隊列組件MSQueue。Apache有其ActiveMQ,另外知名的消息隊列還有Equeue,ZeroMQ等。消息隊列的使用場景,大概包括解耦,提高峰值處理能力,送達和排序保證,緩沖等。
RabbitMQ是一個消息中間件,其主要的觀點很簡單:接受和轉發消息。你可以把他想象成郵局,當你發送新建到郵遞箱,你很確定郵遞員最終將會把你的新建傳遞到你的收件人手中。我們使用郵遞箱和郵遞員來隱喻RabbitMQ。關於RabbitMQ和郵局之前的主要區別在於郵局處理紙質信件,而MQ存儲和轉發二進制數據-message。下面提到RabbitMQ的幾個”行話“。
生產者意味着發送消息。一個發送消息的應用程序是一個生產者,我們稱其為"P"。
隊列queue意味着郵遞箱。他存在於RabbitMQ當中.盡管消息在RabbitMQ和你的應用程序中”流通“,他們可以被僅存在一個隊列當中。一個隊列不受任何限制,他可以存儲你想要存儲的消息量,它本質上是一個無限的緩沖區。多個生產者可以向同一個隊列發送消息,多個消費者可以嘗試從同一個消息隊列中接收數據。一個隊列像下面這樣,上面是它的隊列名稱。
消費者意味着接收,消費者是等待接收處理消息的應用程序。
最后的關系就如下圖:
接下來,我將使用RabbitMQ來解決最開始的文件並發問題。也就是說為了避免文件並發,我們要將生產者所需append到文件中的內容存入到消息隊列當中,然后取出隊列中的message讓消費者寫入到文件當中。關於RabbitMQ的安裝和配置請先看張善友老師的文章:http://www.cnblogs.com/shanyou/p/4067250.html ,寫的特別詳細。只不過歷經數月RabbitMQ C#客戶端方法上有略微的更新。
下面我將新建兩個ConsoleApp應用程序。一個負責Receive,消費者,寫文件。另一個Send,生產者,將所需寫入的數據推到RabbitMQ當中。
Receive代碼如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "WuShuang", Password = "123456" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); WriteLog(message); }; channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } public static void WriteLog(string i) { using (FileStream f = new FileStream(@"d:\\A.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(f, Encoding.Default)) { sw.Write(i + "\n"); } } }
Send代碼如下:
static void Main(string[] args) { new Thread(write1).Start(); new Thread(write1).Start(); new Thread(write1).Start(); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } public static void write1() { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "WuShuang", Password = "123456" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind("hello", "wsExchange", "hello"); for (int i = 0; i < 10000; i++) { string message = "Hello World ws!" + i; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "wsExchange", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } //Console.ReadKey(); }
有了代碼后,首先把Receive項目運行起來。等待生產者Send消息到隊列中,然后Receive訂閱的事件將會取出消息,並將消息寫入到文件當中。
左側的ConsoleApp持續向隊列中寫入消息,右側的Receive的ConsoleApp運行過程中,會不斷寫文件。生產者承受了多線程並發些數據,當然消費者有序的取出隊列中的數據,也不會發生最開始的文件並發異常。
今天的分享就這么簡單,關於RabbitMQ有待繼續深入學習。
如果您覺得我的點滴分享,對您有點滴幫助,歡迎點贊,也為您自己的進步點贊!
點擊下方關注,我們共同進步!
分享的過程中,總會遇到不斷地驚喜。
參考文章:
張善友老師:Windows上安裝RabbitMQ指南
RabbitMQ(開源)官方文檔 Hello world