參考:
事件總線
什么是事件總線
就是用來管理所有的事件的一種機制就稱作為事件總線,包括事件發布,事件存儲,事件訂閱,事件處理的統稱
作用
事件總線是一種機制,它允許不同的組件彼此通信而不彼此了解。 組件可以將事件發送到Eventbus,而無需知道是誰來接聽或有多少其他人來接聽。 組件也可以偵聽Eventbus上的事件,而無需知道誰發送了事件。 這樣,組件可以相互通信而無需相互依賴。 同樣,很容易替換一個組件。 只要新組件了解正在發送和接收的事件,其他組件就永遠不會知道
CAP框架
CAP 是一個EventBus,同時也是一個在微服務或者SOA系統中解決分布式事務問題的一個框架。它有助於創建可擴展,可靠並且易於更改的微服務系統。
CAP 支持以下幾種運輸方式:
存儲方式分為內存和數據庫
RabbitMQ
RabbitMQ是消息代理:它接受並轉發消息。您可以將其視為郵局:將您要發布的郵件放在郵箱中時,可以確保Mailperson先生或女士最終將郵件傳遞給收件人。以此類推,RabbitMQ是一個郵箱,一個郵局和一個郵遞員。
RabbitMQ與郵局之間的主要區別在於,它不處理紙張,而是接收,存儲和轉發數據消息的二進制斑點。
六種隊列模式:先進先出,后進后出
發布訂閱模式:一般是聚合服務發布消息給RabbitMQ,RabbitMQ再發消息發動訂閱者服務,通過RabbitMQ解耦,兩個服務不直接通信,是RabbitMQ調用服務,不再是聚合服務調用服務
- 發布(Publish):程序把請求消息發布給RabbitMQ后直接返回,不用再等請求執行完畢再返回,對應數據庫發布表:Published
- 訂閱(Subscrib):程序訂閱RabbitMQ后,RabbitMQ把消息逐一推送給訂閱者,也就是RabbitMQ調用訂閱者服務,對應數據庫訂閱表:Received
連接參數:
- HostName: 宿主地址
- UserName: 用戶名
- Password: 密碼
- VirtualHost: 虛擬主機
- Port: 端口號
- TopicExchangeName: CAP默認Exchange名稱
- QueueMessageExpires: 隊列中消息自動刪除時間
- ConnectionFactoryOptions: 本機連接工廠選項
微服務中使用CAP框架的RabbitMQ方式
環境准備
RabbitMQ的開發語言環境:Erlang下載地址
RabbitMQ下載地址
RabbitMQ啟動
1、在安裝目錄下添加可視化插件命令
rabbitmq-plugins enable rabbitmq_management
2、在安裝目錄sbin文件夾下啟動
rabbitmq-server
3、sbin文件夾下查看rabbitmq狀態,最后兩行就是端口號,默認是15672
rabbitmqctl status
4、在瀏覽器輸入http://127.0.0.1:15672
訪問rabbitmq后台系統,默認賬號和密碼都是:guest
在項目中使用RabbitMQ
MicroService.Core項目安裝NuGet包
- CAP: Nuget DotNetCore.CAP
- CAP傳輸器:Nuget DotNetCore.CAP.RabbitMQ
- CAPSql持久化:DotNetCore.CAP.SqlServer
- CAP內存持久化:DotNetCore.CAP.InMemoryStorage
在聚合服務 MicroService.AggregateService服務中startup.cs中使用CAP
// 添加事件總線cap services.AddCap(x => { // 使用內存存儲消息(消息發送失敗處理) x.UseInMemoryStorage();
//使用EntityFramework進行存儲操作,指定數據庫上下文就好,不用配置 //x.UseEntityFramework<AggregateContext>(); // 使用RabbitMQ進行事件中心處理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); });
在聚合服務MicroService.AggregateService服務中AggregateController.cs中注入且使用ICapPublisher,把消息發布給RabbitMQ
using DotNetCore.CAP;
private readonly ICapPublisher capPublisher; public TeamsController(ICapPublisher capPublisher) { this.capPublisher = capPublisher; } //異步發布對象消息 capPublisher.PublishAsync<Video>("video.event.1", video);
在視頻服務MicroService.VideoService服務startup.cs中使用CAP
// 添加事件總線cap services.AddCap(x => { // 使用RabbitMQ進行事件中心處理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); });
在視頻服務MicroService.VideoService服務VideoController.cs 的PostVideo方法上添加過濾器特性[CapSubscribe] ,從RabbitMQ中訂閱,且處理這些請求
using DotNetCore.CAP; /// <summary> /// 視頻添加: /// 從消息隊列獲取請求 /// </summary> /// <param name="Video"></param> /// <returns></returns> [NonAction] //指示控制器方法不是動作方法,不讓控制器調用 [CapSubscribe("video.*")] //訂閱消息:RabbitMQ推送消息給訂閱者 public ActionResult<Video> PostVideo(Video Video) { Console.WriteLine($"接受到視頻事件消息"); videoService.Create(Video); return CreatedAtAction("GetVideo", new { id = Video.Id }, Video); }