1、什么是RabbitMQ。詳見 http://www.rabbitmq.com/。
作用就是提高系統的並發性,將一些不需要及時響應客戶端且占用較多資源的操作,放入隊列,再由另外一個線程,去異步處理這些隊列,可極大的提高系統的並發能力。
2、安裝
RabbitMQ服務:http://www.rabbitmq.com/download.html。
(安裝完RabbitMQ服務后,會在Windows服務中看到。如果沒有Erlang運行環境,在安裝過程中會提醒先安裝Erlang環境。http://www.erlang.org/downloads)
.net客戶端類庫:http://www.rabbitmq.com/dotnet.html
3、插件
RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,啟動此插件。
CMD中運行命令:rabbitmq-plugins enable rabbitmq_management
注:rabbitmq-plugins 所在路徑為:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin
web管理工具的地址是:http://localhost:15672,初始用戶名:guest 初始密碼:guest
4、配置
配置文件地址為:C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config,默認沒有rabbit.config文件,需要手工新建(默認會有rabbitmq.config.example 作為參考)。基於安全,做了兩個配置,如下:
[
{rabbit,
[
{loopback_users, [<<"guest">>]}, {tcp_listeners, [{"127.0.0.1", 1234}, {"10.121.1.48", 8009}]} ]} ].
loopback_users:設置只能在與RabbitMq服務同一台機器上訪問服務的用戶。
tcp_listeners:設置RabbitMQ監聽的IP地址與端口。只監聽局域網內網iP、修改默認端口,防止被入侵攻擊。
設置完后,別忘記了以下操作,否則配置不起作用。
- 停止RabbitMQ服務;
- 重新安裝服務使配置生效:rabbitmq-service.bat install
此命令要切換到路徑:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin
- 啟動RabbitMQ服務;
5、Demo練習。
消息生產者: class Program { static void Main(string[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = Constants.MqHost; factory.Port = Constants.MqPort; factory.UserName = Constants.MqUserName; factory.Password = Constants.MqPwd; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 channel.QueueDeclare("MyFirstQueue", true, false, false, null); while (true) { string customStr = Console.ReadLine(); RequestMsg requestMsg = new RequestMsg(); requestMsg.Name = string.Format("Name_{0}", customStr); requestMsg.Code = string.Format("Code_{0}", customStr); string jsonStr = JsonConvert.SerializeObject(requestMsg); byte[] bytes = Encoding.UTF8.GetBytes(jsonStr); //設置消息持久化 IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.BasicPublish("", "MyFirstQueue", properties, bytes); //channel.BasicPublish("", "MyFirstQueue", null, bytes); Console.WriteLine("消息已發送:" + requestMsg.ToString()); } } } } catch (Exception e1) { Console.WriteLine(e1.ToString()); } Console.ReadLine(); } }
class Program { static void Main(string[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = Constants.MqHost; factory.Port = Constants.MqPort; factory.UserName = Constants.MqUserName; factory.Password = Constants.MqPwd; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //在MQ上定義一個持久化隊列,如果名稱相同不會重復創建 channel.QueueDeclare("MyFirstQueue", true, false, false, null); //輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息 channel.BasicQos(0, 1, false); Console.WriteLine("Listening..."); //在隊列上定義一個消費者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //消費隊列,並設置應答模式為程序主動應答 channel.BasicConsume("MyFirstQueue", false, consumer); while (true) { //阻塞函數,獲取隊列中的消息 BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; string str = Encoding.UTF8.GetString(bytes); RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str); Console.WriteLine("HandleMsg:" + msg.ToString()); //回復確認 channel.BasicAck(ea.DeliveryTag, false); } } } } catch (Exception e1) { Console.WriteLine(e1.ToString()); } Console.ReadLine(); } }


