上一節說了事件總線
本節在消息隊列中實現事件處理:.Net Core 5.x Api開發筆記 -- 消息隊列RabbitMQ實現事件總線EventBus(一)
既然是消息隊列,就需要有生產者和消費者(訂閱)
1 public interface IMessageQueue 2 { 3 /// <summary> 4 /// 發布消息 5 /// </summary> 6 void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class; 7 8 /// <summary> 9 /// 訂閱消息 10 /// </summary> 11 void Consume<T>(Func<T, Task<bool>> func, string queueName = ""); 12 }
生產者端實現發布消息接口:
1 public class RabbitMQPublishClient : IMessageQueue 2 { 3 private static ConcurrentDictionary<string, string> QueueDic = new ConcurrentDictionary<string, string>(); 4 public RabbitMQPublishClient(){} 5 6 /// <summary> 7 /// 發布消息 8 /// </summary> 9 /// <typeparam name="T"></typeparam> 10 /// <param name="message">消息實體</param> 11 /// <param name="exchangeName">交換機名稱(默認default)</param> 12 /// <param name="queueName">隊列名稱(默認類名)</param> 13 public void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class 14 { 15 using (var conn = GetConnection()) 16 { 17 using (var channel = conn.CreateModel()) 18 { 19 if (!QueueDic.ContainsKey(queueName) || QueueDic[queueName] != exchangeName) 20 { 21 CreateQueue(channel, exchangeName, queueName, true); 22 QueueDic.TryAdd(queueName, exchangeName); 23 } 24 var props = channel.CreateBasicProperties(); 25 props.Persistent = true; //消息持久化 26 props.DeliveryMode = 2; //消息持久化 27 props.CorrelationId = Guid.NewGuid().ToString(); 28 29 string content = JsonConvert.SerializeObject(message); 30 var body = Encoding.UTF8.GetBytes(content); 31 channel.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: body); 32 } 33 } 34 } 35 36 /// <summary> 37 /// 創建隊列,綁定到交換機 38 /// </summary> 39 private void CreateQueue(IModel channel, string exchangeName, string queueName, bool isDurable = true) 40 { 41 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: isDurable, autoDelete: false, arguments: null); 42 channel.QueueDeclare(queueName, durable: isDurable, exclusive: false, autoDelete: false, arguments: null); 43 channel.QueueBind(queueName, exchangeName, routingKey: queueName); 44 } 45 }
消費者(訂閱者)實現訂閱接口
1 public class RabbitMQConsumerClient : IMessageQueue 2 { 3 public RabbitMQConsumerClient(){} 4 5 /// <summary> 6 /// 訂閱消息 7 /// </summary> 8 public void Consume<T>(Func<T, Task<bool>> func, string queueName = "") 9 { 10 string exchangeName = exchangeName.Equals("default") ? $"{customKey}.default" : $"{customKey}.{queueName}"; 11 12 var collection = GetConnection(); 13 var channel = collection.CreateModel(); 14 channel.ExchangeDeclare(exchangeName, "direct", durable: true, autoDelete: false, arguments: null); 15 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); 16 channel.QueueBind(queueName, exchangeName, routingKey: queueName, arguments: null); 17 18 //消費事件 19 var consumer = new EventingBasicConsumer(channel); 20 consumer.Received += async (sender, e) => 21 { 22 try 23 { 24 string content = Encoding.UTF8.GetString(e.Body.ToArray()); 25 T message = JsonConvert.DeserializeObject<T>(content); 26 bool isSuccess = await func.Invoke(message); //執行func委托 這里才是真正執行傳入的事件處理程序 27 if (isSuccess) 28 { 29 channel.BasicAck(e.DeliveryTag, false); //手動確認消息消費成功 30 } 31 else 32 { 33 channel.BasicReject(e.DeliveryTag, true); //手動打回隊列,下次重新消費 34 } 35 } 36 catch (Exception ex) 37 { 38 channel.BasicAck(e.DeliveryTag, false); 39 } 40 }; 41 42 channel.BasicQos(0, 1, false); //限制同時接收消息數量為1,true是將限制應用於channel級,false是將限制應用於consumer級 43 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); //訂閱消息 autoAck: false 表示手動確認消息 44 } 45 }
上邊都是平時很常用的功能邏輯,接下來是才是重點!!!
1,事件總線有了
2,生產者訂閱者也有了
接下來還需要做至少3件事:
1,創建事件處理程序,就是真正干活的
2,注冊消費者、注冊事件處理程序、綁定事件源和事件處理程序(在訂閱端)
3,訂閱端啟動消費者訂閱監控,並觸發事件處理程序
4,發布消息測試
-----------------------------------------------------------------
1,創建事件處理程序
1 /// <summary> 2 /// 事件處理程序,真正干活的是我 3 /// 繼承IEventHandler<T>接口,其中T必須用EventWithData<TData>類型,因為EventWithData<TData>繼承了EventBase類 4 /// </summary> 5 public class Person_EventHandler : IEventHandler<EventWithData<Person>> 6 { 7 //最終干活的 8 public async Task HandleEvent(EventWithData<Person> eventWithData) 9 { 10 var data = eventWithData.Data; 11 if (data == null) 12 { 13 Console.WriteLine("么有數據呀!"); 14 } 15 else 16 { 17 try 18 { 19 Console.WriteLine($"{DateTime.Now}-------" + JsonConvert.SerializeObject(data)); 20 } 21 catch (Exception ex) 22 { 23 Console.WriteLine("異常:" + ex.Message); 24 } 25 } 26 } 27 }
2,注冊消費者、注冊事件處理程序、綁定事件源和事件處理程序(在訂閱端),這一步在 Startup 中實現!!!
1 public IServiceCollection ServiceCollection { get; private set; } 3 public void ConfigureServices(IServiceCollection services) 4 { 5 ServiceCollection = services; //將services傳遞給ServiceCollection 6 services.AddControllersWithViews(); 7 8 //注冊消費者 9 services.AddSingleton<IMessageQueue, RabbitMQConsumerClient>(); 10 //注冊事件處理程序,綁定事件源:IEventHandler<EventWithData<Person>>和事件處理程序:Person_EventHandler的關系 11 services.AddSingleton<Person_EventHandler>(); 12 } 13 14 public void Configure(IApplicationBuilder app, IWebHostEnvironment env) 15 { 16 //綁定事件源和事件處理程序的映射關系 17 //這里跟上邊的綁定不是一回事,這里僅僅就是綁定了一個映射關系,等到后邊事件觸發的時候需要用到該處映射關系 18 EventBus.Default.RegisterHandler<EventWithData<Person>, Person_EventHandler>(); 19 20 //其它中間件。。 21 。。。 22 23 //啟動消費者訂閱監控入口 24 app.Consumer(app.ApplicationServices, ServiceCollection); 25 }
3,啟動消費者訂閱監控,並觸發事件處理程序
1 /// <summary> 2 /// 啟動消費者訂閱監控入口 3 /// </summary> 4 public static void Consumer(this IApplicationBuilder app, IServiceProvider provider, IServiceCollection services) 5 { 6 //獲取實例對象 7 IMessageQueue queue = provider.GetService<IMessageQueue>(); 8 9 //異步調用,這里接收的消息類型T為:EventWithData<Person> 也可以是其它自定義消息類型,沒有限制 10 queue.Consume<EventWithData<Person>>(async message => 11 { 12 using (var currentContainer = services.BuildServiceProvider().CreateScope()) //使用系統內置服務容器 13 { 14 IocManager.Configure(currentContainer.ServiceProvider); //將容器服務傳遞過去 15 16 var e = EventWithData<Person>.New(message.Data); //這里要傳遞的消息類型必須使用 EventWithData<TData>類型初始化消息 17 18 await EventBus.Default.TriggerAsync(e); //調用觸發事件邏輯(需要提前綁定映射關系和注冊容器) 19 20 return true; 21 } 22 }); 23 }
說明:
IocManager.Configure(currentContainer.ServiceProvider) 實際上就是聲明了一個全局的 IServiceProvider 變量,然后將當前的ServiceProvider賦值給全局變量
await EventBus.Default.TriggerAsync(e) 執行的就是上一節事件總線中的 public async Task TriggerAsync<TEvent>(TEvent e) 方法
到這里 事件源和事件處理程序就一一對應上了。
接下來開始測試一下是否可用,先在發布的應用程序Startup中注冊一下消息隊列容器
1 services.AddSingleton<IMessageQueue, RabbitMQPublishClient>();
然后發布一條消息,注意:這里消息的類型EventWithData<Person>沒有限制的,你可以不使用EventWithData<Person>類型的消息也行
1 //發布消息 2 var message = EventWithData<Person>.New( 3 new Person() 4 { 5 UserId = 1, 6 UserName = "張三" 7 } 8 ); 9 10 messageQueue.Publish(message);
最后測試,訂閱者成功消費了剛剛生產的消息