前言
之前有關EF並發探討過幾次,但是呢,博主感覺還是有問題,為什么會覺得有問題,其實就是理解不夠透徹罷了,於是在項目中都是用的存儲過程或者SQL語句來實現,利用放假時間好好補補EF Core並發的問題,本文比較長,請耐心點看。
EntityFramework Core並發初級版初探
關於並發無非就兩種:樂觀並發和悲觀並發,悲觀並發簡言之則是當客戶端對數據庫中同一值進行修改時會造成阻塞,而樂觀並發則任何客戶端都可以對可以對數據進行查詢或者讀取,在EF Core中不支持悲觀並發,結果則產生並發沖突,所以產生的沖突則需要我們去解決。
為了便於理解我們從基礎內容開始講起,稍安勿躁,我們循序漸進稍后會講到並發沖突、並發解決、並發高級三個方面的內容。我們建立實體類如下:
public class Blog { public int Id { get; set; } public string Name { get; set; } public string Url { get; set; } public int Count { get; set; } }
接下來簡單配置下映射:
public class EFCoreContext : DbContext { public DbSet<Blog> Blogs { get; set; } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) => optionsBuilder.UseSqlServer(@"Server=.;Database=EFCoreDb;Trusted_Connection=True;"); protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.Entity<Blog>(pc => { pc.ToTable("Blog").HasKey(k => k.Id); pc.Property(p => p.Name).IsRequired(); pc.Property(p => p.Url).IsRequired(); pc.Property(p => p.Count).IsRequired(); }); } }
接下來我們簡單封裝下進行查詢和更新數據的類 DbQueryCommit
public class DbQueryCommit : IDisposable { private readonly EFCoreContext context; public DbQueryCommit(EFCoreContext context) => this.context = context; public TEntity Query<TEntity>(params object[] keys) where TEntity : class => this.context.Set<TEntity>().Find(keys); public int Commit(Action change) { change(); return context.SaveChanges(); } public DbSet<TEntity> Set<TEntity>() where TEntity : class => context.Set<TEntity>(); public void Dispose() => context.Dispose(); }
接下來我們來看看非並發的情況,進行如下查詢和修改:
public static void NoCheck( DbQueryCommit readerWriter1, DbQueryCommit readerWriter2, DbQueryCommit readerWriter3) { int id = 1; Blog blog1 = readerWriter1.Query<Blog>(id); Blog blog2 = readerWriter2.Query<Blog>(id); readerWriter1.Commit(() => blog1.Name = nameof(readerWriter1)); readerWriter2.Commit(() => blog2.Name = nameof(readerWriter2)); Blog category3 = readerWriter3.Query<Blog>(id); Console.WriteLine(category3.Name); }
當前博主VS版本為2017,演示該程序在控制台,之前我們有講過若要進行遷移需要安裝 Microsoft.EntityFrameworkCore.Tools.DotNet 程序包,此時我們會發現根本都安裝不上,如下:
不知為何錯誤,此時我們需要在項目文件中手動添加如上程序包,(解決方案來源於:https://docs.microsoft.com/en-us/ef/core/miscellaneous/cli/dotnet)如下:
<ItemGroup> <DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" /> </ItemGroup>
然后添加程序包 Microsoft.EntityFrameworkCore.Design ,此時我們再來 dotnet restore 則會看到如下執行EF的命令:
接下來我們實例化上下文進行修改數據。
var efContext1 = new EFCoreContext(); var d1 = new DbQueryCommit(efContext1); var efContext2 = new EFCoreContext(); var d2 = new DbQueryCommit(efContext2); var efContext3 = new EFCoreContext(); var d3 = new DbQueryCommit(efContext3); Concurrency.NoCheck(d1, d2, d3);
此時我們在數據庫中默認插入一條數據:
此時界面打印最后讀取到的Name值如下:
數據庫也對應進行了更新,這也充分說明EF Core對於並發為樂觀並發:
接下來我們對Name屬性定義為並發Token。
pc.Property(p => p.Name).IsRequired().IsConcurrencyToken();
此時為了很好演示各個方法,我們同樣再來定義並發方法,如下:
public static void ConcurrencyCheck(DbQueryCommit readerWriter1, DbQueryCommit readerWriter2) { int id = 1; Blog blog1 = readerWriter1.Query<Blog>(id); Blog blog2 = readerWriter2.Query<Blog>(id); readerWriter1.Commit(() => { blog1.Name = nameof(readerWriter1); blog1.Count = 2; }); readerWriter2.Commit(() => { blog2.Name = nameof(readerWriter2); blog2.Count = 2; }); }
此時再來調用該方法:
var efContext1 = new EFCoreContext(); var d1 = new DbQueryCommit(efContext1); var efContext2 = new EFCoreContext(); var d2 = new DbQueryCommit(efContext2); //var efContext3 = new EFCoreContext(); //var d3 = new DbQueryCommit(efContext3); Concurrency.ConcurrencyCheck(d1, d2);
當我們利用兩個上下文1和2去讀取數據時此時Name = 'Jeffcky',當上下文1更新時在快照中根據主鍵和Name去查找數據庫,查找到Name后並令Name = 'readerWriter1'成功更新,但是上下2去更新Name = 'readerWriter2'時,此時在快照中根據主鍵和Name去查找數據庫,發現不存在該條數據,同時我們設置了並發Token,最終導致出現 DbUpdateConcurrencyException 並發更新異常。解決並發個兩點一個是上述設置並發Token,另外一個則是設置行版本,下面我們也來看下,首先我們在類中增加一個行版本的字節屬性。
public byte[] RowVersion { get; set; }
同時對該行版本進行映射標識。
pc.Property(p => p.RowVersion).IsRequired().IsRowVersion().ValueGeneratedOnAddOrUpdate();
為了很好演示行版本並發,我們增加一個屬性來打印行版本字符串。
public string RowVersionString => $"0x{BitConverter.ToUInt64(RowVersion.Reverse().ToArray(), 0).ToString("X16")}";
同樣我們定義一個調用行版本的方法:
public static void RowVersion(DbQueryCommit readerWriter1, DbQueryCommit readerWriter2) { int id = 1; Blog blog1 = readerWriter1.Query<Blog>(id); Console.WriteLine(blog1.RowVersionString); Blog blog2 = readerWriter2.Query<Blog>(id); Console.WriteLine(blog2.RowVersionString); readerWriter1.Commit(() => blog1.Name = nameof(readerWriter1)); Console.WriteLine(blog1.RowVersionString); readerWriter2.Commit(() => readerWriter2.Set<Blog>().Remove(blog2)); }
接下來我們調用演示看看。
var efContext1 = new EFCoreContext(); var d1 = new DbQueryCommit(efContext1); var efContext2 = new EFCoreContext(); var d2 = new DbQueryCommit(efContext2); //var efContext3 = new EFCoreContext(); //var d3 = new DbQueryCommit(efContext3); Concurrency.RowVersion(d1, d2);
我們從上可以明顯看出當查出數據庫中的行版本值為 0x000000000073 ,接着readerWriter1更新后其行版本增加為 0x000000000074 ,當我們利用readerWriter2去刪除查詢出id = 1的數據時,此時會根據當前主鍵和行版本為 0x000000000073 去查找數據庫,但是此時沒有找到數據,導致同樣如上述並發Token一樣出現並發異常。
EntityFramework Core並發中級版解析
並發異常我們可以通過 DbUpdateConcurrencyException 來獲取,該類繼承自 DbUpdateException ,該類中的參數 EntityEntry 為一個集合,利用它則可以獲取到對應的數據庫中的值以及當前更新值等,所以我們可以自定義並發異常解析,如下:
public class DbUpdateException : Exception { public virtual IReadOnlyList<EntityEntry> Entries { get; } } public class DbUpdateConcurrencyException : DbUpdateException { //TODO }
這里我們需要弄明白存在EntityEntry中的值類型,比如DbUpdateConcurrencyException的參數為exception。我們通過如下則可以獲取到被跟蹤的實體狀態。
var tracking = exception.Entries.Single();
此時存在數據庫中的原始值則為如下:
var original = tracking.OriginalValues.ToObject();
而當前需要更新的值則為如下:
var current = tracking.CurrentValues.ToObject();
而數據庫中的值則為已經提交更新的值:
var database = '第一次已經更新的對象';
上述既然出現並發異常,接下來我們則需要解析並發異常並解決異常,大部分情況下無論是提交事務失敗也好還是對數據進行操作也好都會進行重試機制,所以這里我們解析到並發異常並采取重試機制。之前我們進行提交時定義如下:
public int Commit(Action change) { change(); return context.SaveChanges(); }
此時我們對該方法進行重載,遇到並發異常后並采取重試機制重試三次,如下:
public int Commit(Action change, Action<DbUpdateConcurrencyException> handleException, int retryCount = 3) { change(); for (int retry = 0; retry < retryCount; retry++) { try { return context.SaveChanges(); } catch (DbUpdateConcurrencyException exception) { handleException(exception); } } return context.SaveChanges(); }
然后我們定義一個需要出現並發並調用上述重試機制的更新方法,如下:
public static void UpdateBlog( DbQueryCommit readerWriter1, DbQueryCommit readerWriter2, DbQueryCommit readerWriter3, Action<EntityEntry> resolveConflict) { int id = 1; Blog blog1 = readerWriter1.Query<Blog>(id); Blog blog2 = readerWriter2.Query<Blog>(id); Console.WriteLine($"查詢行版本:{blog1.RowVersionString}"); Console.WriteLine("----------------------------------------------------------"); Console.WriteLine($"查詢行版本:{blog2.RowVersionString}"); Console.WriteLine("----------------------------------------------------------"); readerWriter1.Commit(() => { blog1.Name = nameof(readerWriter1); blog1.Count = 2; }); Console.WriteLine($"更新blog1后行版本:{blog1.RowVersionString}"); Console.WriteLine("----------------------------------------------------------"); readerWriter2.Commit( change: () => { blog2.Name = nameof(readerWriter2); blog2.Count = 1; }, handleException: exception => { EntityEntry tracking = exception.Entries.Single(); Blog original = (Blog)tracking.OriginalValues.ToObject(); Blog current = (Blog)tracking.CurrentValues.ToObject(); Blog database = blog1; var origin = $"原始值:({original.Name},{original.Count},{original.Id},{original.RowVersionString})"; Console.WriteLine(original); Console.WriteLine("----------------------------------------------------------"); var databaseValue = $"數據庫中值:({database.Name},{database.Count},{database.Id},{database.RowVersionString})"; Console.WriteLine(databaseValue); Console.WriteLine("----------------------------------------------------------"); var update = $"更新的值:({current.Name},{current.Count},{current.Id},{current.RowVersionString})"; Console.WriteLine(update); Console.WriteLine("----------------------------------------------------------"); resolveConflict(tracking); }); Blog resolved = readerWriter3.Query<Blog>(id); var resolvedValue = $"查詢並發解析后中的值: ({resolved.Name}, {resolved.Count}, {resolved.Id},{resolved.RowVersionString})"; Console.WriteLine(resolvedValue); }
接下來我們實例化三個上下文,稍后我會一一進行解釋,避免看文章的童鞋看暈了幾個值。
var efContext1 = new EFCoreContext(); var d1 = new DbQueryCommit(efContext1); var efContext2 = new EFCoreContext(); var d2 = new DbQueryCommit(efContext2); var efContext3 = new EFCoreContext(); var d3 = new DbQueryCommit(efContext3);
【溫馨提示】:有很多童鞋在用.net core時控制台時會遇見中文亂碼的問題,主要是.net core都需要安裝包來進行,以此來說明不再依賴本地程序集達到更好的跨平台,真正實現模塊化,所以需要在控制台注冊中文編碼包,如下:
Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
同時需要安裝如下包:
System.Text.Encoding.CodePages
在注冊后還需要設置控制台輸出行編碼為GB2312
Console.OutputEncoding = Encoding.GetEncoding("GB2312");
接下來進行方法調用:
Concurrency.UpdateBlog(d1, d2, d3, (d) => { PropertyValues databaseValues = d.GetDatabaseValues(); if (databaseValues == null) { d.State = EntityState.Detached; } else { d.OriginalValues.SetValues(databaseValues); } });
此時控制台打印和數據庫更新值如下:
我們從調用方法開始解釋:
當執行到更新上下文一中的Name值時為readerWriter1都沒有問題,此時行版本將變為 0x0000000000833 ,當執行到上下文2中時此時去更新Name值時,此時會根據主鍵和行版本 0x0000000000832 去數據庫中查找值,此時卻沒找到,然后將執行並發異常,最終開始執行 resolveConflict(tracking); 來解析沖突,然后就來到了上述圖片所示,此時databaseValues中所存的值就是readrWriter1,也就是說此時數據庫中的原始值變為了Name = 'readerWriter1',我們需要做的是將數據庫中的Name = 'readerWriter1'設置為原始值,這樣下次再去解析沖突會根據主鍵id = 1和行版本0x00000000833去查找,此時找到該行數據最終進行更新到數據庫,所以結果如上圖所給。到這里你是不是就覺得難道就這么結束了嗎?NO,這個是最簡單的一個場景,上述還只是兩個並發客戶端,如果是多個根本無法保證能夠完全解析並發,同時中間還存在一個問題,我們到底是讓客戶端更新值獲勝還是讓數據庫中原始值獲勝呢,這又是一個問題,如果我們完全不借助SQL語句或者存儲過程來執行事務的話,這個將是個很嚴重的問題,比如在秒殺場景中,產品只有1000個,那么每次都讓客戶端獲勝,好吧,那就導致庫存溢出的問題,那就呵呵了,還有一個很大的問題則是合並,如果有多個並發請求過來可能我們只需要對於產品中的數量進行並發控制,其他的數據更新完全可以進行合並,這又是一個問題,那么到底該如何解決呢,請繼續往下看終極解決方案。
EntityFramweork Core高並發獲勝者初級版解析
EntityFramework Core並發數據庫獲勝
既然是數據庫中獲勝那么對於客戶端出現的並發異常我們就不需要進行解析,此時我們只需要終止異常直接返回值即可,如下定義方法:
public int DatabaseWin(Action change, Action<DbUpdateConcurrencyException> handleException) { change(); try { return context.SaveChanges(); } catch (DbUpdateConcurrencyException exception) { return 0; } }
接着我們在UpdateBlog方法中在上下文2中提交數據時調用上述方法並無需再進行並發解析,如下:
readerWriter2.DatabaseWin( change: () => { blog2.Name = nameof(readerWriter2); blog2.Count = 1; }, handleException: exception => { EntityEntry tracking = exception.Entries.Single(); Blog original = (Blog)tracking.OriginalValues.ToObject(); Blog current = (Blog)tracking.CurrentValues.ToObject(); Blog database = blog1; var origin = $"原始值:({original.Name},{original.Count},{original.Id},{original.RowVersionString})"; Console.WriteLine(original); Console.WriteLine("----------------------------------------------------------"); var databaseValue = $"數據庫中值:({database.Name},{database.Count},{database.Id},{database.RowVersionString})"; Console.WriteLine(databaseValue); Console.WriteLine("----------------------------------------------------------"); var update = $"更新的值:({current.Name},{current.Count},{current.Id},{current.RowVersionString})"; Console.WriteLine(update); Console.WriteLine("----------------------------------------------------------"); //resolveConflict(tracking); });
此時打印和數據庫中值如下:
上述就無需再多講了,根本沒有去解析異常。
EntityFramework Core並發客戶端獲勝
上一大節我們演示的則是客戶端獲勝,這里我們只需要設置異常解析的值即可解決問題,封裝一個方法,如下:
public static void ClientWins( DbQueryCommit readerWriter1, DbQueryCommit readerWriter2, DbQueryCommit readerWriter3) => UpdateBlog(readerWriter1, readerWriter2, readerWriter3, resolveConflict: tracking => { PropertyValues databaseValues = tracking.GetDatabaseValues(); tracking.OriginalValues.SetValues(databaseValues); Console.WriteLine(tracking.State); Console.WriteLine(tracking.Property(nameof(Blog.Count)).IsModified); Console.WriteLine(tracking.Property(nameof(Blog.Name)).IsModified); Console.WriteLine(tracking.Property(nameof(Blog.Id)).IsModified); });
結果就不再演示和之前演示結果等同。我們將重點放在客戶端和數據庫值合並的問題,請繼續往下看。
EntityFramework Core並發數據庫和客戶端合並
當出現並發時我們對前者使其客戶端獲勝而后者對於前者未有的屬性則進行更新,所以我們需要首先對數據庫原始值克隆一份,然后將其客戶端獲勝,然后將原始值和客戶端屬性進行比較,若數據庫中的屬性在原始值中的屬性中沒有,我們則將數據庫中的值不進行更新,此時將導致當前並發中的值進行更新則呈現出我們所說客戶端和數據庫值進行合並更新,如下首先克隆:
PropertyValues originalValues = tracking.OriginalValues.Clone(); PropertyValues databaseValues = tracking.GetDatabaseValues(); tracking.OriginalValues.SetValues(databaseValues);
比較原始值和數據庫中的屬性進行比較判斷,不存在則不更新。
databaseValues.Properties .Where(property => !object.Equals(originalValues[property.Name], databaseValues[property.Name])) .ToList() .ForEach(property => tracking.Property(property.Name).IsModified = false);
最終我們定義如下合並方法:
public static void MergeClientAndDatabase( DbQueryCommit readerWriter1, DbQueryCommit readerWriter2, DbQueryCommit readerWriter3) => UpdateBlog(readerWriter1, readerWriter2, readerWriter3, resolveConflict: tracking => { PropertyValues originalValues = tracking.OriginalValues.Clone(); PropertyValues databaseValues = tracking.GetDatabaseValues(); tracking.OriginalValues.SetValues(databaseValues); #if selfDefine databaseValues.PropertyNames .Where(property => !object.Equals(originalValues[property], databaseValues[property])) .ForEach(property => tracking.Property(property).IsModified = false); #else databaseValues.Properties .Where(property => !object.Equals(originalValues[property.Name], databaseValues[property.Name])) .ToList() .ForEach(property => tracking.Property(property.Name).IsModified = false); #endif Console.WriteLine(tracking.State); Console.WriteLine(tracking.Property(nameof(Blog.Count)).IsModified); Console.WriteLine(tracking.Property(nameof(Blog.Name)).IsModified); Console.WriteLine(tracking.Property(nameof(Blog.Id)).IsModified); });
此時我們再在UpdateBlog方法添加二者不同的屬性,如下:
readerWriter1.Commit(() => { blog1.Name = nameof(readerWriter1); blog1.Count = 3; }); Console.WriteLine($"更新blog1后行版本:{blog1.RowVersionString}"); Console.WriteLine("----------------------------------------------------------"); readerWriter2.Commit( change: () => { blog2.Name = nameof(readerWriter2); blog2.Count = 4; blog2.Url = "http://www.cnblogs.com/CreateMyself"; }.......
為了便於閱讀者觀察和對比,我們給出數據庫中默認的初始值,如下:
好了到了這里關於EF Core中並發內容算是全部結束,別着急,還剩下最后一點內容,那就是終極解決並發方案,請繼續往下看。
EntityFramework Core並發高級終極版解決方案
我們定義一個名為 RefreshConflict 枚舉,當提交時定義是否為數據庫或者客戶端或者數據庫和客戶端數據合並:
public enum RefreshConflict { StoreWins, ClientWins, MergeClientAndStore }
根據上述不同的獲勝模式來刷新數據庫中的值,我們定義如下刷新狀態擴展方法:
public static class RefreshEFStateExtensions { public static EntityEntry Refresh(this EntityEntry tracking, RefreshConflict refreshMode) { switch (refreshMode) { case RefreshConflict.StoreWins: { //當實體被刪除時,重新加載設置追蹤狀態為Detached //當實體被更新時,重新加載設置追蹤狀態為Unchanged tracking.Reload(); break; } case RefreshConflict.ClientWins: { PropertyValues databaseValues = tracking.GetDatabaseValues(); if (databaseValues == null) { //當實體被刪除時,設置追蹤狀態為Detached,當然此時客戶端無所謂獲勝 tracking.State = EntityState.Detached; } else { //當實體被更新時,刷新數據庫原始值 tracking.OriginalValues.SetValues(databaseValues); } break; } case RefreshConflict.MergeClientAndStore: { PropertyValues databaseValues = tracking.GetDatabaseValues(); if (databaseValues == null) { /*當實體被刪除時,設置追蹤狀態為Detached,當然此時客戶端沒有合並的數據 並設置追蹤狀態為Detached */ tracking.State = EntityState.Detached; } else { //當實體被更新時,刷新數據庫原始值 PropertyValues originalValues = tracking.OriginalValues.Clone(); tracking.OriginalValues.SetValues(databaseValues); //如果數據庫中對於屬性有不同的值保留數據庫中的值 #if SelfDefine databaseValues.PropertyNames // Navigation properties are not included. .Where(property => !object.Equals(originalValues[property], databaseValues[property])) .ForEach(property => tracking.Property(property).IsModified = false); #else databaseValues.Properties .Where(property => !object.Equals(originalValues[property.Name], databaseValues[property.Name])) .ToList() .ForEach(property => tracking.Property(property.Name).IsModified = false); #endif } break; } } return tracking; } }
默認重試機制采取自定義重試三次:
public static int SaveChanges( this DbContext context, Action<IEnumerable<EntityEntry>> resolveConflicts, int retryCount = 3) { if (retryCount <= 0) { throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必須大於0."); } for (int retry = 1; retry < retryCount; retry++) { try { return context.SaveChanges(); } catch (DbUpdateConcurrencyException exception) when (retry < retryCount) { resolveConflicts(exception.Entries); } } return context.SaveChanges(); }
另外找到一種重試機制包,安裝如下程序包。
EnterpriseLibrary.TransientFaultHandling.Core
我們來簡單看一個例子。我們自定義實現需要繼承自該程序包中重試策略類 RetryStrategy ,此時需要實現內置如下抽象方法:
public abstract ShouldRetry GetShouldRetry();
最終我們自定義如下實現方法:
public class ConcurrentcyStrategy : RetryStrategy { public ConcurrentcyStrategy(string name, bool firstFastRetry) : base(name, firstFastRetry) { } private bool ConcurrentcyShouldRetry(int retryCount, Exception lastException, out TimeSpan delay) { if (retryCount <= 0) { throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必須大於0"); } if (lastException is ArgumentNullException) { return true; } return true; } public override ShouldRetry GetShouldRetry() { var shouldRetry = new ShouldRetry(ConcurrentcyShouldRetry); return shouldRetry; } }
上述是定義策略類,接下來我們需要實現 ITransientErrorDetectionStrategy 接口來實現需要獲取到的異常類,定義如下:
public class TransientErrorDetection<TException> : ITransientErrorDetectionStrategy where TException : Exception { public bool IsTransient(Exception ex) => ex is TException; }
最后則是檢測到我們所定義的異常並解析重試解析異常,如下:
public class TransientDetectionExample { public int TransientDetectionTest(Func<string, bool> str, RetryStrategy retryStrategy) { RetryPolicy retryPolicy = new RetryPolicy( errorDetectionStrategy: new TransientDetection<ArgumentException>(), retryStrategy: retryStrategy); retryPolicy.Retrying += (sender, e) => str(((ArgumentNullException)e.LastException).StackTrace); return retryPolicy.ExecuteAction(RetryCalcu); } public int RetryCalcu() { return -1; } }
我們給出如下測試數據,並給出參數為空,觀察是否結果會執行RetryCalcu並返回-1:
var stratrgy = new ConcurrentcyStrategy("test", true); var isNull = string.Empty; var example = new TransientDetectionExample(); var result = example.TransientDetectionTest(d => isNull.Contains("11"), stratrgy);
所以此時基於上述情況我們可以利用現有的輪子來實現重試機制重載一個SaveChanges方法,最終重試機制我們可以定義如下另一個重載方法:
public class TransientDetection<TException> : ITransientErrorDetectionStrategy where TException : Exception { public bool IsTransient(Exception ex) => ex is TException; } public static partial class DbContextExtensions { public static int SaveChanges( this DbContext context, Action<IEnumerable<EntityEntry>> resolveConflicts, int retryCount = 3) { if (retryCount <= 0) { throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必須大於0"); } for (int retry = 1; retry < retryCount; retry++) { try { return context.SaveChanges(); } catch (DbUpdateConcurrencyException exception) when (retry < retryCount) { resolveConflicts(exception.Entries); } } return context.SaveChanges(); } public static int SaveChanges( this DbContext context, Action<IEnumerable<EntityEntry>> resolveConflicts, RetryStrategy retryStrategy) { RetryPolicy retryPolicy = new RetryPolicy( errorDetectionStrategy: new TransientDetection<DbUpdateConcurrencyException>(), retryStrategy: retryStrategy); retryPolicy.Retrying += (sender, e) => resolveConflicts(((DbUpdateConcurrencyException)e.LastException).Entries); return retryPolicy.ExecuteAction(context.SaveChanges); } }
同上我們最終提交數據也分別對應兩個方法,一個是自定義重試三次,一個利用輪子重試機制,如下:
public static partial class DbContextExtensions { public static int SaveChanges(this DbContext context, RefreshConflict refreshMode, int retryCount = 3) { if (retryCount <= 0) { throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必須大於0"); } return context.SaveChanges( conflicts => conflicts.ToList().ForEach(tracking => tracking.Refresh(refreshMode)), retryCount); } public static int SaveChanges( this DbContext context, RefreshConflict refreshMode, RetryStrategy retryStrategy) => context.SaveChanges( conflicts => conflicts.ToList().ForEach(tracking => tracking.Refresh(refreshMode)), retryStrategy); }
接下來我們來分別演示客戶端獲勝、數據庫獲勝以及客戶端和數據庫合並情況。首先我們放一張數據庫默認數據以便對比:
EntityFramework Core並發客戶端獲勝
var efContext1 = new EFCoreContext(); var efContext2 = new EFCoreContext(); var b1 = efContext1.Blogs.Find(1); var b2 = efContext2.Blogs.Find(1); b1.Name = nameof(efContext1); efContext1.SaveChanges(); b2.Name = nameof(efContext2); b2.Url = "http://www.cnblogs.com/CreateSelf"; efContext2.SaveChanges(RefreshConflict.ClientWins);
上述我們看到數據庫中的值完全更新為在上下文2中的數據。
EntityFramework Core並發數據庫獲勝
var efContext1 = new EFCoreContext(); var efContext2 = new EFCoreContext(); var b1 = efContext1.Blogs.Find(1); var b2 = efContext2.Blogs.Find(1); b1.Name = nameof(efContext1); efContext1.SaveChanges(); b2.Name = nameof(efContext2); b2.Url = "http://www.cnblogs.com/CreateSelf"; efContext2.SaveChanges(RefreshConflict.StoreWins);
此時我們看到數據庫中的值為上下文1中的數據。
EntityFramework Core並發客戶端和數據庫合並
var efContext1 = new EFCoreContext(); var efContext2 = new EFCoreContext(); var b1 = efContext1.Blogs.Find(1); var b2 = efContext2.Blogs.Find(1); b1.Name = nameof(efContext1); b1.Count = 10; efContext1.SaveChanges(); b2.Name = nameof(efContext2); b1.Count = 11; b2.Url = "http://www.cnblogs.com/CreateSelf"; efContext2.SaveChanges(RefreshConflict. MergeClientAndStore);
上述我們看到數據庫中的Name和Count是上下文1中的值,而Url則為上下文2中的值。
關於重試機制找到一個比較強大的輪子:https://github.com/App-vNext/Polly 看到一直在更新目前已經支持.net core。看如下加星應該是不錯。
沒有深入研究該重試機制,就稍微了解了下進行如下重試操作:
public int Commit(Action change, Action<DbUpdateConcurrencyException> handleException, int retryCount = 3) { change(); Policy .Handle<DbUpdateConcurrencyException>(ex => ex.Entries.Count > 0) .Or<ArgumentException>(ex => ex.ParamName == "exception") .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(10)) .Execute(() => context.SaveChanges()); return context.SaveChanges(); }
同樣調用上述UpdateBlog方法,上下文2中數據如下:
readerWriter2.Commit( change: () => { blog2.Name = nameof(readerWriter2); blog2.Count = 4; blog2.Url = "http://www.cnblogs.com/CreateMyself"; }, handleException: exception => ............
結果成功更新,利用這個比之前演示的那個更佳,但是發現當執行到這個方法單步執行時會出現如下錯誤,不知為何:
總結
貌似這篇是有史以來寫的最長的一篇博客了,上述關於EF中的並發演示利用行版本的形式,利用並發Token也一致,不過是配置不同罷了,關於EntityFramework Core並發到這里是完全結束,花了許多時間去研究和查資料,受益匪淺,不同的並發策略都已給出,就看具體應用場景了,希望對閱讀本文的你有所幫助。see u,我們下節繼續開始進入SQL Server性能優化系列,敬請期待。