FreeSql接入CAP的實踐


CAP

CAP 是一個基於 .NET Standard 的 C# 庫,它是一種處理分布式事務的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高性能等特點。

ADO.NET事務

1.DotNetCore.CAP.MySql中引用 了如下類庫.在Commit事務時,會調用 Flush方法推送消息​

<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.7" />
<PackageReference Include="MySqlConnector" Version="1.0.1" />
    public class MySqlCapTransaction : CapTransactionBase
    {
        public MySqlCapTransaction(
            IDispatcher dispatcher) : base(dispatcher)
        {
        }

        public override void Commit()
        {
            Debug.Assert(DbTransaction != null);

            switch (DbTransaction)
            {
                case IDbTransaction dbTransaction:
                    dbTransaction.Commit();
                    break;
                case IDbContextTransaction dbContextTransaction:
                    dbContextTransaction.Commit();
                    break;
            }
            Flush();
        }
    }

其中我們能看到,事務的提交,會調用父類CapTransactionBase中的方法Flush。他是protected類型的,並未開放出此接口。

       protected virtual void Flush()
        {
            while (!_bufferList.IsEmpty)
            {
                _bufferList.TryDequeue(out var message);

                _dispatcher.EnqueueToPublish(message);
            }
        }

我們來看一下集成 的demo調用

    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, true))
            {
                //your business code
                connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);

                //for (int i = 0; i < 5; i++)
                //{
                _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
                //}
            }
        }

        return Ok();
    }

代碼中通過擴展IDbConnection類,增加BeginTransaction方法,傳遞了注入的_capBus類,傳了autoCommit

private readonly ICapPublisher _capBus;

public PublishController(ICapPublisher capPublisher)
{
    _capBus = capPublisher;
}
/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
    ICapPublisher publisher, bool autoCommit = false)
{
    if (dbConnection.State == ConnectionState.Closed)
    {
        dbConnection.Open();
    }

    var dbTransaction = dbConnection.BeginTransaction();
    publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
    return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
}

autoCommit:false,(此屬性會自動提交事務,集成其他ORM,不建議開啟)因為,我們只要調用 了Publish,他會調用MySqlCapTransaction中的Commit(),並執行Flush,即消息 會發出去。
IDbContextTransaction

這段代碼是非常 重要的。

    publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();

從CapPublisher中可以看出,事務是通過AsyncLocal實現狀態共享的。

internal class CapPublisher : ICapPublisher
{
     public AsyncLocal<ICapTransaction> Transaction { get; }
}

publisher.Transaction.Value的類型實現上才是ICapTransaction ,

CapTransactionExtensions.cs還有一個擴展方法,調用Begin,相當於給當前控制器上注入的ICapPublisher設置了new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction()的值。

      public static ICapTransaction Begin(this ICapTransaction transaction,
            IDbTransaction dbTransaction, bool autoCommit = false)
        {
            transaction.DbTransaction = dbTransaction;
            transaction.AutoCommit = autoCommit;

            return transaction;
        }

對於ADO.NET,我們只要傳遞此transaction,就能保證發送消息和操作DB是一個事務了。。

EF Core事務

同樣,我們看擴展方法和使用方式

    public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
        ICapPublisher publisher, bool autoCommit = false)
    {
        var trans = database.BeginTransaction();
        publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
        var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
        return new CapEFDbTransaction(capTrans);
    }

dbContext.Database就是DatabaseFacade類型。直接能BeginTransaction事務。

[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
    using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
    {
        dbContext.Persons.Add(new Person() { Name = "ef.transaction" });

        for (int i = 0; i < 1; i++)
        {
            _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
        }

        dbContext.SaveChanges();

        trans.Commit();
    }
    return Ok();
}

同樣,還有一個Begin擴展方法,僅僅是給ICapTransaction賦下值。

public static ICapTransaction Begin(this ICapTransaction transaction,
    IDbContextTransaction dbTransaction, bool autoCommit = false)
{
    transaction.DbTransaction = dbTransaction;
    transaction.AutoCommit = autoCommit;

    return transaction;
}

在這個demo,上,,autoCommit是false,因為dbContext有自己的SaveChanges(),如果發送不太合適。SaveChanges()要做好些操作,具體不太情況是什么,但要在Commit事務前的吧。。具體不詳細研究。

但我們可以看下CapTransactionBase源碼,DbTransaction是Object類型。

EF Core中的事務類型是IDbContextTransaction​

ADO.NET實際是IDbTransaction類型。

 public object DbTransaction { get; set; }

所以在最開始的那段代碼,判斷DbTransaction,是哪種類型,然后調用自身內部使用的事務進行Commit()。如果要集成其他ORM,但又想去掉EFCore的依賴,然后增加其他ORM,如下類似的處理,就是關鍵,比如CommitAsync,Commit,Roolback()

    public override void Commit()
    {
        Debug.Assert(DbTransaction != null);

        switch (DbTransaction)
        {
            case IDbTransaction dbTransaction:
                dbTransaction.Commit();
                break;
            case IDbContextTransaction dbContextTransaction:
                dbContextTransaction.Commit();
                break;
        }
        Flush();
    }

還有MySqlDataStorage.cs

判斷dbTransaction的類型,然后獲取當前事務,引用其他ORM,記得修改此處。

    var dbTrans = dbTransaction as IDbTransaction;
    if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
    {
        dbTrans = dbContextTrans.GetDbTransaction();
    }

參考項目(不打算維護)

FreeSql接入CAP(最簡單的方式)

關於此問題的想法

我們還是引用各自官方的庫

Install-Package DotNetCore.CAP.Dashboard
Install-Package DotNetCore.CAP.MySql
Install-Package DotNetCore.CAP.RabbitMQ
Install-Package FreeSql
Install-Package FreeSql.DbContext
Install-Package FreeSql.Provider.MySqlConnector

關於CAP集成的方式,配置項,這里不做詳情,官方地址有中文: http://cap.dotnetcore.xyz/

重寫擴展方法,BeginTransaction。是基於IUnitOfWork的擴展。

提交事務調用Commit(IUnitOfWork)時,內部再通過反射調用 ICapTransaction中protected類型的方法Flush。

  public static class CapUnitOfWorkExtensions
    {

        public static void Flush(this ICapTransaction capTransaction)
        {
            capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null);
        }

       
        public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false)
        {
            publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction));
            return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit);
        }

        public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork)
        {
            unitOfWork.Commit();
            capTransaction.Flush();
        }
    }

注入我們的FreeSql

public void ConfigureServices(IServiceCollection services)
 {
    IConfigurationSection configurationSection = Configuration.GetSection($"ConnectionStrings:MySql");
    IFreeSql fsql = new FreeSqlBuilder()
           .UseConnectionString(DataType.MySql, configurationSection.Value);
           .UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower)
           .UseAutoSyncStructure(true)
           .UseNoneCommandParameter(true)
           .UseMonitorCommand(cmd =>
           {
               Trace.WriteLine(cmd.CommandText + ";");
           }
           )
           .Build();


    services.AddSingleton(fsql);
    services.AddFreeRepository();
    services.AddScoped<UnitOfWorkManager>();
}

示例

    [HttpGet("~/freesql/unitofwork/{id}")]
    public DateTime UnitOfWorkManagerTransaction(int id, [FromServices] IBaseRepository<Book> repo)
    {
        DateTime now = DateTime.Now;
        using (IUnitOfWork uow = _unitOfWorkManager.Begin())
        {
            ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);
            repo.Insert(new Book()
            {
                Author = "luoyunchong",
                Summary = "2",
                Title = "122"
            });

            _capBus.Publish("freesql.time", now);
            trans.Commit(uow);
        }
        return now;
    }
    
    [NonAction]
    [CapSubscribe("freesql.time")]
    public void GetTime(DateTime time)
    {
        Console.WriteLine($"time:{time}");
    }

注意trans不需要using,freesql內部會釋放資源。,也可using,但請更新到最新的freesql版本。

ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);

提交事務,也請調用擴展方法,否則事務無法正常。

trans.Commit(uow);

源碼位置


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM