ShardingCore
ShardingCore
易用、簡單、高性能、普適性,是一款擴展針對efcore生態下的分表分庫的擴展解決方案,支持efcore2+的所有版本,支持efcore2+的所有數據庫、支持自定義路由、動態路由、高性能分頁、讀寫分離的一款組件,如果你喜歡這組件或者這個組件對你有幫助請點擊下發star讓更多的.neter可以看到使用
Gitee Star 助力dotnet 生態 Github Star
經過了3個星期再次發一篇博客來介紹本框架的實現原理通過本篇文章可以有助於您閱讀源碼和提出寶貴意見。之前通過兩篇文章簡單的介紹了sharding-core的核心聚合原理(ShardingCore 如何呈現“完美”分表)和高性能分頁原理實現(ShardingCore是如何針對分表下的分頁進行優化的),這兩篇文章主要是針對分表分庫下數據獲取的一個解決方案的思路並不涉及到太多efcore(.net)的知識。
通過關系圖我們可以看到目前一個shardingdbcontext下主要是以entity作為媒介通過兩個虛擬表和虛擬數據源為橋梁來實現一對多的關系映射
首先先說下經過了3個星期目前本框架已經具有了3個星期前不具備的一些功能,主要是有以下幾個功能上的改進和添加
分庫支持
之前的框架僅支持分表,思路是先將分表做到相對完成度比較高后在實現分庫,畢竟分表對於大部分用戶而言使用場景更高,目前已經實現針對數據對象實現了分庫的實現,當然您還是可以在分庫的基礎上在實現分表,這兩者是不沖突的
services.AddShardingDbContext<DefaultShardingDbContext, DefaultDbContext>(
o =>
o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBxx0;Integrated Security=True;")
).Begin(o =>
{
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
})
.AddShardingQuery((conStr, builder) => builder.UseSqlServer(conStr).UseLoggerFactory(efLogger)
.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking))
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection).UseLoggerFactory(efLogger))
.AddDefaultDataSource("ds0","Data Source=localhost;Initial Catalog=ShardingCoreDBxx0;Integrated Security=True;")
.AddShardingDataSource(sp =>//添加額外兩個數據源一共3個庫
{
return new Dictionary<string, string>()
{
{"ds1", "Data Source=localhost;Initial Catalog=ShardingCoreDBxx1;Integrated Security=True;"},
{"ds2", "Data Source=localhost;Initial Catalog=ShardingCoreDBxx2;Integrated Security=True;"},
};
}).AddShardingDataSourceRoute(o =>
{
o.AddShardingDatabaseRoute<SysUserModVirtualDataSourceRoute>();
}).End();
支持code-first
相信很多使用efcore的用戶其實是更加喜歡脫離數據庫開發,在開發的時候不進行數據庫層面的操作而只專注於代碼的業務編寫來保證高效性,配合efcore的fluent api 可以做到很完美的開發時候不關注數據庫,效率拉滿 Migrations
//創建遷移sqlgenerator
/// <summary>
/// https://github.com/Coldairarrow/EFCore.Sharding/blob/master/src/EFCore.Sharding.SqlServer/ShardingSqlServerMigrationsSqlGenerator.cs
/// </summary>
public class ShardingSqlServerMigrationsSqlGenerator<TShardingDbContext> : SqlServerMigrationsSqlGenerator where TShardingDbContext:DbContext,IShardingDbContext
{
public ShardingSqlServerMigrationsSqlGenerator(MigrationsSqlGeneratorDependencies dependencies, IRelationalAnnotationProvider migrationsAnnotations) : base(dependencies, migrationsAnnotations)
{
}
protected override void Generate(
MigrationOperation operation,
IModel model,
MigrationCommandListBuilder builder)
{
var oldCmds = builder.GetCommandList().ToList();
base.Generate(operation, model, builder);
var newCmds = builder.GetCommandList().ToList();
var addCmds = newCmds.Where(x => !oldCmds.Contains(x)).ToList();
MigrationHelper.Generate<TShardingDbContext>(operation, builder, Dependencies.SqlGenerationHelper, addCmds);
}
}
//添加遷移codefirst的contextfactory基本和starup一樣如果是以非命令執行比如 `_context.Database.Migrate()`那么startup也需要添加` .ReplaceService<IMigrationsSqlGenerator, ShardingSqlServerMigrationsSqlGenerator<DefaultShardingTableDbContext>>()`
public class DefaultDesignTimeDbContextFactory: IDesignTimeDbContextFactory<DefaultShardingTableDbContext>
{
static DefaultDesignTimeDbContextFactory()
{
var services = new ServiceCollection();
services.AddShardingDbContext<DefaultShardingTableDbContext, DefaultTableDbContext>(
o =>
o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBMigration;Integrated Security=True;")
.ReplaceService<IMigrationsSqlGenerator, ShardingSqlServerMigrationsSqlGenerator<DefaultShardingTableDbContext>>()//區別替換掉原先的遷移
).Begin(o =>
{
o.CreateShardingTableOnStart = false;
o.EnsureCreatedWithOutShardingTable = false;
o.AutoTrackEntity = true;
})
.AddShardingQuery((conStr, builder) => builder.UseSqlServer(conStr)
.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking))
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection))
.AddDefaultDataSource("ds0",
"Data Source=localhost;Initial Catalog=ShardingCoreDBMigration;Integrated Security=True;")
.AddShardingTableRoute(o =>
{
o.AddShardingTableRoute<ShardingWithModVirtualTableRoute>();
o.AddShardingTableRoute<ShardingWithDateTimeVirtualTableRoute>();
}).End();
services.AddLogging();
var buildServiceProvider = services.BuildServiceProvider();
ShardingContainer.SetServices(buildServiceProvider);
new ShardingBootstrapper(buildServiceProvider).Start();
}
public DefaultShardingTableDbContext CreateDbContext(string[] args)
{
return ShardingContainer.GetService<DefaultShardingTableDbContext>();
}
}
1.初始化添加遷移(Add-Migration EFCoreSharding -Context DefaultShardingTableDbContext -OutputDir Migrations\ShardingMigrations)
2.更新數據庫(Update-Database -Context DefaultShardingTableDbContext -Verbose)
3.獲取遷移腳本( Script-Migration -Context DefaultShardingTableDbContext)用於生產環境
支持自動追蹤
efcore的好用功能之一(自動追蹤)開啟后可以幫助程序實現更多的功能,雖然之前也是支持的但是就是用體驗而言之前的需要手動attach而目前支持了自動化,當然也不可能和efcore原生的100%完美,當然框架默認不開啟自動追蹤
services.AddShardingDbContext<DefaultShardingTableDbContext, DefaultTableDbContext>(
o =>
o.UseSqlServer("Data Source=localhost;Initial Catalog=ShardingCoreDBMigration;Integrated Security=True;")
.ReplaceService<IMigrationsSqlGenerator,ShardingSqlServerMigrationsSqlGenerator<DefaultShardingTableDbContext>>()
).Begin(o =>
{
o.CreateShardingTableOnStart = false;
o.EnsureCreatedWithOutShardingTable = false;
o.AutoTrackEntity = true;//添加對應代碼可以讓整個框架進行自動追蹤支持
})
.AddShardingQuery((conStr, builder) => builder.UseSqlServer(conStr))
.AddShardingTransaction((connection, builder) =>
builder.UseSqlServer(connection))
.AddDefaultDataSource("ds0",
"Data Source=localhost;Initial Catalog=ShardingCoreDBMigration;Integrated Security=True;")
.AddShardingTableRoute(o =>
{
o.AddShardingTableRoute<ShardingWithModVirtualTableRoute>();
o.AddShardingTableRoute<ShardingWithDateTimeVirtualTableRoute>();
}).End();
單次查詢核心線程數控制
說人話就是本次查詢路由坐落到10張表,之前的做法是開啟10個線程並行查詢10次后獲取到對應的迭代器,目前添加了核心查詢線程數控制,如果您設置了5,本次查詢路由到10張表,會議開始開啟5個線程,后續每完成一個開啟一個新新線程,並且支持超時時間,可以保證在一定時間內執行完成,完不成就超時,防止查詢坐落的表過多而一次性大量開啟線程從而導致程序消耗過多資源
.Begin(o =>
{
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
o.AutoTrackEntity = true;
o.ParallelQueryMaxThreadCount = 10;//並發查詢線程數
o.ParallelQueryTimeOut=TimeSpan.FromSeconds(10);//查詢並發等待超時時間
}
讀寫分離延遲處理
框架目前支持全局定義和局部定義是否啟用讀寫分離,如果您開啟了讀寫分離那么數據庫和數據庫之間的數據同步延遲會是一個很嚴重的問題他會讓你沒辦法很好的查詢到剛修改的數據,而sharding-core為這個場景提供了手動切換是否使用writeonly字符串;用來保證消除讀寫分離時帶來的延遲,而造成數據處理上的異常。而且程序也提供了讀寫分離策略除了隨機和輪詢外額外有一個配置可以配置讀寫分離真正執行是依據dbcontext還是每次都是最新的,每次都是最新的會有一個問題,你明明分頁count出來是10條可能查詢只返回了9條或者其他數據,所以再次基礎上進行了設置是否按dbcontext就是說同一個dbcontext是一樣的鏈接,dbcontext默認是scope就是說一次請求下面是一樣的當然也可以設置成每次都是最新的具體自行考慮根據業務
以上一些功能的添加和優化是之前sharding-core版本所不具備的,其他功能也在不斷的完善中。
接下來我將來講解下sharding-core的實現原理如何讓efcore實現sharding功能,並且完美的無感知使用dbcontext。
ShardingDbContext的擴展
在sharding-core中核心api接口依然是通過dbcontext的繼承來實現的,首先是攔截sql,總的有兩條路可以走1.通過efcore提供的攔截器攔截sql配合antlr4實現對sql語句的分析和從新分裂出對應的語句來進行查詢最后通過多個datareader進行流式聚合。2.通過攔截iqueryable的lambda表達式來分裂成多個ienumerator進行聚合,在這里我選擇了后者因為相比表達式的解析字符串的解析更加吃力而且本人也不是很熟悉antlr4所以選擇了后者。那么如何進行攔截的,這個熟悉linq的同學肯定都知道一個iqueryable都會有一個對應的provider這兩個是一對的,又得益於efcore的開放型設計通過替換兩個核心接口來實現IDbSetSource
和 IQueryCompiler
,下面就簡單說下這兩個接口在efcore中的作用
IDbSetSource
用於針對efcore的dbcontext.set<entity>()
和dbset<entity>()
進行攔截和api重構具體是現代嗎ShardingDbSetSource
IQueryCompiler
efcore核心查詢編譯,用於對表達式進行編譯后緩存起來,所有的查詢都會通過IQueryCompiler
核心接口,那么通過自己實現這兩個接口接管對應的表達式后對表達式進行分析就可以獲取到對應的where子句,在通過將表達式進行路由后並行請求流式聚合返回對應的IEnumerator
或者IAsyncEnumerator
就可以實現無感知使用sharding-core,感覺和使用efcore一毛一樣。具體實現代碼ShardingQueryCompiler
AtcualDbContext擴展
用過efcore的都應該知道目前efcore的機制就是一個對象一張表,在這個機制下面如果你想實現上圖的功能只能創建多個dbcontext然后讓對應的dbcontext的對象映射到對應的表里面而不是固定的Entitiy對應table,那么如何讓對應的對象Entity對應table1和table2和table3呢?
//dbcontext下的這個方法在dbcontext被創建后第一次調用Model屬性會被加載如果緩存已存在那么不會被多次加載
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
}
說人話就是我可以再這邊通過modelBuilder獲取我自己想要的對象但是如果我把Entity映射到了table1那么這個dbcontext就會被緩存起來entity-table1這個關系也會被緩存起來沒辦法改變了,那么是否有辦法可以解決這個機制呢有兩個efcore的接口可以幫助我們實現這個功能,這個在博客園很多大神都已經實現過了具體是 IModelCacheKeyFactory
和 IModelCustomizer
IModelCacheKeyFactory
用於將efcore的模型緩存進行判斷是否和之前的模型緩存一致具體實現ShardingModelCacheKeyFactory
public class ShardingModelCacheKeyFactory : IModelCacheKeyFactory
{
public object Create(DbContext context)
{
if (context is IShardingTableDbContext shardingTableDbContext&&!string.IsNullOrWhiteSpace(shardingTableDbContext.RouteTail.GetRouteTailIdentity()))
{
return $"{context.GetType()}_{shardingTableDbContext.RouteTail.GetRouteTailIdentity()}";
}
else
{
return context.GetType();
}
}
}
IModelCustomizer
這個接口是efcore開放出來在模型緩存結構定義完成后初始化緩存前可以使用的接口,就是說我們並不需要在OnModelCreating方法中使用或者說不需要再次地方進行修改可以在IModelCustomizer
接口內部實現,具體代碼ShardingModelCustomizer
public class ShardingModelCustomizer<TShardingDbContext> : ModelCustomizer where TShardingDbContext : DbContext, IShardingDbContext
{
private Type _shardingDbContextType => typeof(TShardingDbContext);
public ShardingModelCustomizer(ModelCustomizerDependencies dependencies) : base(dependencies)
{
}
public override void Customize(ModelBuilder modelBuilder, DbContext context)
{
base.Customize(modelBuilder, context);
if (context is IShardingTableDbContext shardingTableDbContext&& shardingTableDbContext.RouteTail.IsShardingTableQuery())
{
var isMultiEntityQuery = shardingTableDbContext.RouteTail.IsMultiEntityQuery();
if (!isMultiEntityQuery)
{
var singleQueryRouteTail = (ISingleQueryRouteTail) shardingTableDbContext.RouteTail;
var tail = singleQueryRouteTail.GetTail();
var virtualTableManager = ShardingContainer.GetService<IVirtualTableManager<TShardingDbContext>>();
var typeMap = virtualTableManager.GetAllVirtualTables().Where(o => o.GetTableAllTails().Contains(tail)).Select(o => o.EntityType).ToHashSet();
//設置分表
var mutableEntityTypes = modelBuilder.Model.GetEntityTypes().Where(o => o.ClrType.IsShardingTable() && typeMap.Contains(o.ClrType));
foreach (var entityType in mutableEntityTypes)
{
MappingToTable(entityType.ClrType, modelBuilder, tail);
}
}
else
{
var multiQueryRouteTail = (IMultiQueryRouteTail) shardingTableDbContext.RouteTail;
var entityTypes = multiQueryRouteTail.GetEntityTypes();
var mutableEntityTypes = modelBuilder.Model.GetEntityTypes().Where(o => o.ClrType.IsShardingTable() && entityTypes.Contains(o.ClrType)).ToArray();
foreach (var entityType in mutableEntityTypes)
{
var queryTail = multiQueryRouteTail.GetEntityTail(entityType.ClrType);
if (queryTail != null)
{
MappingToTable(entityType.ClrType, modelBuilder, queryTail);
}
}
}
}
}
private void MappingToTable(Type clrType, ModelBuilder modelBuilder, string tail)
{
var shardingEntityConfig = ShardingUtil.Parse(clrType);
var shardingEntity = shardingEntityConfig.EntityType;
var tailPrefix = shardingEntityConfig.TailPrefix;
var entity = modelBuilder.Entity(shardingEntity);
var tableName = shardingEntityConfig.VirtualTableName;
if (string.IsNullOrWhiteSpace(tableName))
throw new ArgumentNullException($"{shardingEntity}: not found original table name。");
#if DEBUG
Console.WriteLine($"mapping table :[tableName]-->[{tableName}{tailPrefix}{tail}]");
#endif
entity.ToTable($"{tableName}{tailPrefix}{tail}");
}
}
稍作解析進入后會先判斷dbcontext真正執行的那個是否是需要分表的並且判斷本次查詢涉及到的表示一張還是多張,對此對象在數據庫里的映射關系改成分表
到此為止efcore的查詢架構已經算是非常清晰了
- 通過替換模型緩存接口和查詢編譯接口來實現查詢編譯時攔截sql和模型重建
- 通過類似適配器模式來實現對外dbcontext其實內部有多個dbcontext在進行真正的工作
上述幾步讓sharding-core在使用上和efcore一樣除了配置方面,后續將會出更多的efcore的分表分庫實踐文章和繼續開發完成其他orm的支持,當然這個改動將會非常大也希望各位.neter有喜歡的或者希望了解源碼的或者想參與完善的多多支持
下一篇實現如何自定義路由,自定義路由的原理 where left
最后
本人會一致維護該框架,希望為.net生態做一份共享
Gitee Star 助力dotnet 生態 Github Star
QQ群:771630778
個人QQ:326308290(歡迎技術支持提供您寶貴的意見)
個人郵箱:326308290@qq.com