眾所周知RabbitMQ使用的是AMQP協議。我們知道AMQP是一種網絡協議,能夠支持符合要求的客戶端應用和消息中間件代理之間進行通信。
其中消息代理扮演的角色就是從生產者那兒接受消息,並根據既定的路由規則把接受到的消息發送給消息的處理者又稱消費者。由此可以看出RabbitMQ在整個消息發送,處理的過程中有三個比較重要的角色:
生產者:producer,消息生產者,就是投遞消息的程序
消息代理:broker,簡單來說就是消息隊列服務器實體,這里簡單理解為我們安裝的RabbitMQ服務
消費者:consumer,消息消費者,就是接受消息的程序
接下來我們將以一個簡單的控制台程序來實現消息隊列的發送及接收(使用.NET版RabbitMQ客戶端):
主要功能為: 一個producer發送消息,一個consumer接收消息,並在控制台打印出來。
使用Nuget添加RabbitMQ.Client程序包至項目中
Install-Package RabbitMQ.Client
創建消息的生產者 Producer.cs ,發送一條消息給消費者
-
using RabbitMQ.Client;
-
using System;
-
using System.Text;
-
-
namespace RabbitMQProducer
-
{
-
public class Producer
-
{
-
public static void Send()
-
{
-
//創建連接連接到RabbitMQ服務器,就是一個位於客戶端和Broker之間的TCP連接,建議共用此TCP連接,每次使用時創建一個新的channel即可,
-
var factory = new ConnectionFactory();
-
IConnection connection = null;
-
//方式1:使用AMQP協議URL amqp://username:password@hostname:port/virtual host 可通過http://127.0.0.1:15672/ RabbitMQWeb管理頁面查看每個參數的具體內容
-
factory.Uri = "amqp://guest:guest@127.0.0.1:5672//";
-
connection = factory.CreateConnection();
-
-
////方式2:使用ConnectionFactory屬性賦值
-
//factory.UserName = ConnectionFactory.DefaultUser;
-
//factory.Password = ConnectionFactory.DefaultPass;
-
//factory.VirtualHost = ConnectionFactory.DefaultVHost;
-
//factory.HostName = "127.0.0.1"; //設置RabbitMQ服務器所在的IP或主機名
-
//factory.Port = AmqpTcpEndpoint.UseDefaultPort;
-
//connection = factory.CreateConnection();
-
-
////方式3:使用CreateConnection方法創建連接,默認使用第一個地址連接服務端,如果第一個不可用會依次使用后面的連接
-
//List<AmqpTcpEndpoint> endpoints = new List<AmqpTcpEndpoint>() {
-
// new AmqpTcpEndpoint() { HostName="localhost1",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost2",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost3",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost4",Port=5672}
-
//};
-
//connection = factory.CreateConnection(endpoints);
-
-
using (connection)
-
{
-
//創建一個消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。類似與Hibernate中的Session
-
//AMQP協議規定只有通過channel才能指定AMQP命令,所以僅僅在創建了connection后客戶端還是不能發送消息的,必須要創建一個channel才行
-
//RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,但是建議盡量共用Connection
-
using (IModel channel = connection.CreateModel())
-
{
-
//創建一個queue(消息隊列)
-
channel.QueueDeclare(
-
queue: "hello",
-
durable: false,
-
exclusive: false,
-
autoDelete: false,
-
arguments: null);
-
-
string message = "你好消費者,我是生產者發送的消息";
-
-
//往隊列中發出一條消息 使用了默認交換機並且綁定路由鍵(route key)與隊列名稱相同
-
channel.BasicPublish(
-
exchange: "",
-
routingKey: "hello",
-
basicProperties: null,
-
body: Encoding.UTF8.GetBytes(message));
-
-
Console.WriteLine($"我是生產者,我發送了一條消息{message}");
-
-
Console.WriteLine(" Press [enter] to exit.");
-
Console.ReadLine();
-
}
-
}
-
}
-
}
-
}
注意:1.隊列只會在它不存在的時候創建,多次聲明並不會重復創建。
2.信息的內容是字節數組,也就意味着可以傳遞任何數據。
3.創建消息的消費者Consumer.cs ,從隊列中獲取消息並打印到屏幕
-
using RabbitMQ.Client;
-
using RabbitMQ.Client.Events;
-
using System;
-
using System.Text;
-
-
namespace RabbitMQConsumer
-
{
-
public class Consumer
-
{
-
public static void Receive()
-
{
-
var factory = new ConnectionFactory();
-
factory.Uri = "amqp://guest:guest@127.0.0.1:5672//";
-
using (var connection = factory.CreateConnection())
-
{
-
using (var channel = connection.CreateModel())
-
{
-
//聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列
-
channel.QueueDeclare(
-
queue: "hello",
-
durable: false,
-
exclusive: false,
-
autoDelete: false,
-
arguments: null);
-
-
//創建事件驅動的消費者類型,盡量不要使用while(ture)循環來獲取消息
-
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
-
consumer.Received += (model, ea) =>
-
{
-
var body = ea.Body;
-
var message = Encoding.UTF8.GetString(body);
-
Console.WriteLine(" 我是消費者我接收到消息: {0}", message);
-
};
-
-
//指定消費隊列
-
channel.BasicConsume(queue: "hello",
-
noAck: true,
-
consumer: consumer);
-
-
Console.WriteLine(" Press [enter] to exit.");
-
Console.ReadLine();
-
}
-
}
-
}
-
}
-
}
消息隊列的使用過程大致如下:
-
CreateConnection 創建一個連接連接到broker
-
CreateModel 創建一個channel 使用它來發送AMQP指令
-
ExchangeDeclare 創建一個exchange 對消息進行路由
-
QueueDeclare 創建一個queue 消息隊列 這是一個裝載消息的容器
-
QueueBind 把exchange和queue按照路由規則綁定起來
-
BasicPublish 往隊列中發送一條消息
-
BasicConsume 從隊列中獲取一條消息
exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
本文中由於使用了默認交換機所以並沒有用到 ExchangeDeclare和 QueueBind兩個方法
默認交換機實際上是一個由消息代理預先聲明好的沒有名字(名字為空字符串)的直連交換機。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同
