SQLAlchemy 嵌套事務的解決方案


 sqlachemy 是python的orm框架,在使用一段時間后,我們通常會出現事務嵌套的情況,看到很多人寫代碼的時候,居然是session到處傳遞,這無疑是加大了代碼之間的耦合度。
    案例:
    def save(session):
        # TODO

    def update(session):
        # TODO

    def service():
        session = getSession();
        try:
            save(session);
            update(session);
            session.commit();
        except Exception as e:
            session.rollback();
        finally:
            if not session:
                session.close();

    假設save和update是同一個事務,但是上述的實踐缺強制了save和update的 session相耦合來達成,好的實踐應該是:save和update無任何關系,只是在實現業務邏輯時,組合到一個事務,確保事務性即可,即:

    def service():
        try:
            save();
            update();
        except Exception as e:
            # TODO 
    因此如何解決這個問題是本文需要闡述的主題。

解決方案是:

ACID是事務的四個基本特征,通常我們的理解事務是一個操作單元,要么一起成功要么一起失敗(原子性);通過一個例子來直接說明如何解決的。
    場景:注冊用戶(添加一個用戶,會未用戶送10個積分)

創建表

create table user(
        id int not null auto_increment,
        name varchar(255) not null default '' comment '用戶名',
        created_at datetime not null default current_timestamp comment '創建時間',
        updated_at datetime not null default current_timestamp comment '更新時間',
        primary key (`id`)
    )engine innodb charset=utf8 comment '用戶表';  

    create table user_credits(
        id int not null auto_increment,
        user_id int not null default 0 comment '用戶ID',
        user_name varchar(255) not null default '' comment '用戶名',
        score int not null default 0 comment '積分',
        created_at datetime not null default current_timestamp comment '創建時間',
        updated_at datetime not null default current_timestamp comment '更新時間',
        primary key (`id`),
        unique key `uk_user_id` (`user_id`)
    )engine innodb charset=utf8 comment '用戶積分表';

sqlalchemy 實現

# -*- coding:utf-8 -*-

import datetime
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

engine = create_engine("mysql+pymysql://root:root@localhost:3306/csdn", echo=True)

# 必須使用scoped_session,域session可以將session進行共享
DBSession = scoped_session(sessionmaker(bind=engine))

BaseModel = declarative_base()


# ----------- Relation Model Object---------------- #

class User(BaseModel):

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    created_at = Column(DateTime, default=datetime.datetime.now)
    updated_at = Column(DateTime, default=datetime.datetime.now)


class UserCredits(BaseModel):

    __tablename__ = "user_credits"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    user_name = Column(String)
    score = Column(Integer)
    created_at = Column(DateTime, default=datetime.datetime.now)
    updated_at = Column(DateTime, default=datetime.datetime.now)

# ----------- Service implements---------------- #


def add_user(user):
    " 添加用戶 "
    session = DBSession()
    try:
        session.add(user)
        session.commit()
    except Exception as e:
        session.rollback()
        print("AddUser: ======={}=======".format(e))
    finally:
        if not session:
            session.close()


def add_user_credits(userCredits, interrupt=True):
    " 添加用戶積分記錄 "
    session = DBSession()
    try:
        if interrupt:
            raise Exception("--- interrupt ---")

        session.add(userCredits)
        session.commit()
    except Exception as e:
        session.rollback()
        print("AddUserCredits: ======={}=======".format(e))
    finally:
        if not session:
            session.close()


def regist_user():

    session = DBSession()
    try:
        # 開啟子事務
        session.begin(subtransactions=True)

        # TODO Service
        user = User(name='wangzhiping')
        add_user(user)
        add_user_credits(UserCredits(
            user_id=user.id,
            user_name=user.name,
            score=10
        ), False)

        session.commit()
    except Exception as e:
        session.rollback()
        print("AddUserCredits: ======={}=======".format(e))
    finally:
        if not session:
            session.close()

# ---------- exec -----------
regist_user()
 1,設置session時,需要指定為scoped_session,目的是session可以共享(ThreadLocal);
    2,session.begin(subtransactions=True) 開啟子事務管理;
    這是實際上regist_user是在同一個線程中的session,這是add_user,add_user_credits實際上session是同一個,所以可以實現。其實這個可以更進一步擴展,把事務隔離級別,傳播屬性,這里不做介紹

---------------------
作者:紫守笨 
來源:CSDN 
原文:https://blog.csdn.net/program_red/article/details/55194130?utm_source=copy 
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

轉載:https://blog.csdn.net/program_red/article/details/55194130


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM