AOP +FreeSql 跨方法異步事務


AOP +FreeSql 跨方法異步事務

  • Autofac.Extensions.DependencyInjection
  • Autofac.Extras.DynamicProxy
  • Castle.Core.AsyncInterceptor(異步方法AOP攔截)

源碼

csproj

        <PackageReference Include="Autofac.Extensions.DependencyInjection" Version="6.0.0" />
        <PackageReference Include="Autofac.Extras.DynamicProxy" Version="5.0.0" />
        <PackageReference Include="Castle.Core.AsyncInterceptor" Version="1.7.0" />

使用Autofac實現特性標簽,事務處理

創建一個標識事務的特性標簽

[AttributeUsage(AttributeTargets.Method, Inherited = true)]
public class TransactionalAttribute : Attribute
{
    /// <summary>
    /// 事務傳播方式
    /// </summary>
    public Propagation Propagation { get; set; } = Propagation.Required;

    /// <summary>
    /// 事務隔離級別
    /// </summary>
    public IsolationLevel? IsolationLevel { get; set; }
}

Autofac

Program.CS 替換默認的DI CreateHostBuilder方法

 Host.CreateDefaultBuilder(args).UseServiceProviderFactory(new AutofacServiceProviderFactory())

Startup.cs配置服務

public void ConfigureContainer(ContainerBuilder builder)
{
    builder.RegisterModule(new AutofacModule());
}

這里給BlogService方法注入UnitOfWorkInterceptor攔截處理。直接注入類。

public class AutofacModule : Autofac.Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterType<UnitOfWorkInterceptor>();
        builder.RegisterType<UnitOfWorkAsyncInterceptor>();
        
        builder.RegisterType<BlogService>()
            .InterceptedBy(typeof(UnitOfWorkInterceptor))
            .EnableClassInterceptors();
            
}
    List<Type> interceptorServiceTypes = new List<Type>()
    {
        typeof(UnitOfWorkInterceptor)
    };  
    //service所在dll
    Assembly servicesDllFile = Assembly.Load("LinCms.Application");
    
    builder.RegisterAssemblyTypes(servicesDllFile)
            .Where(a => a.Name.EndsWith("Service"))
            .AsImplementedInterfaces()
            .InstancePerLifetimeScope()
            .PropertiesAutowired()// 屬性注入
            .InterceptedBy(interceptorServiceTypes.ToArray())
            .EnableInterfaceInterceptors();

AOP

    public class UnitOfWorkInterceptor : IInterceptor
    {
        private readonly UnitOfWorkAsyncInterceptor asyncInterceptor;

        public UnitOfWorkInterceptor(UnitOfWorkAsyncInterceptor interceptor)
        {
            asyncInterceptor = interceptor;
        }

        public void Intercept(IInvocation invocation)
        {
            asyncInterceptor.ToInterceptor().Intercept(invocation);
        }
    }

    public class UnitOfWorkAsyncInterceptor : IAsyncInterceptor
    {
        private readonly UnitOfWorkManager _unitOfWorkManager;
        private readonly ILogger<UnitOfWorkAsyncInterceptor> _logger;
        IUnitOfWork _unitOfWork;

        public UnitOfWorkAsyncInterceptor(UnitOfWorkManager unitOfWorkManager, ILogger<UnitOfWorkAsyncInterceptor> logger)
        {
            _unitOfWorkManager = unitOfWorkManager;
            _logger = logger;
        }

        private bool TryBegin(IInvocation invocation)
        {
            //_unitOfWork = _unitOfWorkManager.Begin(Propagation.Requierd);
            //return true;
            var method = invocation.MethodInvocationTarget ?? invocation.Method;
            var attribute = method.GetCustomAttributes(typeof(TransactionalAttribute), false).FirstOrDefault();
            if (attribute is TransactionalAttribute transaction)
            {
                _unitOfWork = _unitOfWorkManager.Begin(transaction.Propagation, transaction.IsolationLevel);
                return true;
            }

            return false;
        }

        /// <summary>
        /// 攔截同步執行的方法
        /// </summary>
        /// <param name="invocation"></param>
        public void InterceptSynchronous(IInvocation invocation)
        {
            if (TryBegin(invocation))
            {
                int? hashCode = _unitOfWork.GetHashCode();
                try
                {
                    invocation.Proceed();
                    _logger.LogInformation($"----- 攔截同步執行的方法-事務 {hashCode} 提交前----- ");
                    _unitOfWork.Commit();
                    _logger.LogInformation($"----- 攔截同步執行的方法-事務 {hashCode} 提交成功----- ");
                }
                catch
                {
                    _logger.LogError($"----- 攔截同步執行的方法-事務 {hashCode} 提交失敗----- ");
                    _unitOfWork.Rollback();
                    throw;
                }
                finally
                {
                    _unitOfWork.Dispose();
                }
            }
            else
            {
                invocation.Proceed();
            }
        }

        /// <summary>
        /// 攔截返回結果為Task的方法
        /// </summary>
        /// <param name="invocation"></param>
        public void InterceptAsynchronous(IInvocation invocation)
        {
            if (TryBegin(invocation))
            {
                invocation.ReturnValue = InternalInterceptAsynchronous(invocation);
            }
            else
            {
                invocation.Proceed();
            }
        }

        private async Task InternalInterceptAsynchronous(IInvocation invocation)
        {
            string methodName =
                $"{invocation.MethodInvocationTarget.DeclaringType?.FullName}.{invocation.Method.Name}()";
            int? hashCode = _unitOfWork.GetHashCode();

            using (_logger.BeginScope("_unitOfWork:{hashCode}", hashCode))
            {
                _logger.LogInformation($"----- async Task 開始事務{hashCode} {methodName}----- ");

                invocation.Proceed();

                try
                {
                 //處理Task返回一個null值的情況會導致空指針
                    if (invocation.ReturnValue != null)
                    {
                        await (Task)invocation.ReturnValue;
                    }
                    _unitOfWork.Commit();
                    _logger.LogInformation($"----- async Task 事務 {hashCode} Commit----- ");
                }
                catch (System.Exception)
                {
                    _unitOfWork.Rollback();
                    _logger.LogError($"----- async Task 事務 {hashCode} Rollback----- ");
                    throw;
                }
                finally
                {
                    _unitOfWork.Dispose();
                }
            }

        }


        /// <summary>
        /// 攔截返回結果為Task<TResult>的方法
        /// </summary>
        /// <param name="invocation"></param>
        /// <typeparam name="TResult"></typeparam>
        public void InterceptAsynchronous<TResult>(IInvocation invocation)
        {
            invocation.ReturnValue = InternalInterceptAsynchronous<TResult>(invocation);
        }

        private async Task<TResult> InternalInterceptAsynchronous<TResult>(IInvocation invocation)
        {
            TResult result;
            if (TryBegin(invocation))
            {
                string methodName = $"{invocation.MethodInvocationTarget.DeclaringType?.FullName}.{invocation.Method.Name}()";
                int hashCode = _unitOfWork.GetHashCode();
                _logger.LogInformation($"----- async Task<TResult> 開始事務{hashCode} {methodName}----- ");

                try
                {
                    invocation.Proceed();
                    result = await (Task<TResult>)invocation.ReturnValue;
                    _unitOfWork.Commit();
                    _logger.LogInformation($"----- async Task<TResult> Commit事務{hashCode}----- ");
                }
                catch (System.Exception)
                {
                    _unitOfWork.Rollback();
                    _logger.LogError($"----- async Task<TResult> Rollback事務{hashCode}----- ");
                    throw;
                }
                finally
                {
                    _unitOfWork.Dispose();
                }
            }
            else
            {
                invocation.Proceed();
                result = await (Task<TResult>)invocation.ReturnValue;
            }
            return result;
        }
    }

當Service層沒有接口,則必須使用virtual虛方法。

    public class BlogService
    {
        /// <summary>
        /// 當出現異常時,不會插入數據
        /// </summary>
        /// <param name="createBlogDto"></param>
        [Transactional]
        public virtual void CreateBlogTransactional(CreateBlogDto createBlogDto)
        {
            Blog blog = _mapper.Map<Blog>(createBlogDto);
            blog.CreateTime = DateTime.Now;
            _blogRepository.Insert(blog);

            List<Tag> tags = new List<Tag>();
            createBlogDto.Tags.ForEach(r =>
            {
                tags.Add(new Tag { TagName = r });
            });
            if (createBlogDto.Title == "abc")
            {
                throw new Exception("test exception");
            }
            _tagRepository.Insert(tags);
        }
    }

單獨開啟事務,注入Service,也可通過AutoFac注入。

 services.AddScoped<TransBlogService>();

也可直接在Service層使用UnitOfWorkManager創建UnitOfWork

  • TransBlogService.cs
private readonly IBaseRepository<Blog, int> _blogRepository;
private readonly IBaseRepository<Tag, int> _tagRepository;
private readonly UnitOfWorkManager _unitOfWorkManager;

public TransBlogService(IBaseRepository<Blog, int> blogRepository, IBaseRepository<Tag, int> tagRepository,UnitOfWorkManager unitOfWorkManager)
{
    _blogRepository = blogRepository ;
    _tagRepository = tagRepository ;
    _unitOfWorkManager = unitOfWorkManager;
}

public async Task CreateBlogUnitOfWorkAsync(Blog blog,List<Tag>tagList)
{
    using (IUnitOfWork unitOfWork = _unitOfWorkManager.Begin())
    {
        try
        {
            await _blogRepository.InsertAsync(blog);
            tagList.ForEach(r =>
            {
                r.PostId = blog.Id;
            });
            await _tagRepository.InsertAsync(tagList);
            unitOfWork.Commit();
        }
        catch (Exception e)
        {     
            //實際 可以不Rollback。因為IUnitOfWork內部Dispose,會把沒有Commit的事務Rollback回來,但能提前Rollback
        
            unitOfWork.Rollback();
            //記錄日志、或繼續throw;出來
        }
    }
}

public async Task UpdateBlogAsync(int id)
{
    using (IUnitOfWork unitOfWork = _unitOfWorkManager.Begin())
    {
        try
        {
            Blog blog = _blogRepository.Select.Where(r => r.Id == id).First();
            blog.IsDeleted = true;
            await _blogRepository.UpdateAsync(blog);
            unitOfWork.Commit();
        }
        catch (Exception e)
        {
           //記錄日志、或繼續throw;出來
            unitOfWork.Rollback();
        }
    }
}
IUnitOfWork 成員 說明
IFreeSql Orm 該對象 Select/Delete/Insert/Update/InsertOrUpdate 與工作單元事務保持一致,可省略傳遞 WithTransaction
DbTransaction GetOrBeginTransaction() 開啟事務,或者返回已開啟的事務
void Commit() 提交事務
void Rollback() 回滾事務
DbContext.EntityChangeReport EntityChangeReport 工作單元內的實體變化跟蹤

完整的代碼

以上使用的是泛型倉儲,那我們如果是重寫一個倉儲 如何保持和UnitOfWorkManager同一個事務呢。
繼承現有的DefaultRepository<,>倉儲,實現自定義的倉儲BlogRepository.cs,

    public class BlogRepository : DefaultRepository<Blog, int>, IBlogRepository
    {
        public BlogRepository(UnitOfWorkManager uowm) : base(uowm?.Orm, uowm)
        {
        }

        public List<Blog> GetBlogs()
        {
            return Select.Page(1, 10).ToList();
        }
    }

其中接口。IBlogRepository.cs

    public interface IBlogRepository : IBaseRepository<Blog, int>
    {
        List<Blog> GetBlogs();
    }

在 startup.cs注入此服務

    services.AddScoped<IBlogRepository, BlogRepository>();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM