消息隊列的地位越來越重要,幾乎是面試的必問問題了,不會使用幾種消息隊列都顯得尷尬,正好本文使用C#來帶你認識rabbitmq消息隊列
首先,我們要安裝rabbitmq,當然,如果有現成的,也可以使用,不知道曾幾何時,我喜歡將數據庫等等軟件安裝在linux虛擬機,如果沒現成的rabbitmq,按照下面的來吧,嘿嘿
rabbitmq安裝:https://www.cnblogs.com/shanfeng1000/p/11951703.html
如果要實現rabbitmq集群,參考:https://www.cnblogs.com/shanfeng1000/p/12097054.html
我這里使用的是rabbitmq集群,但是沒有比較,只是已經安裝好了,就直接使用算了
虛擬機集群地址:192.168.209.133,192.168.209.134,192.168.209.135
端口使用的默認端口,都是5672,也就是AMQP協議端口
Rabbitmq的工作模式
先說說幾個概念
生產者(producer):負責生產消息,可以有多個生產者,可以理解為生成消息的那部分邏輯
消費者(consumer):從隊列中獲取消息,對消息處理的那部分邏輯
隊列(queue):用於存放消息,可以理解為先進先出的一個對象
交換機(exchange):顧名思義,就是個中介的角色,將接收到的消息按不同的規則轉發到其他交換機或者隊列中
路由(route):就是交換機分發消息的規則,交換機可以指定路由規則,生產者在發布消息時也可以指定消息路由,比如交換機中設置A路由表示將消息轉發到隊列1,B路由表示將消息轉發到隊列2,那么當交換機接收到消息時,如果消息的路由滿足A路由,則將消息轉發到隊列1,如果滿足B路由則將消息轉發到隊列2
虛擬主機(virtual host):虛擬地址,用於進行邏輯隔離,一個虛擬主機里面可以有若干個 exchange 和 queue,但是里面不能有相同名稱的 exchange 或 queue
再看看rabbitmq的幾種工作模式,具體可參考rabbitmq官網給出的Demo:https://www.rabbitmq.com/getstarted.html

其中,第6中類似我們常用的請求-響應模式,但是使用的RPC請求響應,用的比較少,這里就不過多解釋,感興趣的可以參考官網文檔:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html。
總的來說,就是生產者將消息發布到rabbitmq上,然后消費者連接rabbitmq,獲取到消息就消費,但是有幾點說明一下
1、rabbitmq中的消息是可被多次消費的,因為rabbitmq提供了ack機制,當消費者在消費消息時,如果將自動ack設置成false,那么需要手動提交ack才能告訴rabbitmq消息已被使用,否則當通道關閉時,消息會繼續呆在隊列中等待消費
2、當存在多個消費者時,默認情況下,一個消費者獲取一個消息,處理完成后再獲取下一個,但是rabbitmq消費一次性獲取多個,當然后當這些消息消費完成后,再獲取下一批,這也就是rabbitmq的Qos機制
C#使用rabbitmq
如果感興趣的人多,到時候再單獨開一篇博文,現在就介紹其中的1-5種,也可以分類成兩種:不使用交換機和使用交換機,所以下面就分這兩種來說明
首先,我們創建了兩個Demo項目:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分別使用使用nuget安裝RabbitMQ.Client:

其中RabbitMQ.PublishConsole是用來生產消息,RabbitMQ.ConsumeConsole用來消費消息
這里我們安裝的是最新版本,舊版本和新版本在使用上可能會有一些區別
不使用交換機情形
不使用交換機有兩種模式:簡單模式和工作模式
這里先貼上生產者生成消息的代碼,簡單模式和工作模式這部分測試代碼是一樣的:
RabbitMQ.PublishConsole
上述代碼執行完成后,隊列queue1中就有了10條消息,可以在rabbitmq的后台管理中看到:

代碼中提到,通道在申明隊列時,如果隊列已經存在,則申明的參數一定要對上,否則會拋出異常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'queue1' in vhost '/': received none but current is the value 'classic' of type 'longstr'', classId=50, methodId=10
比如這里,我實現在rabbitmq后台創建了隊列,那么他們的對應關系如下圖:


簡單模式
這個模式很簡單,其實就是只有一個消費者,簡單的保證操作的順序性

接着貼上消費者代碼:
RabbitMQ.ConsumeConsole
上述代碼執行完成后,在后台管理中可以看到消息被消費掉了

工作模式
工作模式是簡單模式的拓展,如果業務簡單,對消息的消費是一個耗時的過程,這個模式是一個好的選擇。
接着調用生產者代碼生產10條消息,下面是消費者的測試代碼
RabbitMQ.ConsumeConsole
另外說明一下,代碼中提到rabbitmq的QOS機制,這里簡單解釋一下,當生產者將消息發布到rabbitmq之后,如果在未配置QOS的情況下,rabbitmq盡可能快速地發送隊列中的所有消息到消費者端,如果消息比較多,消費者來不及處理,就會緩存這些消息,當消息堆積過多,可能導致服務器內存不足而影響其他進程,rabbitmq的QOS可以很好的解決這類問題,QOS就是限制消費者一次性從rabbitmq中獲取消息的個數,而不是獲取所有消息。比如設置rabbitmq的QOS為10,也就是prefetch=10,就是說,哪怕rabbitmq中有100條消息,消費者也只是一次性獲取10條,然后消費者消費這10條消息,剩下的交給其他消費者,當10條消息中的unacked個數少於prefetch * 消費者數目時,會繼續從rabbitmq獲取消息,如果在工作模式中,不使用QOS,你會發現,所有的消息都被一個消費者消費了
使用交換機情形
使用交換機的情形有3種:發布訂閱模式,路由模式,主題模式
上面說了,交換機是一個中介的角色,當一個交換機創建后,可以將其他隊列或者交換機與當前交換機綁定,綁定時需要指定綁定路由規則,這個和交換機類型有關。
當我們不使用交換機時,那么生產者是直接將消息發布到隊列中去的,生產者只需要指定消息接收的隊列即可,而使用交換機做中轉時,生產者只需要將消息發布到交換機,然后交換機根據接收到的消息,按與交換機綁定的路由規則,將消息轉發到其他交換機或者隊列中,這個處理過程和交換機的類型有關,交換機一般分為4類:
direct:直連類型,就是將消息的路由和交換機的綁定路由作比較,當兩者一致時,則匹配成功,然后消息就會被轉發到這個綁定路由后的隊列或者交換機
fanout:這種類型的交換機是不需要指定路由的,當交換機接收到消息時,會將消息廣播到所有綁定到它的所有隊列或交換機中
topic:主題類型,類似direct類型,只不過在將消息的路由和綁定路由做比較時,是通過特定表達式去比較的,其中# 匹配一個或多個,* 匹配一個
headers:頭部交換機,允許使用消息頭中的信息來做匹配規則,這個用的少,基本上不用,這里也就不過多介紹了
到這里,你應該發覺,使用交換機的三種情形,無非就是使用交換機的類型不一樣,發布訂閱模式--fanout,路由模式--direct,主題模式--topic
現在我們先去rabbitmq的后台中,創建這幾種交換機:
交換機的創建及綁定都可以在代碼中實現,如IModel類的QueueBind,ExchangeBind等方法,用多了就自然熟了,這里為了方便截圖,就到后台去創建了

然后我們創建兩個隊列,並按指定類型分別綁定到這3個交換機中:
隊列:

demo.direct綁定隊列規則:

demo.fanout綁定隊列規則:

demo.topic綁定隊列規則:

上面所描述的,無非就是三種模式中發布消息方式的不一樣,消費者當然還是從隊列獲取消息消費的,這里我們就先貼出消費者的代碼:
RabbitMQ.ConsumeConsole
這里我們使用了兩個隊列,每個隊列我們這里只用了一個消費者,對於下面幾種模式,這個消費者代碼都能消費到
發布訂閱模式
發布訂閱模式使用的是fanout類型的交換機,這個類型無需指定路由,交換機會將消息廣播到每個綁定到交換機的隊列或者交換機

RabbitMQ.PublishConsole
代碼中,我們往交換機發布了10條消息,交換機接收到消息后,會將消息轉發到queue1和queue2,因此,queue1和queue2都會收到10條消息:

路由模式
路由模式使用的是direct類型的交換機,也即在進行路由匹配時,需要匹配的路由一直才算匹配成功,我們把發布訂閱模式的代碼稍作修改即可,貼出生產者部分代碼:
RabbitMQ.PublishConsole
代碼中,我們往demo.direct交換機發布了10條消息,其中5條消息的路由是apple,另外5條消息的路由是banana,demo.direct交換機綁定的兩個隊列中,queue1的綁定路由是apple,queue2的綁定路由是banana,那么demo.direct交換機會將路由是apple的消息轉發到queue1,將路由是banana的消息轉發到queue2,從后台可以看每個隊列中已經有5個消息准備好了:

接下來可以使用消費者將它們消費掉
主題模式
主題模式使用的topic類型的交換機,在進行匹配時,是根據表達式去匹配,# 匹配一個或多個,* 匹配一個,我們將路由模式的代碼稍作修改:

RabbitMQ.PublishConsole
代碼中,我們往demo.topic交換機中發布了10條消息,其中5條消息的路由是以apple開頭的,另外5條消息的路由是以banana開頭的,demo.direct交換機綁定的兩個隊列中,queue1的綁定路由是apple.#,就是匹配以apple開頭的路由,queue2的綁定路由是banana.#,就是匹配以banana開頭的路由,那么demo.direct交換機會將路由是以apple開頭的的消息轉發到queue1,將路由是以banana開頭的的消息轉發到queue2,從后台可以看每個隊列中已經有5個消息准備好了:

封裝
其實rabbitmq的使用還是比較簡單的,只需要多謝謝代碼嘗試一下就能熟悉
一般的,像這種第三方插件的調用,我建議自己要做一層封裝,最好是根據自己的需求去封裝,然后項目中只需要調用自己封裝的類就行了,下面貼出我自己封裝的類:
QueueOptions
RabbitMQExchangeType
RabbitBase
RabbitMQProducer
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
public class RabbitMQConsumer : RabbitBase
{
public RabbitMQConsumer(params string[] hosts) : base(hosts)
{
}
public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
{
}
public event Action<RecieveResult> Received;
/// <summary>
/// 構造消費者
/// </summary>
/// <param name="channel"></param>
/// <param name="options"></param>
/// <returns></returns>
private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
{
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
if (!options.AutoAck)
{
cancellationTokenSource.Token.Register(() =>
{
channel.BasicAck(e.DeliveryTag, false);
});
}
Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
}
catch { }
};
if (options.FetchCount != null)
{
channel.BasicQos(0, options.FetchCount.Value, false);
}
return consumer;
}
#region 普通模式、Work模式
/// <summary>
/// 消費消息
/// </summary>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
{
options = options ?? new ConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消費消息
/// </summary>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
{
ConsumeQueueOptions options = new ConsumeQueueOptions();
configure?.Invoke(options);
return Listen(queue, options);
}
#endregion
#region 訂閱模式、路由模式、Topic模式
/// <summary>
/// 消費消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
{
options = options ?? new ExchangeConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
{
foreach (var key in options.RoutingKeys)
{
channel.QueueBind(queue, exchange, key, options.BindArguments);
}
}
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消費消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
{
ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
configure?.Invoke(options);
return Listen(exchange, queue, options);
}
#endregion
}
public class RecieveResult
{
CancellationTokenSource cancellationTokenSource;
public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
{
this.Body = Encoding.UTF8.GetString(arg.Body);
this.ConsumerTag = arg.ConsumerTag;
this.DeliveryTag = arg.DeliveryTag;
this.Exchange = arg.Exchange;
this.Redelivered = arg.Redelivered;
this.RoutingKey = arg.RoutingKey;
this.cancellationTokenSource = cancellationTokenSource;
}
/// <summary>
/// 消息體
/// </summary>
public string Body { get; private set; }
/// <summary>
/// 消費者標簽
/// </summary>
public string ConsumerTag { get; private set; }
/// <summary>
/// Ack標簽
/// </summary>
public ulong DeliveryTag { get; private set; }
/// <summary>
/// 交換機
/// </summary>
public string Exchange { get; private set; }
/// <summary>
/// 是否Ack
/// </summary>
public bool Redelivered { get; private set; }
/// <summary>
/// 路由
/// </summary>
public string RoutingKey { get; private set; }
public void Commit()
{
if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
cancellationTokenSource = null;
}
}
public class ListenResult
{
CancellationTokenSource cancellationTokenSource;
/// <summary>
/// CancellationToken
/// </summary>
public CancellationToken Token { get { return cancellationTokenSource.Token; } }
/// <summary>
/// 是否已停止
/// </summary>
public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }
public ListenResult()
{
cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>
/// 停止監聽
/// </summary>
public void Stop()
{
cancellationTokenSource.Cancel();
}
}
}
測試Demo
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
string queue = "queue1";
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
//消費者
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"接收到數據:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
});
}
}).Start();
//消息生產
using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
{
producer.UserName = userName;
producer.Password = password;
producer.Port = port;
producer.VirtualHost = virtualHost;
string message = "";
do
{
message = Console.ReadLine();
if (string.IsNullOrEmpty(message))
{
break;
}
producer.Publish(queue, message, options => { options.Arguments = arguments; });
} while (true);
}
}
}
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQ.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
int port = 5672;
string userName = "admin";
string password = "123456";
string virtualHost = "/";
string queue = "queue1";
var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
//消費者1
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消費者1接收到數據:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
options.FetchCount = 1;
});
}
}).Start();
//消費者2
new Thread(() =>
{
using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
{
consumer.UserName = userName;
consumer.Password = password;
consumer.Port = port;
consumer.VirtualHost = virtualHost;
consumer.Received += result =>
{
Console.WriteLine($"消費者2接收到數據:{result.Body}");
result.Commit();//提交
};
consumer.Listen(queue, options =>
{
options.AutoAck = false;
options.Arguments = arguments;
options.FetchCount = 2;
});
}
}).Start();
//消息生產
using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
{
producer.UserName = userName;
producer.Password = password;
producer.Port = port;
producer.VirtualHost = virtualHost;
string message = "";
do
{
message = Console.ReadLine();
if (string.IsNullOrEmpty(message))
{
break;
}
producer.Publish(queue, message, options => { options.Arguments = arguments; });
} while (true);
}
}
}
}
發布訂閱模式
路由模式
主題模式
上面是我自己做的封裝,因為RabbitMQ.Client功能齊全,但是使用比較麻煩,需要編寫的代碼多一些,推薦一下第三方對rabbitmq的封裝插件:EasyNetQ,它是建立在RabbitMQ.Client上的,多數時候可以直接通過EasyNetQ就可以完成消息發布與消費,感興趣的可以了解一下
