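The SaveChanges() method in Entity Framework Code First submits all pending entity changes to the database in one call, much like an ADO.NET DataAdapter pushing the updates held in a DataSet. The difference is that EF is an ORM: its entities are strongly typed, so LINQ is well supported.
I have recently been using EF as the data access layer in a workflow project. I needed to execute a SQL statement or stored procedure at the moment EF submits its entity updates, with both guaranteed to run in the same transaction.
Solutions for EF transactions are already available online, and they mostly rely on TransactionScope:
    using (TransactionScope scope = new TransactionScope())
    {
        dbContext.Database.ExecuteSqlCommand(sql, param);
        dbContext.SaveChanges();
        scope.Complete();
    }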
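The code inside a TransactionScope can even save several DbContext instances. Of course, if those DbContext instances point to different databases, the transaction is bound to be promoted to a distributed transaction under MSDTC.
Moreover, any SQL we run alongside has to go through ExecuteSqlCommand; otherwise the transaction is again promoted to MSDTC. See http://www.cnblogs.com/lovvver/archive/2012/06/10/2543762.html
For every query or update, whether ExecuteSqlCommand or SaveChanges, EF opens the connection and closes it again right afterwards. The transaction above therefore spans multiple connections. Worse, on a database that does not support transactions across multiple connections, such as SQL Server 2005, it is still promoted to MSDTC. For details, see http://www.digitallycreated.net/Blog/48/entity-framework-transactionscope-and-msdtc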
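To see whether a given TransactionScope has actually been promoted, one option is to inspect the ambient transaction. The following is a minimal sketch and not part of the original solution: EscalationProbe and IsDistributed are hypothetical names. It relies on the System.Transactions property TransactionInformation.DistributedIdentifier, which remains Guid.Empty until the transaction is promoted to a distributed one.

```csharp
using System;
using System.Transactions;

// Hypothetical helper (not part of EF): reports whether the ambient
// transaction has been promoted to a distributed (MSDTC) transaction.
public static class EscalationProbe
{
    public static bool IsDistributed()
    {
        Transaction current = Transaction.Current;
        if (current == null)
        {
            // No ambient transaction, so nothing can have been promoted.
            return false;
        }

        // DistributedIdentifier is Guid.Empty while the transaction is still local.
        return current.TransactionInformation.DistributedIdentifier != Guid.Empty;
    }
}
```

Calling EscalationProbe.IsDistributed() after each EF call inside the scope (for example after ExecuteSqlCommand and again after SaveChanges) should show at which point, if any, the promotion happens on a particular server version.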
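So in a scenario that is not actually distributed, committing a transaction this way forces us to configure MSDTC. Leaving aside that the MSDTC service consumes system resources and is awkward to configure, we are not really doing anything distributed, and all the operations we submit could perfectly well be completed on a single connection. EF offers no method for doing this, which made me start to miss ADO.NET.
Still, I was reluctant: should we give up EF's powerful mapping and querying just because of the channel through which changes are submitted? In this respect EF really is much stronger than SQLinq or LINQ to SQL (L2S).
If we could get hold of the SQL statements that SaveChanges is about to submit, we could do whatever we like. Queries would go through the regular EF channel, while submission would go through ADO.NET in any way we choose, with EF's only contribution to the commit being the SQL statements. Separating queries from commands like this is also in line with CQRS.
EF was open-sourced a little while ago (http://www.cnblogs.com/shanyou/archive/2012/07/20/2600465.html), so what could possibly stop us?
First, download the source code from http://entityframework.codeplex.com/. The downloaded version has to be opened with VS2012 and targets .NET Framework 4.5.
Fortunately, a 4.0 program can call a DLL compiled for 4.5 (provided .NET Framework 4.5, which is an in-place update of 4.0, is installed on the machine).
Download VS2012 RC and open the solution. Only two projects are really needed, EntityFramework and EntityFramework.SqlServer; strong-name signing, NuGet, and the other unrelated projects can be removed. The solution then builds successfully.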

Find the SaveChanges method of the DbContext class and follow the call chain:
DbContext -> InternalContext -> ObjectContext -> EntityAdapter -> UpdateTranslator -> DynamicUpdateCommand
This finally leads to the Execute method of DynamicUpdateCommand, where we can see how the DbCommand is created.
    internal override long Execute(
        Dictionary<int, object> identifierValues,
        List<KeyValuePair<PropagatorResult, object>> generatedValues)
    {
        // Compile command
        using (var command = CreateCommand(identifierValues))
        {
            // the update command
            var connection = Translator.Connection;
            // configure command to use the connection and transaction for this session
            command.Transaction = ((null == connection.CurrentTransaction)
                                       ? null
                                       : connection.CurrentTransaction.StoreTransaction);
            command.Connection = connection.StoreConnection;
            if (Translator.CommandTimeout.HasValue)
            {
                command.CommandTimeout = Translator.CommandTimeout.Value;
            }
            // Execute the query
            int rowsAffected;
            if (_modificationCommandTree.HasReader)
            {
                Debug.WriteLine("Query SQL: " + command.CommandText);
                // retrieve server gen results
                rowsAffected = 0;
                using (var reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
                {
                    if (reader.Read())
                    {
                        rowsAffected++;
                        var members = TypeHelpers.GetAllStructuralMembers(CurrentValues.StructuralType);
                        for (var ordinal = 0; ordinal < reader.FieldCount; ordinal++)
                        {
                            // column name of result corresponds to column name of table
                            var columnName = reader.GetName(ordinal);
                            var member = members[columnName];
                            object value;
                            if (Helper.IsSpatialType(member.TypeUsage)
                                && !reader.IsDBNull(ordinal))
                            {
                                value = SpatialHelpers.GetSpatialValue(Translator.MetadataWorkspace, reader, member.TypeUsage, ordinal);
                            }
                            else
                            {
                                value = reader.GetValue(ordinal);
                            }
                            // retrieve result which includes the context for back-propagation
                            var columnOrdinal = members.IndexOf(member);
                            var result = CurrentValues.GetMemberValue(columnOrdinal);
                            // register for back-propagation
                            generatedValues.Add(new KeyValuePair<PropagatorResult, object>(result, value));
                            // register identifier if it exists
                            var identifier = result.Identifier;
                            if (PropagatorResult.NullIdentifier != identifier)
                            {
                                identifierValues.Add(identifier, value);
                            }
                        }
                    }
                    // Consume the current reader (and subsequent result sets) so that any errors
                    // executing the command can be intercepted
                    CommandHelper.ConsumeReader(reader);
                }
            }
            else
            {
                Debug.WriteLine("Update SQL: " + command.CommandText);
                // return 1;
                rowsAffected = command.ExecuteNonQuery();
            }
            return rowsAffected;
        }
    }
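The line rowsAffected = command.ExecuteNonQuery(); is where the command is actually executed.
The idea is to stop the DbCommand from being submitted and record it in a collection instead. So we create a new class:
    public class CommandItem
    {
        public string CommandText { get; set; }
        public CommandType CommandType { get; set; }
        public DbParameterCollection Parameters { get; set; }
    }
and add a collection of it to ObjectContext:
    // Must be initialized (for example in the constructor) before commands are added.
    public List<CommandItem> CommandItems { get; set; }
so that it can be reached from DbContext:
    public List<CommandItem> CommandItems { get { return this.InternalContext.ObjectContext.CommandItems; } }
Copy the Execute method of the DynamicUpdateCommand class, pass in a List<CommandItem> items as the container, comment out the code that performs the execution, and turn it into a new method: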
    internal override long AtawExecute(
        List<CommandItem> items,
        Dictionary<int, object> identifierValues,
        List<KeyValuePair<PropagatorResult, object>> generatedValues)
    {
        // Compile command
        using (var command = CreateCommand(identifierValues))
        {
            // Unlike Execute, the command is not given a connection, transaction, or timeout
            // (that code is commented out), because it is never executed here.
            if (_modificationCommandTree.HasReader)
            {
                // The reader loop from Execute is commented out, so commands that read back
                // server-generated values are only logged and nothing is added to items.
                Debug.WriteLine("Query SQL: " + command.CommandText);
            }
            else
            {
                Debug.WriteLine("Update SQL: " + command.CommandText);
                // Record the command instead of calling command.ExecuteNonQuery().
                items.Add(new CommandItem()
                {
                    CommandText = command.CommandText,
                    CommandType = command.CommandType,
                    Parameters = command.Parameters
                });
            }
            return 1;
        }
    }
Then, in the same way, modify each method along the call chain.
    // UpdateTranslator (signature only)
    internal virtual List<CommandItem> AtawUpdate(List<CommandItem> items)
    // EntityAdapter
    public int AtawUpdate(IEntityStateManager entityCache, List<CommandItem> items)
    {
        // The IsStateManagerDirty check and the connection checks from Update
        // (connection present, store connection available, connection open)
        // are commented out, since nothing is executed against the store here.
        var updateTranslator = _updateTranslatorFactory(entityCache, this);
        updateTranslator.AtawUpdate(items);
        return 0;
    }
    // ObjectContext
    private void AtawSaveChangesToStore(SaveOptions options)
    {
        int entriesAffected;
        var connection = (EntityConnection)Connection;

        // get data adapter
        if (_adapter == null)
        {
            _adapter = (IEntityAdapter)((IServiceProvider)EntityProviderFactory.Instance).GetService(typeof(IEntityAdapter));
        }

        // only accept changes after the local transaction commits
        _adapter.AcceptChangesDuringUpdate = false;
        _adapter.Connection = connection;
        _adapter.CommandTimeout = CommandTimeout;

        // Commented out from SaveChangesToStore: EnsureConnection()/ReleaseConnection(),
        // beginning, committing, and disposing the local transaction, and the
        // AcceptAllChanges() call after saving. Tracked entities therefore keep
        // their pending state after this method returns.
        entriesAffected = _adapter.AtawUpdate(ObjectStateManager, CommandItems);
    }
    // ObjectContext
    public int AtawSaveChanges(SaveOptions options)
    {
        PrepareToSaveChanges(options);
        var entriesAffected = ObjectStateManager.GetObjectStateEntriesCount(EntityState.Added | EntityState.Deleted | EntityState.Modified);
        // if there are no changes to save, perform fast exit to avoid interacting with or starting of new transactions
        if (0 < entriesAffected)
        {
            entriesAffected = 0;
            AtawSaveChangesToStore(options);
        }
        ObjectStateManager.AssertAllForeignKeyIndexEntriesAreValid();
        return 0;
    }
    // InternalContext
    public virtual int AtawSaveChanges()
    {
        try
        {
            if (ValidateOnSaveEnabled)
            {
                var validationResults = Owner.GetValidationErrors();
                if (validationResults.Any())
                {
                    throw new DbEntityValidationException(
                        Strings.DbEntityValidationException_ValidationFailed, validationResults);
                }
            }
            var shouldDetectChanges = AutoDetectChangesEnabled && !ValidateOnSaveEnabled;
            var saveOptions = SaveOptions.AcceptAllChangesAfterSave |
                              (shouldDetectChanges ? SaveOptions.DetectChangesBeforeSave : 0);
            return ObjectContext.AtawSaveChanges(saveOptions);
        }
        catch (UpdateException ex)
        {
            throw WrapUpdateException(ex);
        }
    }
The outermost call, in DbContext:
    public virtual void SaveAtawChanges()
    {
        HasSaveChanges = true;
        InternalContext.AtawSaveChanges();
    }
Build, then test it:
    WorkflowDbContext context = new WorkflowDbContext();
    var list = context.WF_WORKFLOW_INST.ToList();
    list.ForEach(a => a.WI_NAME = "ff123");
    context.SaveAtawChanges();

    // Project the captured commands into rows for display
    // (columns: SQL statement, statement type, parameters).
    var gg = context.CommandItems.Select(a =>
    {
        string ff = "";
        for (int i = 0; i < a.Parameters.Count; i++)
        {
            var par = a.Parameters[i];
            ff = ff + par.ParameterName + "=" + par.Value + "; ";
        }
        return new { sql語句 = a.CommandText, 語句類型 = a.CommandType, 參數 = ff };
    });

    // Execute the captured commands in one transaction on one connection.
    int ggg = 0;
    var con = context.Database.Connection as SqlConnection;
    using (con)
    {
        con.Open();
        var trans = con.BeginTransaction();
        try
        {
            foreach (var com in context.CommandItems)
            {
                List<SqlParameter> sqls = new List<SqlParameter>();
                for (int i = 0; i < com.Parameters.Count; i++)
                {
                    sqls.Add((((ICloneable)com.Parameters[i]).Clone() as SqlParameter));
                }
                ggg = ggg + SqlHelper.ExecuteNonQuery(trans, com.CommandType, com.CommandText, sqls.ToArray());
            }
            // Without an explicit commit, the work is rolled back when the connection is disposed.
            trans.Commit();
        }
        catch (Exception ex)
        {
            // An exception occurred: roll back the transaction.
            Response.Write("Data update error: " + ggg);
            trans.Rollback();
        }
    }
    Response.Write("Rows updated: " + ggg);
    this.GridView1.DataSource = gg;
    this.GridView1.DataBind();
    sqls.Add((((ICloneable)com.Parameters[i]).Clone() as SqlParameter));
This line is needed because a SqlParameter cannot be referenced by two DbCommand objects at once, so each parameter has to be cloned.
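If the captured commands should be replayed without depending on SqlParameter and the ICloneable cast, the parameters can instead be copied through the target command's own factory method. The following is a minimal sketch under assumptions: ParameterCopier and CopyInto are hypothetical names, and only the provider-neutral properties (name, DbType, direction, size, value) are copied, so provider-specific settings such as SqlDbType, precision, or scale are not carried over.

```csharp
using System.Collections.Generic;
using System.Data.Common;

// Hypothetical helper: gives the target command its own parameter instances,
// so the source collection is never shared between two commands.
public static class ParameterCopier
{
    public static List<DbParameter> CopyInto(DbCommand target, DbParameterCollection source)
    {
        var copies = new List<DbParameter>();
        foreach (DbParameter original in source)
        {
            // CreateParameter returns a new parameter of the provider's own type.
            DbParameter copy = target.CreateParameter();
            copy.ParameterName = original.ParameterName;
            copy.DbType = original.DbType;
            copy.Direction = original.Direction;
            copy.Size = original.Size;
            copy.Value = original.Value;

            target.Parameters.Add(copy);
            copies.Add(copy);
        }
        return copies;
    }
}
```

With this approach the replay loop would create one DbCommand per CommandItem on the open connection, assign the transaction, call CopyInto with the captured Parameters, and then call ExecuteNonQuery on it.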
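In the end, we:
execute the SQL statements with SqlHelper,
get back the number of rows affected,
and display the generated SQL statements in the GridView.

It works.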
