.Net Core 3.1 WebAPI OR Mvc 中使用 RabbitMQ


 

首先創建一個 數據鏈接工作單元 

發送端和接收端都可以使用 

MessageMQUserName MessageMQPassword MessageMQHostName MessageMQQueueName 對應自己的rabbit服務器的username passward 端口 以及隊列名稱
  public class RabbitMQClientUnit : RabbitMQClientIUnit
    {
        private readonly IConfiguration _configuration;
        public RabbitMQClientUnit(IConfiguration configuration)
        {
            _configuration = configuration;

            ConnectionFactory factory = new ConnectionFactory
            {
UserName
= _configuration.GetConnectionString("MessageMQUserName"), Password = _configuration.GetConnectionString("MessageMQPassword"), HostName = _configuration.GetConnectionString("MessageMQHostName"), }; Connection = factory.CreateConnection(); QueueName = _configuration.GetConnectionString("MessageMQQueueName"); Channel = Connection.CreateModel(); } public IConnection Connection { get; } public IModel Channel { get; } public string QueueName { get; } }

在MessageRepository類中使用以下方法

  private readonly RabbitMQClientIUnit _rabbitMQClientIUnit;
        private readonly IConfiguration _configuration;

        public MessageRepository(RabbitMQClientIUnit rabbitMQClientIUnit, IConfiguration configuration)
        {
            _rabbitMQClientIUnit = rabbitMQClientIUnit;
            _configuration = configuration;
      
        }
        /// <summary>
        /// MQ下發消息
        /// </summary>
        /// <param name="encryption"></param>
        public void RabbitMQPush(string encryption)
        {
            try
            {
                _rabbitMQClientIUnit.Channel.QueueDeclare(_rabbitMQClientIUnit.QueueName, false, false, false, null);
                var sendBytes = Encoding.UTF8.GetBytes(encryption);
                //發布消息
                _rabbitMQClientIUnit.Channel.BasicPublish("", _configuration.GetConnectionString("MessageExchange"), null, sendBytes);
                _rabbitMQClientIUnit.Channel.Close();
            }
            catch
            {
                throw new ArgumentException("出現異常MQ推送失敗");
            }
        }

這樣就完成發送端RabbitMQ的編寫

 

接收端稍微有些麻煩  在Core3.1中我也走了一些彎路 一開始想用控制台程序做接收端 但是在linux下面 無法使用console.key 使用console.key會導致啟動服務出錯

所以只能回歸到Core webapi or mvc上面了

編寫RabbitListener類代碼如下

  public class RabbitListener : IHostedService
    {
        //private readonly IConnection connection;
        //private readonly IModel channel;

        private readonly RabbitMQClientIUnit  _rabbitMQClientIUnit;
        private readonly MessageIService _messageIService;
        public RabbitListener(RabbitMQClientIUnit rabbitMQClientIUnit, MessageIService messageIService)
        {
            _rabbitMQClientIUnit = rabbitMQClientIUnit;
            _messageIService = messageIService;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }


        // 處理消息的方法
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        // 注冊消費者監聽在這里
        public void Register()
        {

            EventingBasicConsumer consumer = new EventingBasicConsumer(_rabbitMQClientIUnit.Channel);
            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
          //切記在.net core 3.1中無法直接使用ea.Body
var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"收到消息: {message}"); }; _rabbitMQClientIUnit.Channel.BasicConsume("Message", false, consumer); } public void DeRegister() { _rabbitMQClientIUnit.Connection.Close(); } public Task StopAsync(CancellationToken cancellationToken) { _rabbitMQClientIUnit.Connection.Close(); return Task.CompletedTask; } }

最后一步也是整個接收端的核心注入

在Startup中要使用AddHostedService方法注入

 

  services.AddHostedService<RabbitListener>();

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM