事件總線--參考文檔


事件總線

什么是事物

例如:事物 所有看到的一切都是事物,不能看到的也是事物

例如:團隊微服務,成員微服務,聚合微服務,網關api,認證中心等等包括類,對象

所有的事件都是事物變化的結果

大家接觸事件最早就是在js 或者是c#高級特性。大家對於事件不默認,但是對於事件不是很好理解

什么是事件

事件就是指事物狀態的變化,每一次事物變化的結果都稱作為事件

什么是事件總線

就是用來管理所有的事件的一種機制就稱作為事件總線

包括事件發布,事件存儲,事件訂閱,事件處理的統稱

作用:

事件總線是一種機制,它允許不同的組件彼此通信而不彼此了解。 組件可以將事件發送到Eventbus,而無需知道是誰來接聽或有多少其他人來接聽。 組件也可以偵聽Eventbus上的事件,而無需知道誰發送了事件。 這樣,組件可以相互通信而無需相互依賴。 同樣,很容易替換一個組件。 只要新組件了解正在發送和接收的事件,其他組件就永遠不會知道.

為什么要使用事件總線

將微服務系統各組件之間進行解耦

使用業務的發展來說

事件總線框架

CAP

masstransit

CAP內部概念

事件 : 就是一些狀態信息

發布者:發布事件的角色 cap

訂閱者:訂閱消費事件的角色 cap

消息傳輸器:傳輸事件

消息存儲器:存儲事件

CAP存儲事件消息隊列類型Transport

Azure

rabbitmq

kafaka

In Memory Queue

CAP存儲事件持久化類型

SQL Server

MySQL

PostgreSQL

MongoDB

InMemoryStorage

CAP事件監控

Dashboard

微服務系統中如何使用CAP

條件

1、微服務系統

2、RabbitMQ

3、SqlServer

4、CAP

步驟

1、微服務系統准備

微服務系統全部准備完畢

2、RabbitMQ准備

2.1 環境准備

Erlang下載地址:https://www.erlang.org/downloads

RabbitMQ下載地址:https://www.rabbitmq.com/download.html

2.2 RabbitMQ 啟動

1、在安裝目錄下添加可視化插件

rabbitmq-plugins enable rabbitmq_management

2、在安裝目錄下啟動

 rabbitmq-server 

3、查看rabbitmq狀態

    rabbitmqctl status

4、在瀏覽器輸入http://127.0.0.1:15672

訪問rabbitmq后台系統

3、SqlServer准備

SqlServer啟動,安裝

4、CAP准備

4.1 CAP環境

CAP官網地址:https://cap.dotnetcore.xyz/user-guide/zh/monitoring/dashboard/

4.2 CPA配置

1、在MicroService.Core項目中添加依賴

CAP Nuget DotNetCore.CAP
CAP傳輸器Nuget DotNetCore.CAP.RabbitMQ
CAP持久化DotNetCore.CAP.SqlServer

2、在MicroService.AggregateService服務中startup.cs中添加

 // 8、添加事件總線cap
          services.AddCap(x => {
              // 8.1 使用內存存儲消息(消息發送失敗處理)
              x.UseInMemoryStorage();

              // 8.2 使用RabbitMQ進行事件中心處理
              x.UseRabbitMQ(rb => {
                  rb.HostName = "localhost";
                  rb.UserName = "guest";
                  rb.Password = "guest";
                  rb.Port = 5672;
                  rb.VirtualHost = "/";
              });
          });

2.1 在AggregateController.cs中注入ICapPublisher

    private readonly ICapPublisher capPublisher;
public TeamsController(ICapPublisher capPublisher)
  {
      this.capPublisher = capPublisher;
  }

3、在MicroService.VideoService服務startup.cs中添加

 // 8、添加事件總線cap
          services.AddCap(x => {
              // 8.1 使用RabbitMQ進行事件中心處理
              x.UseRabbitMQ(rb => {
                  rb.HostName = "localhost";
                  rb.UserName = "guest";
                  rb.Password = "guest";
                  rb.Port = 5672;
                  rb.VirtualHost = "/";
              });
          });

3.1 在VideoController.cs 中方法上添加特性[CapSubscribe]

       /// <summary>
      /// 視頻添加(異步添加)
      /// </summary>
      /// <param name="Video"></param>
      /// <returns></returns>
      [NonAction]
      [CapSubscribe("tontcap")]
      public ActionResult<Video> PostVideo(Video Video)
      {
          videoService.Create(Video);
      return CreatedAtAction("GetVideo", new { id = Video.Id }, Video);
  }

4、效果展示​

RabbitMQ宕機情況

步驟

1、將RabbitMQ直接關閉

事件消息無法發送,存儲到內存緩存中

2、當將RabbitMQ啟動后,消息正常發送

內部使用定時器輪詢機制實現

AggregateService宕機情況

AggregateService 執行業務成功,發送消息前宕機

使用本地消息表解決(思想:持久化操作)

條件

1、本地消息表

步驟

1、在MicroService.AggregateService服務中

1.1 創建Context文件,然后在Context文件夾內創建AggregateContext

/// <summary>
    /// Aggregate服務上下文
    /// </summary>
    public class AggregateContext : DbContext
    {
        public AggregateContext(DbContextOptions<AggregateContext> options) : base(options)
        {
        }
}

1.2 在appsettings.json中添加

{
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=.;Initial Catalog=aggregateservice;Persist Security Info=True;User ID=sa;Password=tony"
  }
}

1.3 在startup.cs中添加消息持久化

         // 9、注冊上下文到IOC容器
            services.AddDbContext<AggregateContext>(options =>
            {
                options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
            });
        // 8、添加事件總線cap
        services.AddCap(x => {
            // 8.1 使用EntityFramework進行存儲操作
            x.UseEntityFramework<AggregateContext>();
            // 8.2 使用sqlserver進行事務處理
            x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
        // 8.2 使用RabbitMQ進行事件中心處理
        x.UseRabbitMQ(rb => {
            rb.HostName = "localhost";
            rb.UserName = "guest";
            rb.Password = "guest";
            rb.Port = 5672;
            rb.VirtualHost = "/";
        });
    });

1.4測試演示效果

數據庫中多了兩張表

1589706447269

當業務執行成功,發送消息時,聚合微服務宕機,消息被持久化到數據庫

當重啟聚合微服務時,消息發送成功,被成功消費

1.5 原理

1、定時器 消息重試

2、冪等性 一個函數每次都是相同的結果,狀態只有一個

VideoService宕機情況

VideoService接受消息失敗

當VideoService直接宕機的時候接受消息失敗,

然后重啟VideoService消息消費成功

VideoService接受消息成功執行失敗

條件

1、本地消息表

步驟

1、在MicroService.Core項目中

1.1 安裝SqlServer

Nuget DotNetCore.CAP.SqlServer

2、在MicroService.VideoService項目中

2.1 在startup.cs中添加消息持久化

    // 8、添加事件總線cap
    services.AddCap(x => {
        // 8.1 使用EntityFramework進行存儲操作
        x.UseEntityFramework<AggregateContext>();
        // 8.2 使用sqlserver進行事務處理
        x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
    // 8.2 使用RabbitMQ進行事件中心處理
    x.UseRabbitMQ(rb => {
        rb.HostName = "localhost";
        rb.UserName = "guest";
        rb.Password = "guest";
        rb.Port = 5672;
        rb.VirtualHost = "/";
    });
});

2.2 效果展示

數據庫多了兩張表

1589706483154

2.3 原理

1、定時器 消息重試

2、冪等性 一個函數每次都是相同的結果,狀態只有一個

消息重試完還是消費失敗情況

使用人工干預實現

條件

1、Dashboard -- 后台管理頁面

步驟

1、在MicroService.Core項目中

1.1 安裝Dashboard

Nuget DotNetCore.CAP.Dashboard

2、在MicroService.VideoService項目中

2.1 在startup.cs中添加Dashboard

// 8、添加事件總線cap
services.AddCap(x => {
    // 8.1 使用EntityFramework進行存儲操作
    x.UseEntityFramework<AggregateContext>();
    // 8.2 使用sqlserver進行事務處理
    x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
// 8.3 使用RabbitMQ進行事件中心處理
x.UseRabbitMQ(rb => {
    rb.HostName = "localhost";
    rb.UserName = "guest";
    rb.Password = "guest";
    rb.Port = 5672;
    rb.VirtualHost = "/";
});

// 8.4添加cap后台監控頁面
    x.UseDashboard();
});

2.2 運行打開cap后台監控頁面

http://localhost:5007/cap

對於發送失敗的消息進行重復發送

對於消費失敗的消息進行重復消費

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM