SQLAlchemy的session


在更改 SQLAlchemy Session 從每次請求都創建到共享同一個 Session 之后遇到了如下問題:

StatementError: (sqlalchemy.exc.InvalidRequestError) Can’t reconnect until invalid transaction is rolled back [SQL: ]

或者是

raised unexpected: OperationalError(“(_mysql_exceptions.OperationalError) (2006, ‘MySQL server has gone away’)”,)

錯誤是 SQLAlchemy 拋出。原因是你從 pool 拿的 connection 沒有以 session.commit 或 session.rollback 或者 session.close 放回 pool 里。這時 connection 的 transaction 沒有完結(rollback or commit)。 而不知什么原因(recyle 了,timeout 了)你的 connection 又死掉了,你的 sqlalchemy 嘗試重新連接。由於 transaction 還沒完結,無法重連。

正確用法是確保 session 在使用完成后用 session.close, session.commit 或者 session.rollback 把連接還回 pool。

SQLAlchemy 數據庫連接池使用

sessions 和 connections 不是相同的東西, session 使用連接來操作數據庫,一旦任務完成 session 會將數據庫 connection 交還給 pool。

在使用 create_engine 創建引擎時,如果默認不指定連接池設置的話,一般情況下,SQLAlchemy 會使用一個 QueuePool 綁定在新創建的引擎上。並附上合適的連接池參數。

在以默認的方法 create_engine 時(如下),就會創建一個帶連接池的引擎。

engine = create_engine('mysql+mysqldb://root:password@127.0.0.1:3306/dbname')

在這種情況下,當你使用了 session 后就算顯式地調用 session.close(),也不能把連接關閉。連接會由 QueuePool 連接池進行管理並復用。

這種特性在一般情況下並不會有問題,不過當數據庫服務器因為一些原因進行了重啟的話。最初保持的數據庫連接就失效了。隨后進行的 session.query() 等方法就會拋出異常導致程序出錯。

如果想禁用 SQLAlchemy 提供的數據庫連接池,只需要在調用 create_engine 是指定連接池為 NullPool,SQLAlchemy 就會在執行 session.close() 后立刻斷開數據庫連接。當然,如果 session 對象被析構但是沒有被調用 session.close(),則數據庫連接不會被斷開,直到程序終止。

下面的代碼就可以避免 SQLAlchemy 使用連接池:

#!/usr/bin/env python
#-*- coding: utf-8 -*-
 
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
 
engine = create_engine('mysql+mysqldb://root:password@127.0.0.1:3306/dbname', poolclass=NullPool)
Session = sessionmaker(bind=engine)
session = Session()
usr_obj_list = session.query(UsrObj).all()
print usr_obj_list[0].id

 

create_engine() 函數和連接池相關的參數有:

  • -pool_recycle, 默認為 -1, 推薦設置為 7200, 即如果 connection 空閑了 7200 秒,自動重新獲取,以防止 connection 被 db server 關閉。
  • -pool_size=5, 連接數大小,默認為 5,正式環境該數值太小,需根據實際情況調大
  • -max_overflow=10, 超出 pool_size 后可允許的最大連接數,默認為 10, 這 10 個連接在使用過后,不放在 pool 中,而是被真正關閉的。
  • -pool_timeout=30, 獲取連接的超時閾值,默認為 30 秒

直接只用 create_engine 時,就會創建一個帶連接池的引擎

engine = create_engine('postgresql://postgres@127.0.0.1/dbname')

當使用 session 后就顯示地調用 session.close(),也不能把連接關閉,連接由 QueuePool 連接池管理並復用。

引發問題

當數據庫重啟,最初保持的連接就會失敗,隨后進行 session.query() 就會失敗拋出異常 mysql 數據 ,interactive_timeout 等參數處理連接的空閑時間超過(配置時間),斷開

何時定義 session,何時提交,何時關閉

基本

  • 通常來說,將 session 的生命周期和訪問操作數據庫的方法對象隔離和獨立。
  • 確保 transaction 有非常清晰的開始和結束,保持 transaction 簡短,也就意味着讓 transaction 能在一系列操作之后終止,而不是一直開放着。

    from contextlib import contextmanager
    
    @contextmanager 
    def session_scope(): 
        “"
        Provide a transactional scope around a series of operations.
        ””” 
        session = Session() 
        try: 
            yield session 
            session.commit() 
        except: 
            session.rollback() 
            raise 
        finally: 
            session.close()

     

是否線程安全

Session 不是為了線程安全而設計的,因此確保只在同一個線程中使用。

如果實際上有多個線程參與同一任務,那么您考慮在這些線程之間共享 Session 及其對象;但是在這種極不尋常的情況下,應用程序需要確保實現正確的 locking scheme,以便不會同時訪問 Session 或其狀態。處理這種情況的一種更常見的方法是為每個並發線程維護一個 Session,而是將對象從一個 Session 復制到另一個 Session,通常使用 Session.merge() 方法將對象的狀態復制到本地的新對象中。

scoped session

想要線程安全時使用 scoped_session() ,文檔解釋

the scoped_session() function is provided which produces a thread-managed registry of Session objects. It is commonly used in web applications so that a single global variable can be used to safely represent transactional sessions with sets of objects, localized to a single thread.

using transactional=False is one solution, but a better one is to simply rollback(), commit(), or close() the Session when operations are complete - transactional mode (which is called “autocommit=False” in 0.5) has the advantage that a series of select operations will all share the same isolated transactional context..this can be more or less important depending on the isolation mode in effect and the kind of application.

DBAPI has no implicit “autocommit” mode so there is always a transaction implicitly in progress when queries are made.

This would be a fairly late answer. This is what happens: While using the session, a sqlalchemy Error is raised (anything which would also throw an error when be used as pure SQL: syntax errors, unique constraints, key collisions etc.).

You would have to find this error, wrap it into a try/except-block and perform a session.rollback().

After this you can reinstate your session.

flush 和 commit 區別

  • flush 預提交,等於提交到數據庫內存,還未寫入數據庫文件;
  • commit 就是把內存里面的東西直接寫入,可以提供查詢了;

使用總結


 

sqlalchemy:

>>> with engine.connect() as conn:
... result = conn.execute(text("SELECT x, y FROM some_table")) ... for row in result: ... print(f"x: {row.x} y: {row.y}")

 

上面的result是查詢的結果集,可迭代, 迭代出的每個對象(row),包含了查詢出的每一行數據的信息。可以直接通過'.字段名'方式獲取到字段值。row本身類似於一個元組,如這種用法:

for x, y in result: # 獲取到每一行數據,元組數據按查詢的select語句后的字段名順序依次排序
# ...
Result.all() 方法,獲取到所有的row,如:[(x1, y1), (x2, y2)]
Result.mappings()方法獲取每一行的字典對象,如[{'x': x1, 'y':y1}, {'x': x2, 'y':y2}]

sql語句中參數傳遞:

>>> with engine.connect() as conn:
... result = conn.execute( ... text("SELECT x, y FROM some_table WHERE y > :y"), ... {"y": 2} ... ) ... for row in result: ... print(f"x: {row.x} y: {row.y}")

 

sql語句中使用:參數名 形式定義站位符, 
excute()第二個參數通過字典傳遞參數

多組參數的傳遞:

>>> with engine.connect() as conn:
... conn.execute(
... text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), ... [{"x": 11, "y": 12}, {"x": 13, "y": 14}] ... ) ... conn.commit()

 

excute()方法第二個參數不是傳一個字典,而是傳一個字典列表。此時底層會遍歷這個列表然后拼接成多個sql語句執行。但是在執行多個sql前,DBAPI會通過各種方式對sql進行優化如這里的insert語句會被優化,優化后,只會執行一條insert語句:
INSERT INTO some_table (x, y) VALUES (11, 12), (13, 14)

參數直接和sql語句綁定,通過text()方法實現:

>>> stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
>>> with engine.connect() as conn: ... result = conn.execute(stmt) ... for row in result: ... print(f"x: {row.x} y: {row.y}")


Session是基礎的事務/數據庫交互對象稱為,這個對象的使用方式與 Connection相似 ,事實上 Session 通過內部的 Connection 才操作sql的。

>>> from sqlalchemy.orm import Session

>>> stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6) >>> with Session(engine) as session: ... result = session.execute(stmt) ... for row in result: ... print(f"x: {row.x} y: {row.y}")

 

>>> with Session(engine) as session:
... result = session.execute( ... text("UPDATE some_table SET y=:y WHERE x=:x"), ... [{"x": 9, "y":11}, {"x": 13, "y": 15}] ... ) ... session.commit()

這里事務提交后session不再繼續持有connection,下一次需要操作數據庫時,會重新從engine中獲取connection


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM