C#中使用消息隊列RabbitMQ
1、什么是RabbitMQ。詳見 http://www.rabbitmq.com/。
作用就是提高系統的並發性,將一些不需要及時響應客戶端且占用較多資源的操作,放入隊列,再由另外一個線程,去異步處理這些隊列,可極大的提高系統的並發能力。
2、安裝
A.如果沒有Erlang運行環境,在安裝過程中會提醒先安裝Erlang環境。http://www.erlang.org/downloads
注意安裝完成后必須配置環境變量:
計算機->屬性->高級系統設置 ->環境變量 中的系統變量中新建一個變量ERL_HOME 值為本機中erlang的安裝目錄(如:D:\Program Files\erl8.2\),然后再在用戶變量 PATH中添加上erlang的安裝目錄\bin(如D:\Program Files\erl8.2\bin); 就OK了
不然可能會出現:rabbitmq unable to connect to node 錯誤
看是否設置成功:打開 cmd ,輸入 erl 后回車,如果看到如下的信息,表明安裝成功。

B. RabbitMQ服務:http://www.rabbitmq.com/download.html。
C. .net客戶端類庫:http://www.rabbitmq.com/dotnet.html
默認web管理工具的地址是:http://localhost:15672,初始用戶名:guest 初始密碼:guest
3、配置
配置文件地址為:%APPDATA%\RabbitMQ\ ,默認沒有rabbit.config文件,需要手工新建(默認會有rabbitmq.config.example 作為參考)。基於安全,做了兩個配置,如下:
|
1
2
3
|
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, [
"guest"
]}]}
].
|
loopback_users:設置只能在與RabbitMq服務同一台機器上訪問服務的用戶。
tcp_listeners:設置RabbitMQ監聽的IP地址與端口。只監聽局域網內網iP、修改默認端口,防止被入侵攻擊。
設置完后,別忘記了以下操作,否則配置不起作用。
- 停止RabbitMQ服務;
- 重新安裝服務使配置生效:rabbitmq-service.bat install
此命令要切換到路徑:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin
- 啟動RabbitMQ服務;
4、配置完成運行命令
rabbitmq-service remove
rabbitmq-service install
rabbitmq-service start
5、Demo練習。
聯系前先測試前面的東西是否安裝成功:在...\RabbitMQ Server\rabbitmq_server-3.4.0\sbin 運行命令:rabbitmqctl status
出現下圖則安裝正確:

消息生產者:
class Program
{
static void Main(string[] args)
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.MqUserName;
factory.Password = Constants.MqPwd;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//在MQ上定義一個持久化隊列,如果名稱相同不會重復創建
channel.QueueDeclare("MyFirstQueue", true, false, false, null);
while (true)
{
string customStr = Console.ReadLine();
RequestMsg requestMsg = new RequestMsg();
requestMsg.Name = string.Format("Name_{0}", customStr);
requestMsg.Code = string.Format("Code_{0}", customStr);
string jsonStr = JsonConvert.SerializeObject(requestMsg);
byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
//設置消息持久化
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("", "MyFirstQueue", properties, bytes);
//channel.BasicPublish("", "MyFirstQueue", null, bytes);
Console.WriteLine("消息已發送:" + requestMsg.ToString());
}
}
}
}
catch (Exception e1)
{
Console.WriteLine(e1.ToString());
}
Console.ReadLine();
}
}
class Program
{
static void Main(string[] args)
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.MqUserName;
factory.Password = Constants.MqPwd;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//在MQ上定義一個持久化隊列,如果名稱相同不會重復創建
channel.QueueDeclare("MyFirstQueue", true, false, false, null);
//輸入1,那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息
channel.BasicQos(0, 1, false);
Console.WriteLine("Listening...");
//在隊列上定義一個消費者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
//消費隊列,並設置應答模式為程序主動應答
channel.BasicConsume("MyFirstQueue", false, consumer);
while (true)
{
//阻塞函數,獲取隊列中的消息
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] bytes = ea.Body;
string str = Encoding.UTF8.GetString(bytes);
RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
Console.WriteLine("HandleMsg:" + msg.ToString());
//回復確認
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
catch (Exception e1)
{
Console.WriteLine(e1.ToString());
}
Console.ReadLine();
}
}



