一、什么是Cap
CAP 是一個基於 .NET Standard 的 C# 庫,它是一種處理分布式事務的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高性能等特點。
在我們構建 SOA 或者 微服務系統的過程中,我們通常需要使用事件來對各個服務進行集成,在這過程中簡單的使用消息隊列並不能保證數據的最終一致性, CAP 采用的是和當前數據庫集成的本地消息表的方案來解決在分布式系統互相調用的各個環節可能出現的異常,它能夠保證任何情況下事件消息都是不會丟失的。
你同樣可以把 CAP 當做 EventBus 來使用,CAP提供了一種更加簡單的方式來實現事件消息的發布和訂閱,在訂閱以及發布的過程中,你不需要繼承或實現任何接口。
以下是CAP集在ASP.NET Core 微服務架構中的一個示意圖:
二、安裝
你可以運行以下下命令在你的項目中安裝 CAP。
PM> Install-Package DotNetCore.CAP
CAP 支持 Kafka、RabbitMQ、AzureServiceBus 等消息隊列,你可以按需選擇下面的包進行安裝:
PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
我們這里采用RabbitMQ,安裝教程請轉到另一篇文章:Winows下安裝RabbitMQ
CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的擴展作為數據庫存儲:
// 按需選擇安裝你正在使用的數據庫
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB
我們這里采用SqlServer
三、配置
首先配置CAP到 Startup.cs 文件中,如下:

public void ConfigureServices(IServiceCollection services) { ...... services.AddDbContext<AppDbContext>(); services.AddCap(x => { //如果你使用的 EF 進行數據操作,你需要添加如下配置: x.UseEntityFramework<AppDbContext>(); //可選項,你不需要再次配置 x.UseSqlServer 了 //如果你使用的ADO.NET,根據數據庫選擇進行配置: x.UseSqlServer("數據庫連接字符串"); x.UseMySql("數據庫連接字符串"); x.UsePostgreSql("數據庫連接字符串"); //如果你使用的 MongoDB,你可以添加如下配置: x.UseMongoDB("ConnectionStrings"); //注意,僅支持MongoDB 4.0+集群 //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作為MQ,根據使用選擇配置: x.UseRabbitMQ("ConnectionStrings"); x.UseKafka("ConnectionStrings"); x.UseAzureServiceBus("ConnectionStrings"); }); }
我們這里采用EF數據庫配置和RabbitMQ如下:

services.AddCap(x => { //如果你使用的 EF 進行數據操作,你需要添加如下配置: x.UseEntityFramework<SysContext>(); x.UseRabbitMQ("localhost"); });
四、啟動
運行程序,將在數據庫生成Cap.Published和Cap.Received表如下圖所示:
五、發布和訂閱
在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 進行消息發送

private readonly ICapPublisher _capBus; public ValuesController(ICapPublisher capPublisher) { _capBus = capPublisher; } // GET api/values [HttpGet] public ActionResult<IEnumerable<string>> Get() { _capBus.Publish("xxx.services.show.time", DateTime.Now); return new string[] { "value1", "value2" }; } [CapSubscribe("xxx.services.show.time")] public void ShowTime(DateTime datetime) { }
運行程序,Cap.Published和Cap.Received將生成發布消息和訂閱消息的記錄
如果你的訂閱方法沒有位於 Controller 中,則你訂閱的類需要繼承 ICapSubscribe:

namespace xxx.Service { public interface ISubscriberService { public void CheckReceivedMessage(DateTime datetime); } public class SubscriberService: ISubscriberService, ICapSubscribe { [CapSubscribe("xxx.services.show.time")] public void CheckReceivedMessage(DateTime datetime) { } } }
然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService 類

public void ConfigureServices(IServiceCollection services) { //注意: 注入的服務需要在 `services.AddCap()` 之前 services.AddTransient<ISubscriberService,SubscriberService>(); services.AddCap(x=>{}); }
六、訂閱者組
訂閱者組的概念類似於 Kafka 中的消費者組,它和消息隊列中的廣播模式相同,用來處理不同微服務實例之間同時消費相同的消息。
當CAP啟動的時候,她將創建一個默認的消費者組,如果多個相同消費者組的消費者消費同一個Topic消息的時候,只會有一個消費者被執行。 相反,如果消費者都位於不同的消費者組,則所有的消費者都會被執行。
相同的實例中,你可以通過下面的方式來指定他們位於不同的消費者組。

[CapSubscribe("xxx.services.show.time", Group = "group1" )] public void ShowTime1(DateTime datetime) { } [CapSubscribe("xxx.services.show.time", Group = "group2")] public void ShowTime2(DateTime datetime) { }
ShowTime1 和 ShowTime2 處於不同的組,他們將會被同時調用。
PS,你可以通過下面的方式來指定默認的消費者組名稱:

services.AddCap(x => { x.DefaultGroup = "default-group-name"; });
七、Dashboard
CAP 2.1+ 以上版本中提供了儀表盤(Dashboard)功能,你可以很方便的查看發出和接收到的消息。除此之外,你還可以在儀表盤中實時查看發送或者接收到的消息。
在分布式環境中,儀表盤內置集成了 Consul 作為節點的注冊發現,同時實現了網關代理功能,你同樣可以方便的查看本節點或者其他節點的數據,它就像你訪問本地資源一樣。

services.AddCap(x => { //... // 注冊 Dashboard x.UseDashboard(); // 注冊節點到 Consul x.UseDiscovery(d => { d.DiscoveryServerHostName = "localhost"; d.DiscoveryServerPort = 8500; d.CurrentNodeHostName = "localhost"; d.CurrentNodePort = 5800; d.NodeId = 1; d.NodeName = "CAP No.1 Node"; }); });
儀表盤默認的訪問地址是:http://localhost:xxx/cap,你可以在d.MatchPath配置項中修改cap路徑后綴為其他的名字。
參考資料:https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md