mongo數據庫的事務操作


一:mongo數據庫的版本在4.0以上

二:以pymongo為例

from pymongo import MongoClient
class MongoTest(object):

    def __init__(self):
        self.conn = MongoClient(host='xxx', port=xxx)

    def transaction_test_normal(self):
        """正常給兩條數據庫插入兩條數據"""
        # self.conn["test1"]["t1"] ql_test1指的是數據庫名字 test1指的是集合名字
        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        # 1. pymongo的client開啟事務
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                ressult1 = t1.insert_one(document={"name":"beijing"},session=session)
                ressult2 = t2.insert_one(document={"name":"shanghai"},session=session)
                # 返回兩條插入數據的_id的值
                print(ressult1.inserted_id,ressult2.inserted_id)


    def transaction_test_abort(self):
        """事務刪除兩條數據,引發異常后,兩條數據都未發生改變"""
        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                ressult1 = t1.delete_one({"name":"beijing"},session=session)
                print(1/0)
                ressult2 = t2.delete_one({"name":"shanghai"},session=session)
                print(ressult1.deleted_count,ressult2.deleted_count)

    def test1(self):
        """插入一條數據成功后,刪除一條數據,故意拋出異常,看插入是否成功"""
        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                # xian未插入成功
                ressult1 = t1.insert_one({"name": "xian"}, session=session)
                print(1 / 0)
                ressult2 = t2.delete_one({"name": "guandong"}, session=session)
                print(ressult1.inserted_id, ressult2.deleted_count)

    def test2(self):
        """
        更新第一條數據,更新第二條數據,故意拋出異常,看是否更新成功---
        1. 不能使用update方法了,必須使用find_one_and_update
        2. find_one_and_update也有upsert=True選項,不存在就執行插入操作,存在就不做任何操作
        """

        t1 = self.conn["ql_test"]["test1"]
        t2 = self.conn["ql_test"]["test2"]
        with self.conn.start_session(causal_consistency=True) as session:
            with session.start_transaction():
                #
                r1 = t1.find_one_and_update({"name":"xian666"},{"$set":{"name":"xian"}},session=session)
                print(1 / 0)
                r2 = t2.find_one_and_update({"name":"chongqing"},{"$set":{"name":"chongqing666"}},session=session)
                print(r1,r2)



m = MongoTest()
m.test2()

 

 

# TODO


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM