在很多一主多從數據庫的場景下,很多開發同學為了復用DbContext往往采用創建一個包含所有DbSet<Model>父類通過繼承派生出Write和ReadOnly類型來實現,其實可以通過命名注入來實現一個類型注冊多個實例來實現。下面來用代碼演示一下。
一、環境准備
數據庫選擇比較流行的postgresql,我們這里選擇使用helm來快速的從開源包管理社區bitnami拉取一個postgresql的chart來搭建一個簡易的主從數據庫作為環境,,執行命令如下:
注意這里我們需要申明architecture為replication來創建主從架構,否則默認的standalone只會創建一個實例副本。同時我們需要暴露一下svc的端口用於驗證以及預設一下root的密碼,避免從secret重新查詢。
helm repo add bitnami https://charts.bitnami.com/bitnami helm install mypg --set global.postgresql.auth.postgresPassword=Mytestpwd#123 --set architecture=replication --set primary.service.type=NodePort --set primary.service.nodePorts.postgresql=32508 --set readReplicas.service.type=NodePort --set readReplicas.service.nodePorts.postgresql=31877 bitnami/postgresql
關於helm安裝集群其他方面的細節可以查看文檔,這里不再展開。安裝完成后我們可以get po 以及get svc看到主從實例已經部署好了,並且服務也按照預期暴露好端口了(注意hl開頭的是無頭服務,一般情況下不需要管他默認我們采用k8s自帶的svc轉發。如果有特殊的負載均衡需求時可以使用他們作為dns服務提供真實后端IP來實現定制化的連接)
接着我們啟動PgAdmin連接一下這兩個庫,看看主從庫是否順利工作
可以看到能夠正確連接,接着我們創建一個數據庫,看看從庫是否可以正確異步訂閱並同步過去
可以看到數據庫這部分應該是可以正確同步了,當然為了測試多個從庫,你現在可以通過以下命令來實現只讀副本的擴容,接下來我們開始第二階段。
kubectl scale --replicas=n statefulset/mypg-postgresql-read
二、實現單一上下文的多實例注入
首先我們創建一個常規的webapi項目,並且引入ef和pgqsql相關的nuget。同時由於需要做數據庫自動化遷移我們引入efcore.tool包,並且引入autofac作為默認的DI容器(由於默認的DI不支持在長周期實例(HostedService-singleton)注入短周期實例(DbContext-scoped))
<ItemGroup> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.1" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="6.0.1"> <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference> <PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.3" /> <PackageReference Include="Autofac.Extensions.DependencyInjection" Version="7.2.0" /> </ItemGroup>
接着我們創建efcontext以及一個model
public class EfContext : DbContext { public DbSet<User> User { get; set; } public EfContext(DbContextOptions<EfContext> options) : base(options) { } } public class User { [Key] public int Id { get; set; } public string Name { get; set; } }
然后我們創建對應的讀寫上下文的工廠用於自動化切換,並創建一個擴展函數用於注冊上下文到多個實例,同時要記得創建對應的接口用於DI容器注冊時的key
首先是我們核心的擴展庫,這是實現多個實例注冊的關鍵:
public static class MultipleEfContextExtension { private static AsyncLocal<ReadWriteType> type = new AsyncLocal<ReadWriteType>(); public static IServiceCollection AddReadWriteDbContext<Context>(this IServiceCollection services, Action<DbContextOptionsBuilder> writeBuilder, Action<DbContextOptionsBuilder> readBuilder) where Context : DbContext, IContextWrite, IContextRead { services.AddDbContext<Context>((serviceProvider, builder) => { if (type.Value == ReadWriteType.Read) readBuilder(builder); else writeBuilder(builder); }, contextLifetime: ServiceLifetime.Transient, optionsLifetime: ServiceLifetime.Transient); services.AddScoped<IContextWrite, Context>(services => { type.Value = ReadWriteType.Write; return services.GetService<Context>(); }); services.AddScoped<IContextRead, Context>(services => { type.Value = ReadWriteType.Read; return services.GetService<Context>(); }); return services; } }
接着是我們需要申明的讀寫接口以及注冊上下文工廠:
public interface IContextRead { } public interface IContextWrite { } public class ContextFactory<TContext> where TContext : DbContext { private ReadWriteType asyncReadWriteType = ReadWriteType.Read; private readonly TContext contextWrite; private readonly TContext contextRead; public ContextFactory(IContextWrite contextWrite, IContextRead contextRead) { this.contextWrite = contextWrite as TContext; this.contextRead = contextRead as TContext; } public TContext Current { get { return asyncReadWriteType == ReadWriteType.Read ? contextRead : contextWrite; } } public void SetReadWrite(ReadWriteType readWriteType) { //只有類型為非強制寫時才變化值 if (asyncReadWriteType != ReadWriteType.ForceWrite) { asyncReadWriteType = readWriteType; } } public ReadWriteType GetReadWrite() { return asyncReadWriteType; } }
同時修改一下EF上下文的繼承,讓上下文繼承這兩個接口:
public class EfContext : DbContext, IContextWrite, IContextRead
然后我們需要在program里使用這個擴展並注入主從庫對應的連接配置
builder.Services.AddReadWriteDbContext<EfContext>(optionsBuilderWrite => { optionsBuilderWrite.UseNpgsql("User ID=postgres;Password=Mytestpwd#123;Host=192.168.1.x;Port=32508;Database=UserDb;Pooling=true;"); }, optionsBuilderRead => { optionsBuilderRead.UseNpgsql("User ID=postgres;Password=Mytestpwd#123;Host=192.168.1.x;Port=31877;Database=UserDb;Pooling=true;"); });
同時這里需要注冊一個啟動服務用於數據庫自動化遷移(注意這里需要注入寫庫實例,連接只讀庫實例則無法創建數據庫遷移)
builder.Services.AddHostedService<MyHostedService>();
public class MyHostedService : IHostedService { private readonly EfContext context; public MyHostedService(IContextWrite contextWrite) { this.context = contextWrite as EfContext; } public async Task StartAsync(CancellationToken cancellationToken) { context.Database.EnsureCreated(); await Task.CompletedTask; } public async Task StopAsync(CancellationToken cancellationToken) { await Task.CompletedTask; } }
再然后我們創建一些傳統的工作單元和倉儲用於簡化orm的操作,並且在准備在控制器開始進行演示
首先定義一個簡單的IRepository並實現幾個常規的方法,接着我們在Repository里實現它,這里會有幾個關鍵代碼我已經標紅
public interface IRepository<T> { bool Add(T t); bool Update(T t); bool Remove(T t); T Find(object key); IQueryable<T> GetByCond(Expression<Func<T, bool>> cond); } public class Repository<T> : IRepository<T> where T:class { private readonly ContextFactory<EfContext> contextFactory; private EfContext context { get { return contextFactory.Current; } } public Repository(ContextFactory<EfContext> contextFactory) { this.contextFactory = contextFactory; } public bool Add(T t) { contextFactory.SetReadWrite(ReadWriteType.Write); context.Add(t); return true; } public bool Remove(T t) { contextFactory.SetReadWrite(ReadWriteType.Write); context.Remove(t); return true; } public T Find(object key) { contextFactory.SetReadWrite(ReadWriteType.Read); var entity = context.Find(typeof(T), key); return entity as T; } public IQueryable<T> GetByCond(Expression<Func<T, bool>> cond) { contextFactory.SetReadWrite(ReadWriteType.Read); return context.Set<T>().Where(cond); } public bool Update(T t) { contextFactory.SetReadWrite(ReadWriteType.Write); context.Update(t); return true; } }
可以看到這些方法就是自動化切庫的關鍵所在,接着我們再實現對應的工作單元用於統一提交和事務,並注入到容器中,這里需要注意到工作單元開啟事務后,傳遞的枚舉是強制寫,也就是會忽略倉儲默認的讀寫策略,強制工廠返回寫庫實例,從而實現事務一致。
public interface IUnitofWork { bool Commit(IDbContextTransaction tran = null); Task<bool> CommitAsync(IDbContextTransaction tran = null); IDbContextTransaction BeginTransaction(); Task<IDbContextTransaction> BeginTransactionAsync(); } public class UnitOnWorkImpl<TContext> : IUnitofWork where TContext : DbContext { private TContext context { get { return contextFactory.Current; } } private readonly ContextFactory<TContext> contextFactory; public UnitOnWorkImpl(ContextFactory<TContext> contextFactory) { this.contextFactory = contextFactory; } public bool Commit(IDbContextTransaction tran = null) { var result = context.SaveChanges() > -1; if (result && tran != null) tran.Commit(); return result; } public async Task<bool> CommitAsync(IDbContextTransaction tran = null) { var result = (await context.SaveChangesAsync()) > -1; if (result && tran != null) await tran.CommitAsync(); return result; } public IDbContextTransaction BeginTransaction() { contextFactory.SetReadWrite(ReadWriteType.ForceWrite); return context.Database.BeginTransaction(); } public async Task<IDbContextTransaction> BeginTransactionAsync() { contextFactory.SetReadWrite(ReadWriteType.ForceWrite); return await context.Database.BeginTransactionAsync(); } }
最后我們將工作單元和倉儲注冊到容器里:
serviceCollection.AddScoped<IUnitofWork, UnitOnWorkImpl<Context>>(); serviceCollection.AddScoped<ContextFactory<Context>>(); typeof(Context).GetProperties().Where(x => x.PropertyType.IsGenericType && typeof(DbSet<>).IsAssignableFrom(x.PropertyType.GetGenericTypeDefinition())).Select(x => x.PropertyType.GetGenericArguments()[0]).ToList().ForEach(x => serviceCollection.AddScoped(typeof(IRepository<>).MakeGenericType(x), typeof(Repository<>).MakeGenericType(x)));
這里的關鍵點在於開啟事務后所有的數據庫請求必須強制提交到主庫,而非事務情況下那種根據倉儲操作類型去訪問各自的讀寫庫,所以這里傳遞一個ForceWrite作為區分。基本的工作就差不多做完了,現在我們設計一個控制器來演示,代碼如下:
[Route("{Controller}/{Action}")] public class HomeController : Controller { private readonly IUnitofWork unitofWork; private readonly IRepository<User> repository; public HomeController(IUnitofWork unitofWork, IRepository<User> repository) { this.unitofWork = unitofWork; this.repository = repository; } [HttpGet] [Route("{id}")] public string Get(int id) { return JsonSerializer.Serialize(repository.Find(id), new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) }); } [HttpGet] [Route("{id}/{name}")] public async Task<bool> Get(int id, string name) { using var tran = await unitofWork.BeginTransactionAsync(); var user = repository.Find(id); if (user == null) { user = new User() { Id = id, Name = name }; repository.Add(user); } else { user.Name = name; repository.Update(user); } unitofWork.Commit(tran); return true; } [HttpGet] [Route("/all")] public async Task<string> GetAll() { return JsonSerializer.Serialize(await repository.GetByCond(x => true).ToListAsync(), new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) }); }
控制器就是比較簡單的三個action,根據id和查所有以及開啟一個事務做事務查詢+編輯 or 新增。現在我們啟動項目,來測試一下接口是否正常工作
我們分別訪問/all /home/get/1 和/home/get/1/小王 ,然后再次訪問/all和get/1。可以看到成功的寫入了。
再看看數據庫的情況,可以看到主從庫都已經成功同步了。
現在我們嘗試用事務連接到從庫試試能否寫入,我們修改以下代碼:讓上下文工廠獲取到枚舉值是ForceWrite時返回錯誤的只讀實例試試:
public class ContextFactory<TContext> where TContext : DbContext { ...... public TContext Current { get { return readWriteType.Value == ReadWriteType.ForceWrite ? contextRead : contextWrite; } } ...... }
接着重啟項目,訪問/home/get/1/小王,可以看到連接從庫的情況下無法正常寫入,同時也驗證了確實可以通過這樣的方式讓單個上下文類型按需連接數據庫了。