.Net Core 5.x Api開發筆記 -- 消息隊列RabbitMQ實現事件總線EventBus(二)


上一節說了事件總線

本節在消息隊列中實現事件處理:.Net Core 5.x Api開發筆記 -- 消息隊列RabbitMQ實現事件總線EventBus(一)

既然是消息隊列,就需要有生產者和消費者(訂閱)

 1 public interface IMessageQueue
 2 {
 3     /// <summary>
 4     /// 發布消息
 5     /// </summary>
 6     void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class;
 7 
 8     /// <summary>
 9     /// 訂閱消息
10     /// </summary>
11     void Consume<T>(Func<T, Task<bool>> func, string queueName = "");
12 }

生產者端實現發布消息接口:

 1 public class RabbitMQPublishClient : IMessageQueue
 2 {
 3     private static ConcurrentDictionary<string, string> QueueDic = new ConcurrentDictionary<string, string>();
 4     public RabbitMQPublishClient(){}
 5     
 6     /// <summary>
 7     /// 發布消息
 8     /// </summary>
 9     /// <typeparam name="T"></typeparam>
10     /// <param name="message">消息實體</param>
11     /// <param name="exchangeName">交換機名稱(默認default)</param>
12     /// <param name="queueName">隊列名稱(默認類名)</param>
13     public void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class
14     {
15         using (var conn = GetConnection())
16         {
17             using (var channel = conn.CreateModel())
18             {
19                 if (!QueueDic.ContainsKey(queueName) || QueueDic[queueName] != exchangeName)
20                 {
21                     CreateQueue(channel, exchangeName, queueName, true);
22                     QueueDic.TryAdd(queueName, exchangeName);
23                 }
24                 var props = channel.CreateBasicProperties();
25                 props.Persistent = true;  //消息持久化
26                 props.DeliveryMode = 2;   //消息持久化
27                 props.CorrelationId = Guid.NewGuid().ToString();
28 
29                 string content = JsonConvert.SerializeObject(message);
30                 var body = Encoding.UTF8.GetBytes(content);
31                 channel.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: body);
32             }
33         }
34     }
35 
36     /// <summary>
37     /// 創建隊列,綁定到交換機
38     /// </summary>
39     private void CreateQueue(IModel channel, string exchangeName, string queueName, bool isDurable = true)
40     {
41         channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: isDurable, autoDelete: false, arguments: null);
42         channel.QueueDeclare(queueName, durable: isDurable, exclusive: false, autoDelete: false, arguments: null);
43         channel.QueueBind(queueName, exchangeName, routingKey: queueName);
44     }
45 }

消費者(訂閱者)實現訂閱接口

 1 public class RabbitMQConsumerClient : IMessageQueue
 2 {
 3     public RabbitMQConsumerClient(){}
 4 
 5     /// <summary>
 6     /// 訂閱消息
 7     /// </summary>
 8     public void Consume<T>(Func<T, Task<bool>> func, string queueName = "")
 9     {
10         string exchangeName = exchangeName.Equals("default") ? $"{customKey}.default" : $"{customKey}.{queueName}";
11 
12         var collection = GetConnection();
13         var channel = collection.CreateModel();
14         channel.ExchangeDeclare(exchangeName, "direct", durable: true, autoDelete: false, arguments: null);
15         channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
16         channel.QueueBind(queueName, exchangeName, routingKey: queueName, arguments: null);
17 
18         //消費事件
19         var consumer = new EventingBasicConsumer(channel);
20         consumer.Received += async (sender, e) =>
21         {
22             try
23             {
24                 string content = Encoding.UTF8.GetString(e.Body.ToArray());
25                 T message = JsonConvert.DeserializeObject<T>(content);
26                 bool isSuccess = await func.Invoke(message);    //執行func委托  這里才是真正執行傳入的事件處理程序
27                 if (isSuccess)
28                 {
29                     channel.BasicAck(e.DeliveryTag, false);     //手動確認消息消費成功
30                 }
31                 else
32                 {
33                    channel.BasicReject(e.DeliveryTag, true);    //手動打回隊列,下次重新消費
34                 }
35             }
36             catch (Exception ex)
37             {
38                 channel.BasicAck(e.DeliveryTag, false);
39             }
40         };
41 
42         channel.BasicQos(0, 1, false);  //限制同時接收消息數量為1,true是將限制應用於channel級,false是將限制應用於consumer級
43         channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);   //訂閱消息   autoAck: false 表示手動確認消息
44     }
45 }

上邊都是平時很常用的功能邏輯,接下來是才是重點!!!

1,事件總線有了

2,生產者訂閱者也有了

接下來還需要做至少3件事:

1,創建事件處理程序,就是真正干活的

2,注冊消費者、注冊事件處理程序、綁定事件源和事件處理程序(在訂閱端)

3,訂閱端啟動消費者訂閱監控,並觸發事件處理程序

4,發布消息測試

-----------------------------------------------------------------

1,創建事件處理程序

 1 /// <summary>
 2 /// 事件處理程序,真正干活的是我
 3 /// 繼承IEventHandler<T>接口,其中T必須用EventWithData<TData>類型,因為EventWithData<TData>繼承了EventBase類
 4 /// </summary>
 5 public class Person_EventHandler : IEventHandler<EventWithData<Person>>
 6 {
 7     //最終干活的
 8     public async Task HandleEvent(EventWithData<Person> eventWithData)
 9     {
10         var data = eventWithData.Data;
11         if (data == null)
12         {
13             Console.WriteLine("么有數據呀!");
14         }
15         else
16         {
17             try
18             {
19                 Console.WriteLine($"{DateTime.Now}-------" + JsonConvert.SerializeObject(data));
20             }
21             catch (Exception ex)
22             {
23                 Console.WriteLine("異常:" + ex.Message);
24             }
25         }
26     }
27 }

2,注冊消費者、注冊事件處理程序、綁定事件源和事件處理程序(在訂閱端),這一步在 Startup 中實現!!!

 1 public IServiceCollection ServiceCollection { get; private set; }
 3 public void ConfigureServices(IServiceCollection services)
 4 {
 5     ServiceCollection = services;    //將services傳遞給ServiceCollection
 6     services.AddControllersWithViews();
 7 
 8     //注冊消費者
 9     services.AddSingleton<IMessageQueue, RabbitMQConsumerClient>();
10     //注冊事件處理程序,綁定事件源:IEventHandler<EventWithData<Person>>和事件處理程序:Person_EventHandler的關系
11     services.AddSingleton<Person_EventHandler>();
12 }
13 
14 public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
15 {
16     //綁定事件源和事件處理程序的映射關系  
17     //這里跟上邊的綁定不是一回事,這里僅僅就是綁定了一個映射關系,等到后邊事件觸發的時候需要用到該處映射關系
18     EventBus.Default.RegisterHandler<EventWithData<Person>, Person_EventHandler>();
19 
20     //其它中間件。。
21     。。。
22 
23     //啟動消費者訂閱監控入口
24     app.Consumer(app.ApplicationServices, ServiceCollection);
25 }

3,啟動消費者訂閱監控,並觸發事件處理程序

 1 /// <summary>
 2 /// 啟動消費者訂閱監控入口
 3 /// </summary>
 4 public static void Consumer(this IApplicationBuilder app, IServiceProvider provider, IServiceCollection services)
 5 {
 6     //獲取實例對象 
 7     IMessageQueue queue = provider.GetService<IMessageQueue>();   
 8 
 9     //異步調用,這里接收的消息類型T為:EventWithData<Person>  也可以是其它自定義消息類型,沒有限制
10     queue.Consume<EventWithData<Person>>(async message =>
11     {
12         using (var currentContainer = services.BuildServiceProvider().CreateScope())   //使用系統內置服務容器
13         {
14             IocManager.Configure(currentContainer.ServiceProvider);                //將容器服務傳遞過去
15             
16             var e = EventWithData<Person>.New(message.Data);      //這里要傳遞的消息類型必須使用 EventWithData<TData>類型初始化消息
17             
18             await EventBus.Default.TriggerAsync(e);               //調用觸發事件邏輯(需要提前綁定映射關系和注冊容器)
19             
20             return true;
21         }
22     });
23 }

說明:

IocManager.Configure(currentContainer.ServiceProvider)  實際上就是聲明了一個全局的 IServiceProvider 變量,然后將當前的ServiceProvider賦值給全局變量

await EventBus.Default.TriggerAsync(e)   執行的就是上一節事件總線中的 public async Task TriggerAsync<TEvent>(TEvent e)  方法

到這里 事件源和事件處理程序就一一對應上了。

接下來開始測試一下是否可用,先在發布的應用程序Startup中注冊一下消息隊列容器

1 services.AddSingleton<IMessageQueue, RabbitMQPublishClient>();

然后發布一條消息,注意:這里消息的類型EventWithData<Person>沒有限制的,你可以不使用EventWithData<Person>類型的消息也行

 1 //發布消息
 2 var message = EventWithData<Person>.New(
 3     new Person()
 4     {
 5         UserId = 1,
 6         UserName = "張三"
 7     }
 8 );
 9 
10 messageQueue.Publish(message);

最后測試,訂閱者成功消費了剛剛生產的消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM