.NET Core 使用RabbitMQ


RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。

RabbitMQ安裝

RabbitMQ安裝,網上已經有許多教程了,這里簡單介紹一下在CentOS下安裝RabbitMQ。使用的版本為3.6.12最新版。

1.首先安裝erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安裝socat

yum install socat

3.最后安裝RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

RabbitMQ常用命令

啟用Web控制台

rabbitmq-plugins enable rabbitmq_management

開啟服務

systemctl start rabbitmq-server.service

停止服務

systemctl stop rabbitmq-server.service

查看服務狀態

systemctl status rabbitmq-server.service

查看RabbitMQ狀態

rabbitmqctl status

添加用戶賦予管理員權限

rabbitmqctl  add_user  username  password
rabbitmqctl  set_user_tags  username  administrator

查看用戶列表

rabbitmqctl list_users

刪除用戶

rabbitmqctl delete_user username

修改用戶密碼

rabbitmqctl oldPassword Username newPassword

訪問Web控制台

http://服務器ip:15672/ 注意配置防火牆,默認用戶名密碼都是guest,若新建用戶一定要記得配置權限。

.NET Core 使用RabbitMQ

通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/

定義生產者
//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
	UserName = "admin",//用戶名
	Password = "admin",//密碼
	HostName = "192.168.157.130"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();
//聲明一個隊列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ連接成功,請輸入消息,輸入exit退出!");

string input;
do
{
	input = Console.ReadLine();

	var sendBytes = Encoding.UTF8.GetBytes(input);
	//發布消息
	channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();
定義消費者
			//創建連接工廠
	        ConnectionFactory factory = new ConnectionFactory
	        {
		        UserName = "admin",//用戶名
		        Password = "admin",//密碼
		        HostName = "192.168.157.130"//rabbitmq ip
	        };

	        //創建連接
	        var connection = factory.CreateConnection();
	        //創建通道
	        var channel = connection.CreateModel();

			//事件基本消費者
			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

			//接收到消息事件
	        consumer.Received += (ch, ea) =>
	        {
		        var message = Encoding.UTF8.GetString(ea.Body);
		        Console.WriteLine($"收到消息: {message}");
				//確認該消息已被消費
		        channel.BasicAck(ea.DeliveryTag, false);
			};
			//啟動消費者 設置為手動應答消息
			channel.BasicConsume("hello", false, consumer);
	        Console.WriteLine("消費者已啟動");
	        Console.ReadKey();
	        channel.Dispose();
	        connection.Close();
運行

啟動了一個生產者,兩個消費者,可以看見兩個消費者都能收到消息,消息投遞到哪個消費者是由RabbitMQ決定的。

RabbitMQ消費失敗的處理

RabbitMQ采用消息應答機制,即消費者收到一個消息之后,需要發送一個應答,然后RabbitMQ才會將這個消息從隊列中刪除,如果消費者在消費過程中出現異常,斷開連接切沒有發送應答,那么RabbitMQ會將這個消息重新投遞。

修改一下消費者的代碼:

//接收到消息事件
consumer.Received += (ch, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body);

	Console.WriteLine($"收到消息: {message}");

	Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執");
	Thread.Sleep(10000);
	//確認該消息已被消費
	channel.BasicAck(ea.DeliveryTag, false);
	Console.WriteLine($"已發送回執[{ea.DeliveryTag}]");
};

演示:

從圖中可以看出,設置了消息應答延遲10s,如果在這10s中,該消費者斷開了連接,那么消息會被RabbitMQ重新投遞。

使用RabbitMQ的Exchange

前面我們可以看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個或多個Queue中(或者丟棄)

AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然后Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似於一個交換機,轉發各個消息分發到相應的隊列中。

RabbitMQ提供了四種Exchange模式:direct,fanout,topic,header 。但是 header模式在實際使用中較少,所以這里只介紹前三種模式。

Exchange不是消費者關心的,所以消費者的代碼完全不用變,用上面的消費者就行了。
由於避免文章過長,影響閱讀,所以只貼了部分代碼,但是demo里面是完整可運行的,詳細代碼請查看demo。

Direct Exchange

所有發送到Direct Exchange的消息被轉發到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();

//定義一個Direct類型交換機
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

//定義一個隊列
channel.QueueDeclare(queueName, false, false, false, null);

//將隊列綁定到交換機
channel.QueueBind(queueName, exchangeName, routeKey, null);

運行:

Fanout Exchange

所有發送到Fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的所有Queue上。

Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每台子網內的主機都獲得了一份復制的消息。

所以,Fanout Exchange 轉發消息是最快的。

為了演示效果,定義了兩個隊列,分別為hello1,hello2,每個隊列都擁有一個消費者。

static void Main(string[] args)
{
	string exchangeName = "TestFanoutChange";
	string queueName1 = "hello1";
	string queueName2 = "hello2";
	string routeKey = "";

	//創建連接工廠
	ConnectionFactory factory = new ConnectionFactory
	{
		UserName = "admin",//用戶名
		Password = "admin",//密碼
		HostName = "192.168.157.130"//rabbitmq ip
	};

	//創建連接
	var connection = factory.CreateConnection();
	//創建通道
	var channel = connection.CreateModel();

	//定義一個Direct類型交換機
	channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

	//定義隊列1
	channel.QueueDeclare(queueName1, false, false, false, null);
	//定義隊列2
	channel.QueueDeclare(queueName2, false, false, false, null);

	//將隊列綁定到交換機
	channel.QueueBind(queueName1, exchangeName, routeKey, null);
	channel.QueueBind(queueName2, exchangeName, routeKey, null);

	//生成兩個隊列的消費者
	ConsumerGenerator(queueName1);
	ConsumerGenerator(queueName2);


	Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

	string input;
	do
	{
		input = Console.ReadLine();

		var sendBytes = Encoding.UTF8.GetBytes(input);
		//發布消息
		channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

	} while (input.Trim().ToLower() != "exit");
	channel.Close();
	connection.Close();
}

/// <summary>
/// 根據隊列名稱生成消費者
/// </summary>
/// <param name="queueName"></param>
static void ConsumerGenerator(string queueName)
{
	//創建連接工廠
	ConnectionFactory factory = new ConnectionFactory
	{
		UserName = "admin",//用戶名
		Password = "admin",//密碼
		HostName = "192.168.157.130"//rabbitmq ip
	};

	//創建連接
	var connection = factory.CreateConnection();
	//創建通道
	var channel = connection.CreateModel();

	//事件基本消費者
	EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

	//接收到消息事件
	consumer.Received += (ch, ea) =>
	{
		var message = Encoding.UTF8.GetString(ea.Body);

		Console.WriteLine($"Queue:{queueName}收到消息: {message}");
		//確認該消息已被消費
		channel.BasicAck(ea.DeliveryTag, false);
	};
	//啟動消費者 設置為手動應答消息
	channel.BasicConsume(queueName, false, consumer);
	Console.WriteLine($"Queue:{queueName},消費者已啟動");
}

運行:

Topic Exchange

所有發送到Topic Exchange的消息被轉發到能和Topic匹配的Queue上,

Exchange 將路由進行模糊匹配。可以使用通配符進行模糊匹配,符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“XiaoChen.#”能夠匹配到“XiaoChen.pets.cat”,但是“XiaoChen.*” 只會匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常靈活。

string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";

//創建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
	UserName = "admin",//用戶名
	Password = "admin",//密碼
	HostName = "192.168.157.130"//rabbitmq ip
};

//創建連接
var connection = factory.CreateConnection();
//創建通道
var channel = connection.CreateModel();

//定義一個Direct類型交換機
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

//定義隊列1
channel.QueueDeclare(queueName, false, false, false, null);

//將隊列綁定到交換機
channel.QueueBind(queueName, exchangeName, routeKey, null);



Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入消息,輸入exit退出!");

string input;
do
{
	input = Console.ReadLine();

	var sendBytes = Encoding.UTF8.GetBytes(input);
	//發布消息
	channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

運行

Demo下載:DotNetCore.RabbitMQ

最后:歡迎加入 .net core 交流群一起學習,群號:4656606 加入QQ群


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM