.NET Core 事件總線,分布式事務解決方案:CAP


背景

相信前面幾篇關於微服務的文章也介紹了那么多了,在構建微服務的過程中確實需要這么一個東西,即便不是在構建微服務,那么在構建分布式應用的過程中也會遇到分布式事務的問題,那么 CAP 就是在這樣的背景下誕生的。

最初打算做這個東西是在去年(2016)年底,最初是為了解決分布式系統中的分布式事務的問題,然后當時有了一個大概的概念輪廓,當時我對於前面兩篇文章中關於異步消息和微服務之間通訊還不是太了解,只是覺得這樣能夠解決這一系列的問題,然后就着手做了,最后發現和這些概念竟然不謀而合。

經過大半年的不斷重構以及修改,最終 CAP 1.0 版本發布了。作為一個開源項目,最初項目是在我的個人Github下,然后於上個月已經貢獻給了 .NET China Foundation 組織,目前該項目由我和 DotNetCore 項目組共同維護。

CAP 介紹

Github:https://github.com/dotnetcore/CAP

開源協議:MIT

CAP 是一個在分布式系統中(SOA,MicroService)實現事件總線及最終一致性(分布式事務)的一個開源的 C# 庫,她具有輕量級,高性能,易使用等特點。

你可以輕松的在基於 .NET Core 技術的分布式系統中引入CAP,包括但限於 ASP.NET Core 和 ASP.NET Core on .NET Framework。

CAP 以 NuGet 包的形式提供,對項目無任何入侵,你仍然可以以你喜愛的方式來構建分布式系統。

CAP 具有 Event Bus 的所有功能,並且CAP提供了更加簡化的方式來處理EventBus中的發布/訂閱。

CAP 具有消息持久化的功能,也就是當你的服務進行重啟或者宕機時,她可以保證消息的可靠性。

CAP 實現了分布式事務中的最終一致性,你不用再去處理這些瑣碎的細節。

CAP 提供了基於 Microsoft DI 的 API 服務,她可以和你的 ASP.NET Core 系統進行無縫結合,並且能夠和你的業務代碼集成支持強一致性的事務處理。

CAP 是開源免費的。CAP基於MIT協議開源,你可以免費的在你的私人或者商業項目中使用,不會有人向你收取任何費用。

Getting Started

目前, CAP 同時支持使用 RabbitMQ,Kafka,Azure Service Bus 等進行底層之間的消息發送,你不需要具備這些消息隊列的使用經驗,仍然可以輕松的集成到項目中。

CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 數據庫的項目。

CAP 同時支持使用 EntityFrameworkCore 和 ADO.NET 的項目,你可以根據需要選擇不同的配置方式。

下面是CAP在系統中的一個不完全示意圖:

圖中實線部分代表用戶代碼,虛線部分代表CAP內部實現。

下面,我們看一下 CAP 怎么集成到項目中:

Step 1:

你可以運行下面的命令來安裝CAP NuGet 包:

PM> Install-Package DotNetCore.CAP

根據底層消息隊列,你可以選擇引入不同的包:

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus

CAP 目前支持使用 SQL Server, PostgreSql, MySql, MongoDB 的項目,你可以選擇引入不同的包:

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB     //需要 MongoDB 4.0+ 集群

Step 2:

Startup.cs 文件中,添加如下配置:

public void ConfigureServices(IServiceCollection services)
{
    ......

    services.AddDbContext<AppDbContext>();

    services.AddCap(x =>
    {
        //如果你使用的 EF 進行數據操作,你需要添加如下配置:
        x.UseEntityFramework<AppDbContext>();  //可選項,你不需要再次配置 x.UseSqlServer 了
		
        //如果你使用的ADO.NET,根據數據庫選擇進行配置:
        x.UseSqlServer("數據庫連接字符串");
        x.UseMySql("數據庫連接字符串");
        x.UsePostgreSql("數據庫連接字符串");

        //如果你使用的 MongoDB,你可以添加如下配置:
        x.UseMongoDB("ConnectionStrings");  //注意,僅支持MongoDB 4.0+集群
	
        //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作為MQ,根據使用選擇配置:
        x.UseRabbitMQ("ConnectionStrings");
        x.UseKafka("ConnectionStrings");
        x.UseAzureServiceBus("ConnectionStrings");
    });
}

發布事件/消息

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 進行消息發布:

public class PublishController : Controller
{
    private readonly ICapPublisher _capBus;

    public PublishController(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }
    
    //不使用事務
    [Route("~/without/transaction")]
    public IActionResult WithoutTransaction()
    {
        _capBus.Publish("xxx.services.show.time", DateTime.Now);
	
        return Ok();
    }

    //Ado.Net 中使用事務,自動提交
    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
            {
                //業務代碼

                _capBus.Publish("xxx.services.show.time", DateTime.Now);
            }
        }
        return Ok();
    }

    //EntityFramework 中使用事務,自動提交
    [Route("~/ef/transaction")]
    public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
    {
        using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
        {
            //業務代碼

            _capBus.Publish("xxx.services.show.time", DateTime.Now);
        }
        return Ok();
    }
}

訂閱事件/消息

Controller 中:
如果是在Controller中,直接添加[CapSubscribe("")] 來訂閱相關消息。

public class PublishController : Controller
{
    [NoAction]
    [CapSubscribe("xxx.services.show.time")]
    public async Task CheckReceivedMessage(DateTime time)
    {
    	Console.WriteLine(time);
    	return Task.CompletedTask;
    }
}

xxxService 中:
如果你的方法沒有位於Controller 中,那么你訂閱的類需要繼承 ICapSubscribe,然后添加[CapSubscribe("")]標記:

namespace xxx.Service
{
    public interface ISubscriberService
    {
    	public void CheckReceivedMessage(DateTime time);
    }
    
    
    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
    	[CapSubscribe("xxxx.services.show.time")]
    	public void CheckReceivedMessage(DateTime time)
    	{
    		
    	}
    }
}

然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService

public void ConfigureServices(IServiceCollection services)
{
    services.AddTransient<ISubscriberService,SubscriberService>();
}

結束了,怎么樣,是不是很簡單?

鳴謝

感謝 lan Ye 同學對本項目的英文翻譯工作。

感謝 AlexLEWIS 同學對本項目的其他支持。

感謝 .NET China Foundation 團隊成員對本項目的支持。

總結

如果你有任何問題,都可以去 Github 給我們提交 Issue,我們會在第一時間處理。

如果你覺得這個開源項目還不錯,給個Github Star 支持一下那就太好了。

如果你覺得本篇文章對你有幫助的話,可以關注一下博主,順手點個【推薦】哦。

GitHub stars
GitHub forks


本文地址:http://www.cnblogs.com/savorboard/p/cap.html
作者博客:Savorboard
歡迎轉載,請在明顯位置給出出處及鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM