首先創建一個 數據鏈接工作單元
發送端和接收端都可以使用
MessageMQUserName MessageMQPassword MessageMQHostName MessageMQQueueName 對應自己的rabbit服務器的username passward 端口 以及隊列名稱
public class RabbitMQClientUnit : RabbitMQClientIUnit { private readonly IConfiguration _configuration; public RabbitMQClientUnit(IConfiguration configuration) { _configuration = configuration; ConnectionFactory factory = new ConnectionFactory {
UserName = _configuration.GetConnectionString("MessageMQUserName"), Password = _configuration.GetConnectionString("MessageMQPassword"), HostName = _configuration.GetConnectionString("MessageMQHostName"), }; Connection = factory.CreateConnection(); QueueName = _configuration.GetConnectionString("MessageMQQueueName"); Channel = Connection.CreateModel(); } public IConnection Connection { get; } public IModel Channel { get; } public string QueueName { get; } }
在MessageRepository類中使用以下方法
private readonly RabbitMQClientIUnit _rabbitMQClientIUnit; private readonly IConfiguration _configuration; public MessageRepository(RabbitMQClientIUnit rabbitMQClientIUnit, IConfiguration configuration) { _rabbitMQClientIUnit = rabbitMQClientIUnit; _configuration = configuration; } /// <summary> /// MQ下發消息 /// </summary> /// <param name="encryption"></param> public void RabbitMQPush(string encryption) { try { _rabbitMQClientIUnit.Channel.QueueDeclare(_rabbitMQClientIUnit.QueueName, false, false, false, null); var sendBytes = Encoding.UTF8.GetBytes(encryption); //發布消息 _rabbitMQClientIUnit.Channel.BasicPublish("", _configuration.GetConnectionString("MessageExchange"), null, sendBytes); _rabbitMQClientIUnit.Channel.Close(); } catch { throw new ArgumentException("出現異常MQ推送失敗"); } }
這樣就完成發送端RabbitMQ的編寫
接收端稍微有些麻煩 在Core3.1中我也走了一些彎路 一開始想用控制台程序做接收端 但是在linux下面 無法使用console.key 使用console.key會導致啟動服務出錯
所以只能回歸到Core webapi or mvc上面了
編寫RabbitListener類代碼如下
public class RabbitListener : IHostedService { //private readonly IConnection connection; //private readonly IModel channel; private readonly RabbitMQClientIUnit _rabbitMQClientIUnit; private readonly MessageIService _messageIService; public RabbitListener(RabbitMQClientIUnit rabbitMQClientIUnit, MessageIService messageIService) { _rabbitMQClientIUnit = rabbitMQClientIUnit; _messageIService = messageIService; } public Task StartAsync(CancellationToken cancellationToken) { Register(); return Task.CompletedTask; } // 處理消息的方法 public virtual bool Process(string message) { throw new NotImplementedException(); } // 注冊消費者監聽在這里 public void Register() { EventingBasicConsumer consumer = new EventingBasicConsumer(_rabbitMQClientIUnit.Channel); //接收到消息事件 consumer.Received += (ch, ea) => {
//切記在.net core 3.1中無法直接使用ea.Body var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"收到消息: {message}"); }; _rabbitMQClientIUnit.Channel.BasicConsume("Message", false, consumer); } public void DeRegister() { _rabbitMQClientIUnit.Connection.Close(); } public Task StopAsync(CancellationToken cancellationToken) { _rabbitMQClientIUnit.Connection.Close(); return Task.CompletedTask; } }
最后一步也是整個接收端的核心注入
在Startup中要使用AddHostedService方法注入
services.AddHostedService<RabbitListener>();