接上一篇:ActiveMQ基礎教程(三):C#連接使用ActiveMQ消息隊列
這里繼續說下.net core集成使用ActiveMQ。因為代碼比較多,所以放到gitee上:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Activemq
感興趣的可以克隆下來再按照自己的需求修改,這里簡單介紹一下使用的Demo(Demo基於.net core3.1的版本,其他版本可能需要自行測試)
生產者(AspNetCore.WebApi.Producer)
在Startup中添加相關服務項:
public void ConfigureServices(IServiceCollection services) { var brokerUris = new string[] { "192.168.209.133:61616", "192.168.209.134:61616", "192.168.209.135:61616" }; string userName = "test"; string password = "123456"; #region 日志記錄 services.AddLogging(builder => { builder.SetMinimumLevel(LogLevel.Trace); }); services.AddActiveLogger(options => { options.IsCluster = true; options.ApplicationName = "WebApi"; options.BrokerUris = brokerUris; options.Category = "Home"; options.UseQueue = false; options.Destination = "logger"; options.MinLevel = LogLevel.Warning; options.InitializeCount = 10; options.IsPersistent = true; options.Password = password; options.UserName = userName; }); #endregion #region Active services.AddActiveProducer("active.queue", options => { options.IsCluster = true; options.BrokerUris = brokerUris; options.Destination = "active.queue"; options.IsPersistent = true; options.Transactional = false; options.Password = password; options.UserName = userName; }); services.AddActiveProducer("active.topic", options => { options.IsCluster = true; options.BrokerUris = brokerUris; options.Destination = "active.topic"; options.IsPersistent = true; options.Transactional = false; options.Password = password; options.UserName = userName; }); #endregion ...... }
說明一下,對於日志記錄,使用AddActiveLogger拓展方法注入ActiveLoggerProvider,這樣當使用.net core的ILogger機制發送消息時,就可以直接將消息發送到ActiveMQ中去了。
如果是普通的發布消息到ActiveMQ,需要先聲明生產者的配置,在使用生產者時,只需要注入IActiveProducerFactory接口,然后使用這個接口的創建生產者就可以了,比如Home控制器中的用法:
[ApiController] [Route("[controller]")] public class HomeController : ControllerBase { ILogger<HomeController> logger; IActiveProducerFactory activeProducerFactory; public HomeController(ILogger<HomeController> logger, IActiveProducerFactory activeProducerFactory) { this.logger = logger; this.activeProducerFactory = activeProducerFactory; } /// <summary> /// 日志 /// </summary> /// <param name="message"></param> /// <returns></returns> [HttpGet] public string Get(string message) { logger.LogTrace($"Trace:{message}"); logger.LogDebug($"Debug:{message}"); logger.LogInformation($"Information:{message}"); logger.LogWarning($"Warning:{message}"); logger.LogError($"Error:{message}"); logger.LogCritical($"Critical:{message}"); return "success"; } /// <summary> /// 發送消息到隊列 /// </summary> /// <param name="message">消息</param> /// <returns>success</returns> [HttpGet("Queue")] public async Task<object> Queue(string message) { message = message ?? ""; var producer = activeProducerFactory.Create("active.queue"); await producer.SendAsync(message); return "success"; } /// <summary> /// 發送消息到Topic /// </summary> /// <param name="message">消息</param> /// <returns>success</returns> [HttpGet("Topic")] public async Task<object> Topic(string message) { message = message ?? ""; var producer = activeProducerFactory.Create("active.topic"); await producer.PublishAsync(message); return "success"; } }
消費者(AspNetCore.WebApi.Consumer)
消費者注入就簡單了,只需要在Startup中聲明消費者配置及消息處理過程就可以了:
public void ConfigureServices(IServiceCollection services) { var brokerUris = new string[] { "192.168.209.133:61616", "192.168.209.134:61616", "192.168.209.135:61616" }; string userName = "test"; string password = "123456"; #region 日志記錄 services.AddActiveConsumer(options => { options.IsCluster = true; options.BrokerUris = brokerUris; options.ClientId = "logger"; options.Durable = true; options.FromQueue = false; options.Destination = "logger"; options.AutoAcknowledge = true; options.SubscriberName = "logger"; options.Password = password; options.UserName = userName; }).AddListener(result => { Console.WriteLine("Message From Topic logger:" + result.Message); }); #endregion #region Active services.AddActiveConsumer(options => { options.IsCluster = true; options.BrokerUris = brokerUris; options.Durable = false; options.Destination = "active.queue"; options.AutoAcknowledge = false; options.FromQueue = true; options.Password = password; options.UserName = userName; }).AddListener(result => { Console.WriteLine("Message From queue:" + result.Message); result.Commit(); }); services.AddActiveConsumer(options => { options.IsCluster = true; options.BrokerUris = brokerUris; options.Durable = true; options.Destination = "active.topic"; options.AutoAcknowledge = false; options.FromQueue = false; options.Password = password; options.UserName = userName; options.ClientId = "active.topic"; options.PrefetchCount = 10; }).AddListener<MyActiveConsumerListener>();#endregion ...... }
聲明消費者使用AddActiveConsumer拓展方法,它返回一個builder,通過它的AddListener方法添加監聽消息的處理程序,可以采用一個委托作為,也可以采用一個實現了IActiveConsumerListener接口的類,比這里的MyActiveConsumerListener:
public class MyActiveConsumerListener : IActiveConsumerListener { public Task ConsumeAsync(RecieveResult result) { Console.WriteLine("Message From topic:" + result.Message); result.Commit(); return Task.CompletedTask; } }