CAP
CAP 是一個基於 .NET Standard 的 C# 庫,它是一種處理分布式事務的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高性能等特點。
ADO.NET事務
1.DotNetCore.CAP.MySql中引用 了如下類庫.在Commit事務時,會調用 Flush方法推送消息
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.7" />
<PackageReference Include="MySqlConnector" Version="1.0.1" />
public class MySqlCapTransaction : CapTransactionBase
{
public MySqlCapTransaction(
IDispatcher dispatcher) : base(dispatcher)
{
}
public override void Commit()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Commit();
break;
}
Flush();
}
}
其中我們能看到,事務的提交,會調用父類CapTransactionBase中的方法Flush。他是protected類型的,並未開放出此接口。
protected virtual void Flush()
{
while (!_bufferList.IsEmpty)
{
_bufferList.TryDequeue(out var message);
_dispatcher.EnqueueToPublish(message);
}
}
我們來看一下集成 的demo調用
[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, true))
{
//your business code
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);
//for (int i = 0; i < 5; i++)
//{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
//}
}
}
return Ok();
}
代碼中通過擴展IDbConnection類,增加BeginTransaction方法,傳遞了注入的_capBus類,傳了autoCommit
private readonly ICapPublisher _capBus;
public PublishController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}
/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
ICapPublisher publisher, bool autoCommit = false)
{
if (dbConnection.State == ConnectionState.Closed)
{
dbConnection.Open();
}
var dbTransaction = dbConnection.BeginTransaction();
publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
}
autoCommit:false,(此屬性會自動提交事務,集成其他ORM,不建議開啟)因為,我們只要調用 了Publish,他會調用MySqlCapTransaction中的Commit(),並執行Flush,即消息 會發出去。
IDbContextTransaction
這段代碼是非常 重要的。
publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
從CapPublisher中可以看出,事務是通過AsyncLocal實現狀態共享的。
internal class CapPublisher : ICapPublisher
{
public AsyncLocal<ICapTransaction> Transaction { get; }
}
publisher.Transaction.Value的類型實現上才是ICapTransaction ,
CapTransactionExtensions.cs還有一個擴展方法,調用Begin,相當於給當前控制器上注入的ICapPublisher設置了new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction()的值。
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
對於ADO.NET,我們只要傳遞此transaction,就能保證發送消息和操作DB是一個事務了。。
EF Core事務
同樣,我們看擴展方法和使用方式
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
ICapPublisher publisher, bool autoCommit = false)
{
var trans = database.BeginTransaction();
publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
return new CapEFDbTransaction(capTrans);
}
dbContext.Database就是DatabaseFacade類型。直接能BeginTransaction事務。
[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
{
dbContext.Persons.Add(new Person() { Name = "ef.transaction" });
for (int i = 0; i < 1; i++)
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
dbContext.SaveChanges();
trans.Commit();
}
return Ok();
}
同樣,還有一個Begin擴展方法,僅僅是給ICapTransaction賦下值。
public static ICapTransaction Begin(this ICapTransaction transaction,
IDbContextTransaction dbTransaction, bool autoCommit = false)
{
transaction.DbTransaction = dbTransaction;
transaction.AutoCommit = autoCommit;
return transaction;
}
在這個demo,上,,autoCommit是false,因為dbContext有自己的SaveChanges(),如果發送不太合適。SaveChanges()要做好些操作,具體不太情況是什么,但要在Commit事務前的吧。。具體不詳細研究。
但我們可以看下CapTransactionBase源碼,DbTransaction是Object類型。
EF Core中的事務類型是IDbContextTransaction
ADO.NET實際是IDbTransaction類型。
public object DbTransaction { get; set; }
所以在最開始的那段代碼,判斷DbTransaction,是哪種類型,然后調用自身內部使用的事務進行Commit()。如果要集成其他ORM,但又想去掉EFCore的依賴,然后增加其他ORM,如下類似的處理,就是關鍵,比如CommitAsync,Commit,Roolback()
public override void Commit()
{
Debug.Assert(DbTransaction != null);
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
dbContextTransaction.Commit();
break;
}
Flush();
}
還有MySqlDataStorage.cs
判斷dbTransaction的類型,然后獲取當前事務,引用其他ORM,記得修改此處。
var dbTrans = dbTransaction as IDbTransaction;
if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
參考項目(不打算維護)
-
維護就要保證與上層Dotnetcore/cap項目保持同步,這是一件困難的事。
-
還有一個重要的原因是:我們有更簡單的方式。
FreeSql接入CAP(最簡單的方式)
關於此問題的想法
我們還是引用各自官方的庫
Install-Package DotNetCore.CAP.Dashboard
Install-Package DotNetCore.CAP.MySql
Install-Package DotNetCore.CAP.RabbitMQ
Install-Package FreeSql
Install-Package FreeSql.DbContext
Install-Package FreeSql.Provider.MySqlConnector
關於CAP集成的方式,配置項,這里不做詳情,官方地址有中文: http://cap.dotnetcore.xyz/
- 代碼參考。因為只有一個類,我們自行復制項目即可。
- https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Application/CapUnitOfWorkExtensions.cs
重寫擴展方法,BeginTransaction。是基於IUnitOfWork的擴展。
提交事務調用Commit(IUnitOfWork)時,內部再通過反射調用 ICapTransaction中protected類型的方法Flush。
public static class CapUnitOfWorkExtensions
{
public static void Flush(this ICapTransaction capTransaction)
{
capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null);
}
public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false)
{
publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction));
return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit);
}
public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork)
{
unitOfWork.Commit();
capTransaction.Flush();
}
}
注入我們的FreeSql
public void ConfigureServices(IServiceCollection services)
{
IConfigurationSection configurationSection = Configuration.GetSection($"ConnectionStrings:MySql");
IFreeSql fsql = new FreeSqlBuilder()
.UseConnectionString(DataType.MySql, configurationSection.Value);
.UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower)
.UseAutoSyncStructure(true)
.UseNoneCommandParameter(true)
.UseMonitorCommand(cmd =>
{
Trace.WriteLine(cmd.CommandText + ";");
}
)
.Build();
services.AddSingleton(fsql);
services.AddFreeRepository();
services.AddScoped<UnitOfWorkManager>();
}
示例
[HttpGet("~/freesql/unitofwork/{id}")]
public DateTime UnitOfWorkManagerTransaction(int id, [FromServices] IBaseRepository<Book> repo)
{
DateTime now = DateTime.Now;
using (IUnitOfWork uow = _unitOfWorkManager.Begin())
{
ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);
repo.Insert(new Book()
{
Author = "luoyunchong",
Summary = "2",
Title = "122"
});
_capBus.Publish("freesql.time", now);
trans.Commit(uow);
}
return now;
}
[NonAction]
[CapSubscribe("freesql.time")]
public void GetTime(DateTime time)
{
Console.WriteLine($"time:{time}");
}
注意trans不需要using,freesql內部會釋放資源。,也可using,但請更新到最新的freesql版本。
ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);
提交事務,也請調用擴展方法,否則事務無法正常。
trans.Commit(uow);
源碼位置