MongoDB模擬多文檔事務操作


Mongodb不支持多文檔原子性操作,因此依據兩階段提交協議(Two Phase Commits protocol)來模擬事務。

以兩個銀行賬戶之間的轉賬行為為例,來說明如何實現多文檔間的事務操作。

為實現多文檔間的事務操作,定義一個事務文檔TransactionDocument,儲存在事務集合TransactionCollection

 

public class TransactionDocument2
    {
        public object _id { set; get; }
        //原賬戶
        public string Source { set; get; }
        //目標賬戶
        public string Destination { set; get; }
        //轉賬金額
        public decimal Value { set; get; }
        //執行狀態(初始化initial, 執行操作pending, 完成操作applied, 事務結束done, 正在取消操作canceling, 完成取消canceled)
        public string State { set; get; }
        //最后修改日期
        public DateTime LastModified { set; get; }
    }

 

銀行賬戶的結構為:

public class Account
    {
        /// <summary>
        /// 賬號
        /// </summary>
        public string _id { set; get; }
        /// <summary>
        /// 賬戶余額
        /// </summary>
        public decimal Balance { set; get; }
        /// <summary>
        /// 待處理事務鏈表
        /// </summary>
        public List<object> PendingTransactions { set; get; }
}

 

轉賬的主流程為:

0步,為參與事務的兩個實體創建唯一的事務文檔。

AB兩個賬戶創建唯一的事務文檔,事務文檔的_id值為AB賬戶_id值的組合。

1步,在TransactionCollection集合中找到狀態為"initial"的事務文檔。

對於AB兩個賬戶間的轉賬操作,只能有一個事務文檔。這樣做是為了防止多個客戶端同時對一個賬戶執行修改操作,只有一個這種事務文檔,那么當AB間的轉賬行為開始時,事務文檔的狀態為“pending”,而事務開始要查找的是狀態為“initial”的事務文檔,因此不會獲得這樣的事務文檔,也就不會執行任何操作,只有當AB轉賬操作完成后才有可能再次執行類似的操作。

2步,第1步執行成功的前提下,將事務文檔狀態由“initial”更改為“pending”。

3步,第2步執行成功的前提下,對兩個賬戶應用事務,執行轉賬。

對兩個賬戶應用事務的具體操作就是向AB兩個賬戶的待處理事務鏈表中添加事務文檔_id

4步,第3步執行成功的前提下,將事務文檔狀態由“pending”更改為“applied”。

5步,第4步執行成功的前提下,移除事務標識。

具體操作是:移除第3步中向AB兩個賬戶待處理事務鏈表添加的事務文檔_id

6步,第5步執行成功的前提下,將事務文檔狀態由“applied”更改為“done”。

7步,不論第6步是否執行成功,將事務文檔狀態“done”更改為“initial”。

看似在第6步將applied”更改為“initial也是可以的,但是如果在這之間在加入一個done”狀態會帶來更大的好處,例如,可以定時掃描TransactionCollection集合,批量將狀態為“done”的事務文檔狀態改為“initial”,而不是在第6步執行完成以后立即執行第7步。

 

輔助流程

針對上述主流程的每一步加以分析,找出需要輔助流程介入的位置。

對於第0步:

如果創建不成功不會產生任何影響。

對於第1步:

如果沒有找到,不會產生任何影響。

對於第2步:

如果事務文檔狀態修改不成功,不會產生任何影響。

對於第3步:

如果執行轉賬失敗,A賬戶的錢已被扣除V,但B沒有收到V,回滾到之前的狀態。

如果在指定的超時時間內沒有完成則,執行從錯誤中恢復策略。

對於第4步:

如果修改事務文檔狀態失敗,設置執行超時時間Th4,重復執行此步驟,如果超時時間已到達,但未完成,執行從錯誤中恢復策略。

對於第5步:

如果移除事務標識失敗,設置執行超時時間Th5,重復執行此步驟,如果超時時間已到達,但未完成,執行從錯誤中恢復策略。

對於第6步:

如果移除事務標識失敗,設置執行超時時間Th6,重復執行此步驟,如果超時時間已到達,但未完成,執行從錯誤中恢復策略。

 

回滾的步驟為:

1步,將事務文檔狀態由pending”更改為“canceling

2步,賬戶余額還原為操作之前的狀態,刪除兩個賬戶的待處理事務鏈表中的事務文檔_id.

3步,將事務文檔狀態由canceling”更改為“cancelled

 

從錯誤中恢復策略

通過重復執行需要此策略的那一步操作即可達到目的。可以選擇異步執行錯誤恢復機制。

 

超時檢測

比較事務文檔的LastModified 與當前時間的值,如果二者差值超過設定的閾值,即判定超時。

 

示例

考慮了部分情形,實際情況比實例所考慮的情形要復雜。此外MongoDB3.4版本開始支持decimal類型,不過在字段上添加BsonRepresentation(BsonType.Decimal128)特性

事務文檔和賬戶文檔相應地修改為

 

public class TransactionDocumentP
{
        .......
        //轉賬金額
        [BsonRepresentation(BsonType.Decimal128)]
        public decimal Value { set; get; }
        ......
}

public class AccountP
{
        ......
        [BsonRepresentation(BsonType.Decimal128)]
        public decimal Balance { set; get; }
        ......
}

 

操作的集合

 

        //事務文檔集合
        private string TransactionCollectionName = "TransactionCollection";
        //賬戶集合
        private string AccountsCollectionName = "UserAccounts";
        private MongoDBService mongoDBService = new MongoDBService("mongodb://localhost:27017/TestDB?maxPoolSize=100&minPoolSize=10",
               "TestDB");

 

主流程方法:

1 為參與事務的兩個實體創建唯一的事務文檔

 

private void PrepareTransfer(decimal value, string source, string destination)
        {
            //創建事務文檔
            TransactionDocumentP tDoc = new TransactionDocumentP
            {
                _id = string.Format("{0}For{1}", source, destination),
                State = "initial",
                LastModified = DateTime.Now,
                Value = value,
                Source = source,
                Destination = destination
            };
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter1 = filterBuilder.Eq(doc => doc._id, tDoc._id);
            if (mongoDBService.ExistDocument(TransactionCollectionName, filter1))
            {
                return;
            }
            //將事務文檔插入事務集合
            mongoDBService.Insert(TransactionCollectionName, tDoc);
        }

 

2 找到狀態為"initial"的事務文檔

private TransactionDocumentP RetrieveTransaction()
        {
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter = filterBuilder.Eq(doc => doc.State, "initial");

            return mongoDBService.Single(TransactionCollectionName, filter);
        }

3 執行轉賬與應用事務

private bool ApplyTransaction(TransactionDocumentP t, decimal value, string source, string destination)
        {
            FilterDefinitionBuilder<AccountP> filterBuilderS = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterS1 = filterBuilderS.Eq(doc => doc._id, source);
            var updateS = Builders<AccountP>.Update.Inc(m => m.Balance, -value).Push(m => m.PendingTransactions, t._id);
            UpdateResult updateResultS = mongoDBService.DocumentUpdate(AccountsCollectionName, filterS1, updateS);

            bool isSuss = updateResultS.ModifiedCount > 0 && updateResultS.ModifiedCount == updateResultS.MatchedCount;
            if(isSuss)
            {
                FilterDefinitionBuilder<AccountP> filterBuilderD = Builders<AccountP>.Filter;
                FilterDefinition<AccountP> filterD1 = filterBuilderD.Eq(doc => doc._id, destination);
                var updateD = Builders<AccountP>.Update.Inc(m => m.Balance, value).Push(m => m.PendingTransactions, t._id);
                UpdateResult updateResultD = mongoDBService.DocumentUpdate(AccountsCollectionName, filterD1, updateD);
                isSuss = updateResultD.ModifiedCount > 0 && updateResultD.ModifiedCount == updateResultD.MatchedCount;
            }

            return isSuss;
        }

4更新兩個賬戶的待處理事務鏈表,移除事務標識,超時跳出

private bool UpdateAccount(TransactionDocumentP t, string source, string destination, TimeSpan maxTxnTime)
        {
            FilterDefinitionBuilder<AccountP> filterBuilderS = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterS = filterBuilderS.Eq(doc => doc._id, source);
            var updateS = Builders<AccountP>.Update.Pull(doc => doc.PendingTransactions, t._id);
            bool isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterS, updateS);
            while (true)
            {
                if (isSucc) break;
                bool timeOut = CheckTimeOut(t, maxTxnTime);
                if (timeOut) break;
                isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterS, updateS);
            }
            if (!isSucc)
            {
                return isSucc;
            }

            FilterDefinitionBuilder<AccountP> filterBuilderD = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterD = filterBuilderD.Eq(doc => doc._id, destination);
            var updateD = Builders<AccountP>.Update.Pull(doc => doc.PendingTransactions, t._id);
            isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterD, updateD);
            while (true)
            {
                if (isSucc) break;
                bool timeOut = CheckTimeOut(t, maxTxnTime);
                if (timeOut) break;
                isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterD, updateD);
            }
            return isSucc;
        }

5 更新事務文檔狀態

private bool UpdateTransactionState(TransactionDocumentP t, string oldState, string newState)
        {
            if (t == null)
            {
                return false;
            }
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter1 = filterBuilder.Eq(doc => doc._id, t._id);
            FilterDefinition<TransactionDocumentP> filter2 = filterBuilder.Eq(doc => doc.State, oldState);
            FilterDefinition<TransactionDocumentP> filter = filterBuilder.And(new FilterDefinition<TransactionDocumentP>[] { filter1, filter2 });

            var update = Builders<TransactionDocumentP>.Update.Set(m => m.State, newState).Set(m =>m.LastModified,DateTime.Now);
            UpdateResult updateResult = mongoDBService.DocumentUpdate(TransactionCollectionName, filter, update);

            return  updateResult.ModifiedCount > 0 && updateResult.ModifiedCount == updateResult.MatchedCount;
        }
檢驗超時版本:
private bool ReUpdateTransactionState(TransactionDocumentP t, string oldState, string newState,TimeSpan maxTxnTime)
        {
            bool isSucc = UpdateTransactionState(t, oldState, newState);
            while (true)
            {
                if (isSucc) break;
                bool timeOut = CheckTimeOut(t, maxTxnTime);
                if (timeOut) break;
                isSucc = UpdateTransactionState(t, oldState, newState);
            }
            return isSucc;
        }

輔助方法:

1 檢測超時

超時只能應對一般的短時網絡故障,對於長時間的故障這種辦法行不通。

 

private bool CheckTimeOut(TransactionDocumentP t, TimeSpan maxTxnTime)
        {
            DateTime cutOff = DateTime.Now - maxTxnTime;
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter = filterBuilder.Lt(doc => doc.LastModified, cutOff);
            var tranDoc = mongoDBService.Single(TransactionCollectionName, filter);
            return tranDoc == null ? true : false;
        }

 

2 回滾操作

 

private void RollbackOperations(TransactionDocumentP t,string source, string destination)
        {
            //1 將事務文檔狀態由pending更新為canceling.
            ReUpdateTransactionState(t, "pending", "canceling", new TimeSpan(0,0,100));
            
            //2 賬戶余額回滾.
            FilterDefinitionBuilder<AccountP> filterBuilderS = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterS1 = filterBuilderS.Eq(doc => doc._id, t.Source);//source
            FilterDefinition<AccountP> filterS2 = filterBuilderS.Where(doc => doc.PendingTransactions.Contains(t._id));
            FilterDefinition<AccountP> filterS = filterBuilderS.And(new FilterDefinition<AccountP>[] { filterS1, filterS2 });
            var updateS = Builders<AccountP>.Update.Inc(m => m.Balance, t.Value).Pull(m => m.PendingTransactions, t._id);
            bool isSuccess = mongoDBService.UpdateOne(AccountsCollectionName, filterS, updateS);

            if(isSuccess)
            {
                FilterDefinitionBuilder<AccountP> filterBuilderD = Builders<AccountP>.Filter;
                FilterDefinition<AccountP> filterD1 = filterBuilderD.Eq(doc => doc._id, t.Destination);//source
                FilterDefinition<AccountP> filterD2 = filterBuilderD.Where(doc => doc.PendingTransactions.Contains(t._id));
                FilterDefinition<AccountP> filterD = filterBuilderD.And(new FilterDefinition<AccountP>[] { filterD1, filterD2 });
                var updateD = Builders<AccountP>.Update.Inc(m => m.Balance, -t.Value).Pull(m => m.PendingTransactions, t._id);
                isSuccess = mongoDBService.UpdateOne(AccountsCollectionName, filterD, updateD);
            }

            if (isSuccess)
            {
                //3 將事務文檔狀態由canceling更新為cancelled.
                UpdateTransactionState(t, "canceling", "cancelled");
            }
        }

 

組織流程:

public void Process(decimal value, string source, string destination)
        {
            //超時時間
            TimeSpan tSpan = new TimeSpan(0,0,100);
            //0 為參與事務的兩個實體創建唯一的事務文檔
            PrepareTransfer(value,source,destination);

            //1 找到狀態為"initial"的事務文檔
            TransactionDocumentP t2 = RetrieveTransaction();

            //2 將事務文檔狀態由“initial”更改為“pending”,超時跳出
            bool initial_pending = ReUpdateTransactionState(t2, "initial", "pending", tSpan);
            if (!initial_pending)
            {
                return;
            }
            //3 執行轉賬
            bool isSuccessAp = ApplyTransaction(t2, value, source, destination);
            if (!isSuccessAp)
            {
                //回滾
                RollbackOperations(t2, source, destination);
                return;
            }

            //4 將事務文檔狀態由“pending”更改為“applied”
            bool pending_applied = ReUpdateTransactionState(t2, "pending", "applied", tSpan);
            if (!pending_applied)
            {
                return;
            }

            //5 更新兩個賬戶的待處理事務鏈表,移除事務標識,超時跳出
            bool update = UpdateAccount(t2, source, destination, tSpan);
            if (!update)
            {
                return;
            }

            //6 將事務文檔狀態由“applied”更改為“done”
            bool applied_done = ReUpdateTransactionState(t2, "applied", "done", tSpan);
            if (!applied_done)
            {
                return;
            }

            //7 將事務文檔狀態由“done”更改為“initial”
            bool done_initial = ReUpdateTransactionState(t2, "done", "initial", tSpan);
            if (!done_initial)
            {
                return;
            }
        }

-----------------------------------------------------------------------------------------

轉載與引用請注明出處。

時間倉促,水平有限,如有不當之處,歡迎指正。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM