RabbitMQ


網上很多人已經總結的很好了,比如今天看到的這個。https://www.cnblogs.com/LipeiNet/p/9877189.html

我就不總結了,貼點代碼。

RabbitMQConnect.cs

using System;
using System.IO;
using System.Net.Sockets;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Common.Tool.RabbitMQ
{
	public class RabbitMQConnect
	{
		
		static string host = "127.0.0.1"; 
		static string UserName = "H"; 
		static string password = "H";

		public readonly static IConnectionFactory _connectionFactory;
		IConnection _connection;
		object sync_root = new object();
		bool _disposed;
		static RabbitMQConnect()
		{
			//if (host == "localhost")
			//{
			//    _connectionFactory = new ConnectionFactory() { HostName = host };
			//}
			//else
			{
				_connectionFactory = new ConnectionFactory() { HostName = host, UserName = UserName, Password = password };
			}
		}
		public bool IsConnected => this._connection != null && this._connection.IsOpen && this._disposed;
		public IModel CreateModel()
		{
			if (!this.IsConnected)
			{
				this.TryConnect();
			}
			return this._connection.CreateModel();
		}
		public bool TryConnect()
		{
			lock (this.sync_root)
			{
				RetryPolicy policy = RetryPolicy.Handle<SocketException>()//如果我們想指定處理多個異常類型通過OR即可
					.Or<BrokerUnreachableException>()//ConnectionFactory.CreateConnection期間無法打開連接時拋出異常
					.WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
						{

						});// 重試次數,提供等待特定重試嘗試的持續時間的函數,每次重試時調用的操作。
				policy.Execute(() =>
				{
					this._connection = _connectionFactory.CreateConnection();

				});

				if (this.IsConnected)
				{
					//當連接被破壞時引發。如果在添加事件處理程序時連接已經被銷毀對於此事件,事件處理程序將立即被觸發。
					this._connection.ConnectionShutdown += this.OnConnectionShutdown;
					//在連接調用的回調中發生異常時發出信號。當ConnectionShutdown處理程序拋出異常時,此事件將發出信號。如果將來有更多的事件出現在RabbitMQ.Client.IConnection上,那么這個事件當這些事件處理程序中的一個拋出異常時,它們將被標記。
					this._connection.CallbackException += this.OnCallbackException;
					this._connection.ConnectionBlocked += this.OnConnectionBlocked;

					//LogHelperNLog.Info($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");

					return true;
				}
				else
				{
					// LogHelperNLog.Info("FATAL ERROR: RabbitMQ connections could not be created and opened");

					return false;
				}
			}
		}

		void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
		{
			if (this._disposed) return;
			//RabbitMQ連接正在關閉。 嘗試重新連接...
			//LogHelperNLog.Info("A RabbitMQ connection is on shutdown. Trying to re-connect...");

			this.TryConnect();
		}
		/// <summary>
		///    
		/// </summary>
		/// <param name="sender"></param>
		/// <param name="e"></param>
		void OnCallbackException(object sender, CallbackExceptionEventArgs e)
		{
			if (this._disposed) return;

			// LogHelperNLog.Info("A RabbitMQ connection throw exception. Trying to re-connect...");

			this.TryConnect();
		}
		private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
		{
			if (this._disposed) return;

			//  LogHelperNLog.Info("A RabbitMQ connection is shutdown. Trying to re-connect...");

			this.TryConnect();
		}

		public void Dispose()
		{
			if (this._disposed) return;

			this._disposed = true;

			try
			{
				this._connection.Dispose();
			}
			catch (IOException ex)
			{
				//_logger.LogCritical(ex.ToString());
				//  LogHelperNLog.Error(ex);
			}
		}
	}
}

  

RabbitMQSend.cs

using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using System.Text;

namespace Common.Tool.RabbitMQ
{
    public  class RabbitMQSend
    {
        /// <summary>
        /// Newtonsoft.Json利用IsoDateTimeConverter處理日期類型
        /// </summary>
        static IsoDateTimeConverter dtConverter = new IsoDateTimeConverter { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" };
        static RabbitMQConnect connection=null;

        static RabbitMQSend()
        {
            connection = new RabbitMQConnect();
        }

        /// <summary>
        /// 添加信息到隊列
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="item">信息</param>
        /// <param name="queueName">隊列名</param>
        public static void PushMsgToMq<T>(T item, string queueName)
        {
			string msg = JsonConvert.SerializeObject(item, dtConverter);
            using (global::RabbitMQ.Client.IModel channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
				//Construct a completely empty content header for use with the Basic content class.
				//構造一個完全空的內容標頭,以便與Basic內容類一起使用。
				global::RabbitMQ.Client.IBasicProperties properties = channel.CreateBasicProperties();
                properties.Persistent = true;
				byte[] body = Encoding.UTF8.GetBytes(msg); 
                channel.BasicPublish(exchange: "",
                    routingKey: queueName,
                    basicProperties: properties,
                    body: body);
            }
        }
    }
}

  

RabbitMQReceive.cs

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Common.Tool.RabbitMQ
{
	public class RabbitMQReceive : IDisposable
	{
		IConnection connection = null;
		IModel channel = null;

		public void BindReceiveMqMsg<T>(Func<T, bool> func, Action<string> log, string queueName)
		{
			this.connection = RabbitMQConnect._connectionFactory.CreateConnection();//創建與指定端點的連接。
			this.channel = this.connection.CreateModel(); //創建並返回新的頻道,會話和模型。
			this.channel.QueueDeclare(queue: queueName,//隊列名稱
							 durable: true,//是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
							 exclusive: false,//是否排外的,有兩個作用,一:當連接關閉時connection.close()該隊列是否會自動刪除;二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題,如果是排外的,會對當前隊列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等於true的話用於一個隊列只能有一個消費者來消費的場景
							 autoDelete: false,//是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時隊列就會自動刪除
							 arguments: null);//隊列中的消息什么時候會自動被刪除?

			this.channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//(Spec方法)配置Basic內容類的QoS參數。
																					//第一個參數是可接收消息的大小的  0不受限制
																					//第二個參數是處理消息最大的數量  1 那如果接收一個消息,但是沒有應答,則客戶端不會收到下一個消息,消息只會在隊列中阻塞
																					//第三個參數則設置了是不是針對整個Connection的,因為一個Connection可以有多個Channel,如果是false則說明只是針對於這個Channel的。
			EventingBasicConsumer consumer = new EventingBasicConsumer(this.channel);//構造函數,它將Model屬性設置為給定值。
			consumer.Received += (model, bdea) =>
			{
				byte[] body = bdea.Body;
				string message = Encoding.UTF8.GetString(body);
				log?.Invoke(message);

				T item = JsonConvert.DeserializeObject<T>(message);
				bool result = func(item);
				if (result)
				{
					//(Spec方法)確認一個或多個已傳送的消息。
					this.channel.BasicAck(deliveryTag: bdea.DeliveryTag, multiple: false);
				}
			};
			this.channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); //The consumer is started with noAck = false(i.e.BasicAck is required), an empty consumer tag (i.e. the server creates and returns a fresh consumer tag), noLocal=false and exclusive=false.
		}
		public void Dispose()
		{
			if (this.channel != null)
			{
				this.channel.Close();
			}

			if (this.connection != null)
			{
				this.connection.Close();
			}
		}
	}
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM