一、RabbitMQ是什么?
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
二、消息隊列的特性
解耦:消息的生產者與消費者均基於AMQP協議(相同的接口與規范)進行發送與接收消息,互相不存依賴;
冗余:消息只有處理了才會被刪除,除非明確允許多個消費者可以收到同一消息的多個副本,否則每個消息只會被單個消費者接收並處理;
擴展性:可增加或減少多個消息的生產者與消費者,兩者的改動均不會影響到雙方;
靈活性 & 峰值處理能力:因為有良好的擴展性,所以可視服務器的處理情況【可稱為:消費者】(比如:高並發負載過大)動態的增減服務器,以提提高處理能力(可稱為:負載均衡);
可恢復性:消息的生產者與消費者不論哪一方出現問題,均不會影響消息的正常發出與接收(當然單一的生產者與消費者除外,如果是這樣也就沒有必要使用分布式消息隊列);
送達保證:只有消息被確認成功處理后才會被刪除,否則會重新分發給其它的消費者進行處理,直到確認處理成功為止;
排序保證:先進先出是隊列的基本特性;
緩沖:同一時間有多個消息進入消息隊列,但是同一時間可以指定一個多個消息被消息者接收並處理,其余的消息處理等待狀態,這樣可以降低服務器的壓力,起到緩沖的作用;
理解數據流:傳遞的消息內容以字節數組為主,但可以將對象序列化后成字節數組,然后在消費者接收到消息后,可反序列化成對象並進行相關的處理,應用場景:CQRS;
異步通信:允許將一個或多個消息放入消息隊列,但並不立即處理它,而是在恰當的時候再去由一個或多個消費者分別接收並處理它們;
以上是我的個人理解,也可參看《使用消息隊列的 10 個理由》
應用場景:針對高並發且無需立即返回處理結果的時候,可以考慮使用消息隊列,如果處理需要立即返回結果則不適合;
三、RabbitMQ環境的安裝
1.服務器端:
A.需要先安裝Erlang環境,下載地址:http://www.erlang.org/download.html,可能有時無法正常訪問,可以通過VPN代理來訪問該網站或在其它網站上下載(比如:CSDN)
B.安裝RabbitMQ Server(有針對多個操作系統的下載,我這邊以WINDOWS平台為主),下載地址:http://www.rabbitmq.com/download.html,
說明:最新版的Erlang及abbitMQ Server安裝后,一般WINDOWS環境變量及服務均都已正常安裝與並正常啟動,可不是最新版或沒有安裝好,則可執行以下命令:
Setx ERLANG_HOME “C:\Program Files\erl7.1″ -Erlang的-安裝目錄,也可通過系統屬性-->高級-->環境變量來手動設置;
cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.5.6\sbin --切換到RabbitMQ Server的sbin目錄下,然后執行如下命令:
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
安裝並設置OK后,可以通過:rabbitmqctl status查看運行情況、rabbitmqctl list_users查看當前用戶、以下命令增加一個新用戶:
rabbitmqctl add_user username password
rabbitmqctl set_permissions username ".*" ".*" ".*"
rabbitmqctl set_user_tags username administrator
修改密碼:rabbitmqctl change_password username newpassowrd
刪除指定的用戶:rabbitmqctl delete_user username
列出所有queue:rabbitmqctl list_queues
列出指定queue的信息:rabbitmqctl list_queues [the queue name] messages_ready messages_unacknowledged
列出所有exchange:rabbitmqctl list_exchanges
列出所有binding:rabbitmqctl list_bindings
安裝基於web的管理插件:rabbitmq-plugins.bat enable rabbitmq_management
當然還有其它的命令,大家可以去查看官網及其它資料,但我認為知道以上的命令足夠用了
四、RabbitMQ的基本用法
使用RabbitMQ客戶端就必需在項目中引用其相關的組件,這里可以通過NuGet安裝或從官網下載再引用均可,方法很簡單,不再重述;
1.普通用法:采用默認的exchange(交換機,或稱路由器)+默認的exchange類型:direct+noAck(自動應答,接收就應答)
/// <summary>
/// 消息發送者,一般用在客戶端
/// </summary>
class
RabbitMQPublish
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.QueueDeclare(
"hello"
,
false
,
false
,
false
,
null
);
//創建一個隊列
string
message =
""
;
while
(message!=
"exit"
)
{
Console.Write(
"Please enter the message to be sent:"
);
message = Console.ReadLine();
var
body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
""
,
"hello"
,
null
, body);
//發送消息
Console.WriteLine(
"set message: {0}"
, message);
}
}
}
}
}
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class
RabbitMQConsume
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.QueueDeclare(
"hello"
,
false
,
false
,
false
,
null
);
//創建一個隊列
var
consumer =
new
QueueingBasicConsumer(channel);
//創建一個消費者
channel.BasicConsume(
"hello"
,
true
, consumer);
//開啟消息者與通道、隊列關聯
Console.WriteLine(
" waiting for message."
);
while
(
true
)
{
var
ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//接收消息並出列
var
body = ea.Body;
//消息主體
var
message = Encoding.UTF8.GetString(body);
Console.WriteLine(
"Received {0}"
, message);
if
(message ==
"exit"
)
{
Console.WriteLine(
"exit!"
);
break
;
}
}
}
}
}
}
2.負載均衡處理模式:采用默認的exchange(交換機)+智能分發+默認的exchange類型:direct+手動應答
消息生產者/發布者代碼與上面相同;
以下是消費者代碼:
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class
RabbitMQConsume
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.QueueDeclare(
"hello"
,
false
,
false
,
false
,
null
);
//創建一個隊列
channel.BasicQos(0, 1,
false
);
//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
var
consumer =
new
QueueingBasicConsumer(channel);
//創建一個消費者
channel.BasicConsume(
"hello"
,
false
, consumer);
//開啟消息者與通道、隊列關聯
Console.WriteLine(
" waiting for message."
);
while
(
true
)
{
var
ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//接收消息並出列
var
body = ea.Body;
//消息主體
var
message = Encoding.UTF8.GetString(body);
Console.WriteLine(
"Received {0}"
, message);
channel.BasicAck(ea.DeliveryTag,
false
);
if
(message ==
"exit"
)
{
Console.WriteLine(
"exit!"
);
break
;
}
Thread.Sleep(1000);
}
}
}
}
}
/// <summary>
/// 消息發送者,一般用在客戶端
/// </summary>
class
RabbitMQPublish
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.QueueDeclare(
"hello"
,
true
,
false
,
false
,
null
);
//創建一個隊列,第2個參數為true表示為持久隊列
var
properties = channel.CreateBasicProperties();
//properties.SetPersistent(true);這個方法提示過時,不建議使用
properties.DeliveryMode = 2;
//1表示不持久,2.表示持久化
string
message =
""
;
while
(message!=
"exit"
)
{
Console.Write(
"Please enter the message to be sent:"
);
message = Console.ReadLine();
var
body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
""
,
"hello"
, properties, body);
//發送消息
Console.WriteLine(
"set message: {0}"
, message);
}
}
}
}
}
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class
RabbitMQConsume
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.QueueDeclare(
"hello"
,
true
,
false
,
false
,
null
);
//創建一個隊列,第2個參數為true表示為持久隊列
channel.BasicQos(0, 1,
false
);
//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
var
consumer =
new
QueueingBasicConsumer(channel);
//創建一個消費者
channel.BasicConsume(
"hello"
,
false
, consumer);
//開啟消息者與通道、隊列關聯
Console.WriteLine(
" waiting for message."
);
while
(
true
)
{
var
ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//接收消息並出列
var
body = ea.Body;
//消息主體
var
message = Encoding.UTF8.GetString(body);
Console.WriteLine(
"Received {0}"
, message);
channel.BasicAck(ea.DeliveryTag,
false
);
if
(message ==
"exit"
)
{
Console.WriteLine(
"exit!"
);
break
;
}
Thread.Sleep(1000);
}
}
}
}
}
/// <summary>
/// 消息發送者/生產者,一般用在客戶端
/// </summary>
class
RabbitMQPublish
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.ExchangeDeclare(
"publish"
,
"fanout"
,
true
);
//定義一個交換機,且采用廣播類型,並設為持久化
string
queueName = channel.QueueDeclare(
"hello"
,
true
,
false
,
false
,
null
);
//創建一個隊列,第2個參數為true表示為持久隊列,這里將結果隱式轉換成string
var
properties = channel.CreateBasicProperties();
//properties.SetPersistent(true);這個方法提示過時,不建議使用
properties.DeliveryMode = 2;
//1表示不持久,2.表示持久化
string
message =
""
;
while
(message!=
"exit"
)
{
Console.Write(
"Please enter the message to be sent:"
);
message = Console.ReadLine();
var
body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
"publish"
,
"hello"
, properties, body);
//發送消息,這里指定了交換機名稱,且routeKey會被忽略
Console.WriteLine(
"set message: {0}"
, message);
}
}
}
}
}
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class
RabbitMQConsume
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.ExchangeDeclare(
"publish"
,
"fanout"
,
true
);
//定義一個交換機,且采用廣播類型,並持久化該交換機,並設為持久化
string
queueName = channel.QueueDeclare(
"hello"
,
true
,
false
,
false
,
null
);
//創建一個隊列,第2個參數為true表示為持久隊列
channel.QueueBind(queueName,
"publish"
,
""
);
//將隊列綁定到名publish的交換機上,實現消息訂閱
channel.BasicQos(0, 1,
false
);
//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
var
consumer =
new
QueueingBasicConsumer(channel);
//創建一個消費者
channel.BasicConsume(queueName,
false
, consumer);
//開啟消息者與通道、隊列關聯
Console.WriteLine(
" waiting for message."
);
while
(
true
)
{
var
ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//接收消息並出列
var
body = ea.Body;
//消息主體
var
message = Encoding.UTF8.GetString(body);
Console.WriteLine(
"Received {0}"
, message);
channel.BasicAck(ea.DeliveryTag,
false
);
//應答
if
(message ==
"exit"
)
{
Console.WriteLine(
"exit!"
);
break
;
}
Thread.Sleep(1000);
}
}
}
}
}
/// <summary>
/// 消息發送者/生產者,一般用在客戶端
/// </summary>
class
RabbitMQPublish
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.ExchangeDeclare(
"publish-topic"
,
"topic"
,
true
);
//定義一個交換機,且采用廣播類型,並持久化該交換機
channel.QueueDeclare(
"hello-mq"
,
true
,
false
,
false
,
null
);
//創建一個隊列,第2個參數為true表示為持久隊列
var
properties = channel.CreateBasicProperties();
//properties.SetPersistent(true);這個方法提示過時,不建議使用
properties.DeliveryMode = 2;
//1表示不持久,2.表示持久化
string
message =
""
;
while
(message!=
"exit"
)
{
Console.Write(
"Please enter the message to be sent:"
);
message = Console.ReadLine();
var
body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
"publish-topic"
,
"hello.test"
, properties, body);
//發送消息,這里指定了交換機名稱,且routeKey會被忽略
Console.WriteLine(
"set message: {0}"
, message);
}
}
}
}
}
/// <summary>
/// 消費者,一般用在服務端
/// </summary>
class
RabbitMQConsume
{
static
void
Main(
string
[] args)
{
var
factory =
new
ConnectionFactory();
//創建連接工廠並初始連接
factory.HostName =
"localhost"
;
factory.UserName =
"zwj"
;
factory.Password =
"www.zuowenjun.cn"
;
using
(
var
connection = factory.CreateConnection())
//創建一個連接
{
using
(
var
channel = connection.CreateModel())
//創建一個通道
{
channel.ExchangeDeclare(
"publish-topic"
,
"topic"
,
true
);
//定義一個交換機,且采用廣播類型,並持久化該交換機
string
queueName = channel.QueueDeclare(
"hello-mq"
,
true
,
false
,
false
,
null
);
//創建一個隊列,第2個參數為true表示為持久隊列
channel.QueueBind(queueName,
"publish-topic"
,
"*.test"
);
//將隊列綁定到路由上,實現消息訂閱
channel.BasicQos(0, 1,
false
);
//在一個工作者還在處理消息,並且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。
var
consumer =
new
QueueingBasicConsumer(channel);
//創建一個消費者
channel.BasicConsume(queueName,
false
, consumer);
//開啟消息者與通道、隊列關聯
Console.WriteLine(
" waiting for message."
);
while
(
true
)
{
var
ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//接收消息並出列
var
body = ea.Body;
//消息主體
var
message = Encoding.UTF8.GetString(body);
Console.WriteLine(
"Received {0}"
, message);
channel.BasicAck(ea.DeliveryTag,
false
);
//應答
if
(message ==
"exit"
)
{
Console.WriteLine(
"exit!"
);
break
;
}
Thread.Sleep(1000);
}
}
}
}
}
交換機路由類型如下:
Direct Exchange:直接匹配,通過Exchange名稱+RoutingKey來發送與接收消息;
Fanout Exchange:廣播訂閱,向所有消費者發布消息,但只有消費者將隊列綁定到該路由才能收到消息,忽略RoutingKey;
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個詞,只有消費者將隊列綁定到該路由且指定的RoutingKey符合匹配規則時才能收到消息;
Headers Exchange:消息頭訂閱,消息發布前,為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息時同樣需要定義類似的鍵值對請求頭,里面需要多包含一個匹配模式(有:x-mactch=all,或者x-mactch=any),只有請求頭與消息頭相匹配,才能接收到消息,忽略RoutingKey;