1、什么是RabbitMQ。詳見 http://www.rabbitmq.com/。
作用就是提高系統的並發性,將一些不需要及時響應客戶端且占用較多資源的操作,放入隊列,再由另外一個線程,去異步處理這些隊列,可極大的提高系統的並發能力。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過寫和檢索出入列隊的針對應用程序的數據(消息)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較為成熟的MQ產品有MSMQ,ActiveMQ,RabbitMQ,IBM WEBSPHERE MQ 等等。
RabbitMQ是使用Erlang開發的,開源的,一個在高級消息隊列協議(AMQP)基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
RabbitMQ支持你能想到的大多數開發語言如Java,Ruby,Python,.Net,C/C++,Erlang等等。各種語言的客戶端你可以從它的官方網站上下載。
2. RabbitMQ的特點
相比較於其他一些MQ產品,RabbitMQ的一些優勢還是比較明顯。
你可以在下面一些鏈接中找到一些比較
RabbitMQ,ActiveMQ,qpid:http://bhavin.directi.com/rabbitmq-vs-apache-activemq-vs-apache-qpid/
RabbitMQ和ZeroMQ:http://stackoverflow.com/questions/731233/activemq-or-rabbitmq-or-zeromq-or
還有四款消息隊列大比拼:http://www.oschina.net/news/17973/message-queue-shootout
RabbitMQ的一些優點:
安裝部署方便:
RabbitMQ安裝方便,有詳細的安裝文檔。
可靠性:
RabbitMQ 提供了多種多樣的特性讓你在可靠性,可用性和性能之間作出權衡,包括持久化,發送應答,發布確認等。
強大地路由功能:
所有的消息都會通過路由器轉發到各個消息隊列中,RabbitMQ內建了幾個常用的路由器,並且可以通過路由器的組合以及自定義路由器插件來完成復雜的路由功能。
Clustering:
RabbitMQ服務端可以在一個局域網中集群部署,作為一個邏輯服務器。
Federation:
Federration模式可以讓RabbitMQ服務器熱備部署,當系統中其中一個服務器失效而無法運作時,另一個服務器即可自動接手原失效系統所執行的工作。
Completing Consumer:
內置的競爭的消費者模式可以實現消費者的負載均衡。
管理工具:
RabbitMQ提供了一個簡單易用的管理界面,讓用戶可以通過瀏覽器監控和控制你的消息隊列的方方面面。
跟蹤:
如果你在使用RabbitMQ過程中出現了問題,或者得到了你不期望的結果,你可以通過打開內置Tracer功能進行跟蹤,當然這時性能就有所下降。
插件系統:
提供了強大地插件系統,讓用戶能夠方便的對MQ進行擴展。上面所說到的管理工具和Federation組件都是作為一個插件發布的。你可以選擇安裝或者不安裝。
客戶端支持:
RabbitMQ支持你能想到的大多數開發語言如Java,Ruby,Python,.Net,C/C++,Erlang等等。
強大的社區和商業支持:
強大的社區可以幫助你快速的解決關於RabbitMQ的各種問題,如果還是不能解決你的問題,VMWare還提供商業技術支持。
3、安裝
RabbitMQ服務:http://www.rabbitmq.com/download.html。
(安裝完RabbitMQ服務后,會在Windows服務中看到。如果沒有Erlang運行環境,在安裝過程中會提醒先安裝Erlang環境。http://www.erlang.org/downloads)
.net客戶端類庫:http://www.rabbitmq.com/dotnet.html
4、插件
RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,啟動此插件。
CMD中運行命令:rabbitmq-plugins enable rabbitmq_management
注:rabbitmq-plugins 所在路徑為:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\plugins
web管理工具的地址是:http://localhost:15672,初始用戶名:guest 初始密碼:guest
5. RabbitMQ的一些概念
- 連接(Connection),與RabbitMQ Server建立的一個連接,由ConnectionFactory創建,每個connection只與一個物理的Server進行連接,此連接是基於Socket進行連接的,這個可以相似的理解為像一個DB Connection。AMQP一般使用TCP鏈接來保證消息傳輸的可靠性。
- 通道 (Channel),在C#客戶端里應該是叫Model,不明白為什么這么取名字,其他客戶端基本都叫Channel。建立在Connection基礎上的一個通道,相對於Connection來說,它是輕量級的。它就像是Hibernate里面的Session一樣。Channel 主要進行相關定義,發送消息,獲取消息,事務處理等。Channel可以在多線程中使用,但是在必須保證任何時候只有一個線程執行命令。一個Connection可以有多個Channel。客戶端程序有時候會是一個多線程程序,每一個線程都想要和RabbitMQ進行連接,但是又不想共享一個連接,這種需求還是比較普遍的。因為一個Connection就是一個TCP鏈接,RabbitMQ在設計的時候不希望與每一個客戶端保持多個TCP連接,但這確實是有些客戶端的需求,所以在設計中引入了Channel的概念,每一個Channel之間沒有任何聯系,是完全分離的。多個Channel來共享一個Connection。
- 交換器(Exchange),它是發送消息的實體。
- 隊列(Queue),這是接收消息的實體。
- 綁定器(Bind),將交換器和隊列連接起來,並且封裝消息的路由信息。
6、配置
配置文件地址為:C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config,默認沒有rabbit.config文件,需要手工新建(默認會有rabbitmq.config.example 作為參考)。基於安全,做了兩個配置,如下:
[ {rabbit, [ {loopback_users, [<<"guest">>]}, {tcp_listeners, [{"127.0.0.1", 1234}, {"10.121.1.48", 8009}]} ]} ].
loopback_users:設置只能在與RabbitMq服務同一台機器上訪問服務的用戶。
tcp_listeners:設置RabbitMQ監聽的IP地址與端口。只監聽局域網內網iP、修改默認端口,防止被入侵攻擊。
設置完后,別忘記了以下操作,否則配置不起作用。
- 停止RabbitMQ服務;
- 重新安裝服務使配置生效:rabbitmq-service.bat install
此命令要切換到路徑:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin
- 啟動RabbitMQ服務;
6、Demo練習。
消息生產者:
class Program { static void Main(string[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = Constants.MqHost; factory.Port = Constants.MqPort; factory.UserName = Constants.MqUserName; factory.Password = Constants.MqPwd; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 channel.QueueDeclare("MyFirstQueue", true, false, false, null); while (true) { string customStr = Console.ReadLine(); RequestMsg requestMsg = new RequestMsg(); requestMsg.Name = string.Format("Name_{0}", customStr); requestMsg.Code = string.Format("Code_{0}", customStr); string jsonStr = JsonConvert.SerializeObject(requestMsg); byte[] bytes = Encoding.UTF8.GetBytes(jsonStr); //設置消息持久化 IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.BasicPublish("", "MyFirstQueue", properties, bytes); //channel.BasicPublish("", "MyFirstQueue", null, bytes); Console.WriteLine("消息已發送:" + requestMsg.ToString()); } } } } catch (Exception e1) { Console.WriteLine(e1.ToString()); } Console.ReadLine(); } }
class Program { static void Main(string[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = Constants.MqHost; factory.Port = Constants.MqPort; factory.UserName = Constants.MqUserName; factory.Password = Constants.MqPwd; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 channel.QueueDeclare("MyFirstQueue", true, false, false, null); //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 channel.BasicQos(0, 1, false); Console.WriteLine("Listening..."); //在隊列上定義一個消費者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //消費隊列,並設置應答模式為程序主動應答 channel.BasicConsume("MyFirstQueue", false, consumer); while (true) { //阻塞函數,獲取隊列中的消息 BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; string str = Encoding.UTF8.GetString(bytes); RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str); Console.WriteLine("HandleMsg:" + msg.ToString()); //回復確認 channel.BasicAck(ea.DeliveryTag, false); } } } } catch (Exception e1) { Console.WriteLine(e1.ToString()); } Console.ReadLine(); } }
這是個人的總結,只是簡單的安裝和使用,積累了更好的經驗在記錄下來。