NETCORE - CAP的使用
CAP 同時支持使用 RabbitMQ,Kafka,Azure Service Bus 等進行底層之間的消息發送。
CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 數據庫的項目。
安裝部署RabbbitMQ:https://www.cnblogs.com/1285026182YUAN/p/12896851.html
文中以 CAP + MySql + RabbitMQ +EF 為例:
由於樣例項目為 netcore 2.2 框架。
安裝DotNetCore.CAP nuGet包,此處安裝 2.6.0版本。
安裝EF nuGet包。
配置 appsettings.json 數據 。

{ "Logging": { "LogLevel": { "Default": "Warning" } }, "ConnectionStrings": { "Mysql_Conn": "Server=localhost;port=3306;Database=db1;UserId=root;Password=123456", }, "RabbitMQ": { "HostName": "192.168.122.199", "UserName": "admin", "Password": "123456", "VirtualHost": "vhost_lihy", "Port": 5672, "ExchangeName": "cap.text.lihy.exchange" }, "AllowedHosts": "*" }
根據底層消息隊列,你可以選擇引入不同的包:
PM> Install-Package DotNetCore.CAP.Kafka PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.AzureServiceBus
CAP 目前支持使用 SQL Server, PostgreSql, MySql, MongoDB 的項目,你可以選擇引入不同的包:
PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql PM> Install-Package DotNetCore.CAP.PostgreSql PM> Install-Package DotNetCore.CAP.MongoDB //需要 MongoDB 4.0+ 集群
在 Startup.cs
文件中,添加如下配置:

public void ConfigureServices(IServiceCollection services) { services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); services.AddDbContext<CapDbContext>(options => options.UseMySql(Configuration.GetConnectionString("Mysql_Conn"))); services.AddCap(x => { //如果你使用的 EF 進行數據操作,你需要添加如下配置: x.UseEntityFramework<CapDbContext>(); //可選項,你不需要再次配置 x.UseSqlServer 了 //如果你使用的ADO.NET,根據數據庫選擇進行配置: //x.UseSqlServer("數據庫連接字符串"); //x.UseMySql("server=localhost;port=3306;userid=root;password=123456;database=db1;SslMode=none"); //x.UsePostgreSql("數據庫連接字符串"); //如果你使用的 MongoDB,你可以添加如下配置: //x.UseMongoDB("ConnectionStrings"); //注意,僅支持MongoDB 4.0+集群 //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作為MQ,根據使用選擇配置: x.UseRabbitMQ(o => { o.HostName = Configuration.GetSection("RabbitMQ")["HostName"]; o.UserName = Configuration.GetSection("RabbitMQ")["UserName"]; o.Password = Configuration.GetSection("RabbitMQ")["Password"]; o.VirtualHost = Configuration.GetSection("RabbitMQ")["VirtualHost"]; o.Port = Convert.ToInt32(Configuration.GetSection("RabbitMQ")["Port"]); //指定Topic exchange名稱,不指定的話會用默認的 o.ExchangeName = Configuration.GetSection("RabbitMQ")["ExchangeName"]; }); //設置處理成功的數據在數據庫中保存的時間(秒),為保證系統新能,數據會定期清理。 x.SucceedMessageExpiredAfter = 24 * 3600; //設置失敗重試次數 x.FailedRetryCount = 5; //x.UseKafka("ConnectionStrings"); //x.UseAzureServiceBus("ConnectionStrings"); x.UseDashboard(); }); }
發布事件/消息
新建 PublishController 控制器

using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using DotNetCore.CAP; using Microsoft.AspNetCore.Mvc; using MySql.Data.MySqlClient; // For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860 namespace NETCORE.CAP.Controllers { [Route("api/[controller]")] public class PublishController : Controller { private readonly ICapPublisher _capBus; public PublishController(ICapPublisher capPublisher) { _capBus = capPublisher; } /// <summary> /// 不使用事務 /// </summary> /// <returns></returns> [Route("~/without/transaction")] public IActionResult WithoutTransaction() { _capBus.Publish("xxx.services.show.time", DateTime.Now); return Ok(); } ////Ado.Net 中使用事務,自動提交 //[Route("~/adonet/transaction")] //public IActionResult AdonetWithTransaction() //{ // using (var connection = new MySqlConnection(ConnectionString)) // { // using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true)) // { // //業務代碼 // _capBus.Publish("xxx.services.show.time", DateTime.Now); // } // } // return Ok(); //} ////EntityFramework 中使用事務,自動提交 //[Route("~/ef/transaction")] //public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext) //{ // using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true)) // { // //業務代碼 // _capBus.Publish("xxx.services.show.time", DateTime.Now); // } // return Ok(); //} } }
訂閱事件/消息
新建 ReceivedController 控制器

using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using DotNetCore.CAP; using Microsoft.AspNetCore.Mvc; // For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860 namespace NETCORE.CAP.Controllers { [Route("api/[controller]")] public class ReceivedController : Controller { [NonAction] [CapSubscribe("xxx.services.show.time")] public void CheckReceivedMessage(DateTime time) { Console.WriteLine(time); //return Task.CompletedTask; } } }
運行后,
數據庫即生成兩張表
調用接口 https://localhost:5001/without/transaction
在數據表中可查看相關狀態。
Cap 儀表盤
默認地址 https://localhost:5001/cap
Dashboard介紹
capOptions.UseDashboard(dashoptions => { dashoptions.AppPath = "applicationpath"; dashoptions.PathMatch = "/cap"; dashoptions.Authorization = new[] { new CapDashboardFilter() }; });
這里只說這幾個參數
AppPath:應用程序路徑 訪問dashboard的時候會有一個返回應用的操作,這個即是應用的地址
PathMatch:不設置的情況下都是cap,可以指定自己的dashboard路由地址
Authorization:授權處理
授權處理具體實現
只需要實現接口IDashboardAuthorizationFilter即可
public class CapDashboardFilter : IDashboardAuthorizationFilter { public bool Authorize(DashboardContext context) { return true; } }
通過DashboardContext上下文處理請求,允許返回true,不允許返回false
附代碼:https://gitee.com/wuxincaicai/NETCORE.git
參考:https://www.cnblogs.com/savorboard/p/cap.html
參考:https://www.sohu.com/a/290918943_120050810
參考:https://blog.csdn.net/weixin_30802273/article/details/94977889