什么是事物
例如:事物 所有看到的一切都是事物,不能看到的也是事物
例如:團隊微服務,成員微服務,聚合微服務,網關api,認證中心等等包括類,對象
所有的事件都是事物變化的結果
大家接觸事件最早就是在js 或者是c#高級特性。大家對於事件不默認,但是對於事件不是很好理解
事件就是指事物狀態的變化,每一次事物變化的結果都稱作為事件
什么是事件總線
就是用來管理所有的事件的一種機制就稱作為事件總線
包括事件發布,事件存儲,事件訂閱,事件處理的統稱
作用:
事件總線是一種機制,它允許不同的組件彼此通信而不彼此了解。 組件可以將事件發送到Eventbus,而無需知道是誰來接聽或有多少其他人來接聽。 組件也可以偵聽Eventbus上的事件,而無需知道誰發送了事件。 這樣,組件可以相互通信而無需相互依賴。 同樣,很容易替換一個組件。 只要新組件了解正在發送和接收的事件,其他組件就永遠不會知道.
為什么要使用事件總線
將微服務系統各組件之間進行解耦
使用業務的發展來說
事件總線框架
CAP
masstransit
CAP內部概念
事件 : 就是一些狀態信息
發布者:發布事件的角色 cap
訂閱者:訂閱消費事件的角色 cap
消息傳輸器:傳輸事件
消息存儲器:存儲事件
CAP存儲事件消息隊列類型Transport
Azure
rabbitmq
kafaka
In Memory Queue
CAP存儲事件持久化類型
SQL Server
MySQL
PostgreSQL
MongoDB
InMemoryStorage
CAP事件監控
Dashboard
微服務系統中如何使用CAP
條件
1、微服務系統
2、RabbitMQ
3、SqlServer
4、CAP
步驟
1、微服務系統准備
微服務系統全部准備完畢
2、RabbitMQ准備
2.1 環境准備
Erlang下載地址:https://www.erlang.org/downloads
RabbitMQ下載地址:https://www.rabbitmq.com/download.html
2.2 RabbitMQ 啟動
1、在安裝目錄下添加可視化插件
rabbitmq-plugins enable rabbitmq_management
2、在安裝目錄下啟動
rabbitmq-server
3、查看rabbitmq狀態
rabbitmqctl status
4、在瀏覽器輸入http://127.0.0.1:15672
訪問rabbitmq后台系統
3、SqlServer准備
SqlServer啟動,安裝
4、CAP准備
4.1 CAP環境
CAP官網地址:https://cap.dotnetcore.xyz/user-guide/zh/monitoring/dashboard/
4.2 CPA配置
1、在MicroService.Core項目中添加依賴
CAP Nuget DotNetCore.CAP
CAP傳輸器Nuget DotNetCore.CAP.RabbitMQ
CAP持久化DotNetCore.CAP.SqlServer
2、在MicroService.AggregateService服務中startup.cs中添加
// 8、添加事件總線cap
services.AddCap(x => {
// 8.1 使用內存存儲消息(消息發送失敗處理)
x.UseInMemoryStorage();
// 8.2 使用RabbitMQ進行事件中心處理
x.UseRabbitMQ(rb => {
rb.HostName = "localhost";
rb.UserName = "guest";
rb.Password = "guest";
rb.Port = 5672;
rb.VirtualHost = "/";
});
});
2.1 在AggregateController.cs中注入ICapPublisher
private readonly ICapPublisher capPublisher;
public TeamsController(ICapPublisher capPublisher)
{
this.capPublisher = capPublisher;
}
3、在MicroService.VideoService服務startup.cs中添加
// 8、添加事件總線cap
services.AddCap(x => {
// 8.1 使用RabbitMQ進行事件中心處理
x.UseRabbitMQ(rb => {
rb.HostName = "localhost";
rb.UserName = "guest";
rb.Password = "guest";
rb.Port = 5672;
rb.VirtualHost = "/";
});
});
3.1 在VideoController.cs 中方法上添加特性[CapSubscribe]
/// <summary>
/// 視頻添加(異步添加)
/// </summary>
/// <param name="Video"></param>
/// <returns></returns>
[NonAction]
[CapSubscribe("tontcap")]
public ActionResult<Video> PostVideo(Video Video)
{
videoService.Create(Video);
return CreatedAtAction("GetVideo", new { id = Video.Id }, Video);
}
4、效果展示
RabbitMQ宕機情況
步驟
1、將RabbitMQ直接關閉
事件消息無法發送,存儲到內存緩存中
2、當將RabbitMQ啟動后,消息正常發送
內部使用定時器輪詢機制實現
AggregateService宕機情況
AggregateService 執行業務成功,發送消息前宕機
使用本地消息表解決(思想:持久化操作)
條件
1、本地消息表
步驟
1、在MicroService.AggregateService服務中
1.1 創建Context文件,然后在Context文件夾內創建AggregateContext
/// <summary> /// Aggregate服務上下文 /// </summary> public class AggregateContext : DbContext { public AggregateContext(DbContextOptions<AggregateContext> options) : base(options) { } }
1.2 在appsettings.json中添加
{ "ConnectionStrings": { "DefaultConnection": "Data Source=.;Initial Catalog=aggregateservice;Persist Security Info=True;User ID=sa;Password=tony" } }
1.3 在startup.cs中添加消息持久化
// 9、注冊上下文到IOC容器 services.AddDbContext<AggregateContext>(options => { options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); }); // 8、添加事件總線cap services.AddCap(x => { // 8.1 使用EntityFramework進行存儲操作 x.UseEntityFramework<AggregateContext>(); // 8.2 使用sqlserver進行事務處理 x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); // 8.2 使用RabbitMQ進行事件中心處理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); });
1.4測試演示效果
數據庫中多了兩張表
當業務執行成功,發送消息時,聚合微服務宕機,消息被持久化到數據庫
當重啟聚合微服務時,消息發送成功,被成功消費
1.5 原理
1、定時器 消息重試
2、冪等性 一個函數每次都是相同的結果,狀態只有一個
VideoService宕機情況
VideoService接受消息失敗
當VideoService直接宕機的時候接受消息失敗,
然后重啟VideoService消息消費成功
VideoService接受消息成功執行失敗
條件
1、本地消息表
步驟
1、在MicroService.Core項目中
1.1 安裝SqlServer
Nuget DotNetCore.CAP.SqlServer
2、在MicroService.VideoService項目中
2.1 在startup.cs中添加消息持久化
// 8、添加事件總線cap services.AddCap(x => { // 8.1 使用EntityFramework進行存儲操作 x.UseEntityFramework<AggregateContext>(); // 8.2 使用sqlserver進行事務處理 x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); // 8.2 使用RabbitMQ進行事件中心處理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); });
2.2 效果展示
數據庫多了兩張表
2.3 原理
1、定時器 消息重試
2、冪等性 一個函數每次都是相同的結果,狀態只有一個
消息重試完還是消費失敗情況
使用人工干預實現
條件
1、Dashboard -- 后台管理頁面
步驟
1、在MicroService.Core項目中
1.1 安裝Dashboard
Nuget DotNetCore.CAP.Dashboard
2、在MicroService.VideoService項目中
2.1 在startup.cs中添加Dashboard
// 8、添加事件總線cap services.AddCap(x => { // 8.1 使用EntityFramework進行存儲操作 x.UseEntityFramework<AggregateContext>(); // 8.2 使用sqlserver進行事務處理 x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); // 8.3 使用RabbitMQ進行事件中心處理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); // 8.4添加cap后台監控頁面 x.UseDashboard(); });
2.2 運行打開cap后台監控頁面
對於發送失敗的消息進行重復發送
對於消費失敗的消息進行重復消費