在安裝和配置完成RabbitMQ之后,我們就可以嘗試做一個最簡單的例子:發送和接收消息。
我們先來看客戶端也就是發送者的代碼:
public class RabbitClient
{
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
public RabbitClient()
{
//指定要連接的RabbitMQ服務地址
factory.HostName = "localhost";
}
public void Send()
{
//定義要發送的數據
RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "this is a 請求。" };
//創建一個 AMQP 連接
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
//在MQ上定義一個隊列
channel.QueueDeclare("esbtest.rmq.consoleserver", false, false, false, null);
//序列化消息對象,RabbitMQ並不支持復雜對象的序列化,所以對於自定義的類型需要自己序列化
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
using (MemoryStream ms = new MemoryStream())
{
xs.Serialize(ms, message);
byte[] bytes = ms.ToArray();
//指定發送的路由,通過默認的exchange直接發送到指定的隊列中。
channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);
}
Console.WriteLine(string.Format("Request Message Sent, Id:{0}, Message:{1}", message.MessageId, message.Message));
}
}
}
}
在方法
channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);
中的第一個參數是需要輸入一個exchange。在RabbitMQ中,所有的消息都必須要通過exchange發送到各個queue里面去。發送者發送消息,其實也就是把消息放到exchange中去。而exchange知道應該把消息放到哪里去。在這個方法中,我們沒有輸入exchange的名稱,只是定義了一個空的echange,而在第二個參數routeKey中輸入了我們目標隊列的名稱。RabbitMQ會幫我定義一個默認的exchange,這個exchange會把消息直接投遞到
我們輸入的隊列中,這樣服務端只需要直接去這個定義了的隊列中獲取消息就可以了。
服務端的代碼:
public class RabbitServer
{
ConnectionFactory factory = null;
public void Listen()
{
factory = new ConnectionFactory();
factory.HostName = "localhost";
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
//在MQ上定義一個隊列,如果名稱相同不會重復創建
channel.QueueDeclare("esbtest.rmq.consoleserver", false, false, false, null);
Console.WriteLine("Listening...");
//在隊列上定義一個消費者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("esbtest.rmq.consoleserver", true, consumer);
while (true)
{
//阻塞函數,獲取隊列中的消息
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] body = ea.Body;
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
using (MemoryStream ms = new MemoryStream(body))
{
RequestMessage message = (RequestMessage)xs.Deserialize(ms);
Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
}
}
}
}
}
}
至此,簡單的發送接收程序就可以運行了,運行RabbitMQ,然后分別運行客戶端和服務端。運行結果如圖:
客戶端:
服務端:
代碼可以從這里下載。