SQLAlchemy模型使用


 

                      SQLAchemy模型使用

簡介:

SQLAlchemy是Python編程語言下的一款ORM框架,該框架建立在數據庫API之上,使用關系對象映射進行數據庫操作,簡言之便是:將對象轉換成SQL,然后使用數據API執行SQL並獲取執行結果。

 

SQLAlchemy本身無法操作數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作,如:

 1 MySQL-Python  2     mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
 3   
 4 pymysql  5     mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]  6   
 7 MySQL-Connector  8     mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
 9   
10 cx_Oracle 11     oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 12   
13 更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html

 

1.SQLAlchemy初始化和創建連接

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy import Column from sqlalchemy.types import String, Integer from sqlalchemy.ext.declarative import declarative_base #導入相應的模塊 engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", max_overflow=5) #創建數據庫連接,max_overflow指定最大連接數 DBSession = sessionmaker(engine)#創建DBSession類型 session = DBSession()#創建session對象 BaseModel = declarative_base()#創建對象的基類 class User(BaseModel): #定義User對象 __tablename__ = 'user' #創建表,指定表名稱

#指定表的結構 id = Column(String, primary_key=True) username = Column(String, index=True) class Session(BaseModel): __tablename__ = 'session' id = Column(String, primary_key=True) user = Column(String, index=True) ip = Column(String)
BaseModel.metadata.create_all(engine)#創建表,執行所有BaseModel類的子類
session.commit()#提交,必須
 

2. SQLAlchemy創建連接

SQLAlchemy 的連接創建是 Lazy 的方式, 即在需要使用時才會去真正創建. 之前做的工作, 全是"定義".連接的定義是在 engine 中做的.

2.1. Engine

engine 的定義包含了三部分的內容, 一是具體數據庫類型的實現, 二是連接池, 三是策略(即engine 自己的實現).

所謂的數據庫類型即是 MYSQL , Postgresql , SQLite 這些不同的數據庫.

一般創建 engine 是使用 create_engine 方法:

engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test")

參數字符串的各部分的意義:

"mysql+pymysql://mysql:123456@10.0.0.8:3306/test"

對於這個字符串, SQLAlchemy 提供了工具可用於處理它:

 1 # -*- coding: utf-8 -*-
 2 
 3 from sqlalchemy import create_engine  4 from sqlalchemy.engine.url import make_url  5 from sqlalchemy.engine.url import URL  6 
 7 s = 'postgresql://test@localhost:5432/bbcustom'
 8 url = make_url(s)  9 s = URL(drivername='postgresql', username='test', password="",host="localhost", port=5432, database="bbcustom") 10 
11 engine = create_engine(url) 12 engine = create_engine(s) 13 
14 print engine.execute('select id from "user"').fetchall()

create_engine 函數有很多的控制參數, 這個后面再詳細說.

2.2. Engine的策略

create_engine 的調用, 實際上會變成 strategy.create 的調用. 而 strategy 就是 engine 的實現細節. strategy 可以在 create_engine 調用時通過 strategy 參數指定, 目前官方的支持有三種:

  • plain, 默認的
  • threadlocal, 連接是線程局部的
  • mock, 所有的 SQL 語句的執行會使用指定的函數

mock 這個實現, 會把所有的 SQL 語句的執行交給指定的函數來做, 這個函數是由create_engine 的 executor 參數指定:

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 from sqlalchemy.ext.declarative import declarative_base  6 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index  7 from sqlalchemy.orm import sessionmaker, relationship  8 from sqlalchemy import create_engine

9 def f(sql,*args,**kwargs): 10 print(sql,args,kwargs) 11 s="mysql+pymysql://mysql:123456@10.0.0.8:3306/test" 12 engin=create_engine(s,strategy='mock',executor=f) 13 print(engin.execute('select id from "user"'))

2.3. 各數據庫實現

各數據庫的實現在 SQLAlchemy 中分成了兩個部分, 一是數據庫的類型, 二是具體數據庫中適配的客戶端實現. 比如對於 Postgresql 的訪問, 可以使用 psycopg2 , 也可以使用 pg8000 :

1 s = 'postgresql+psycopg2://test@localhost:5432/bbcustom'
2 s = 'postgresql+pg8000://test@localhost:5432/bbcustom'
3 engine = create_engine(s)

具體的適配工作, 是需要在代碼中實現一個 Dialect 類來完成的. 官方的實現在 dialects 目錄下.

獲取具體的 Dialect 的行為, 則是前面提到的 URL 對象的 get_dialect 方法. create_engine 時你單傳一個字符串, SQLAlchemy 自己也會使用 make_url 得到一個 URL 的實例).

2.4. 連接池

SQLAlchemy 支持連接池, 在 create_engine 時添加相關參數即可使用.

  • pool_size 連接數
  • max_overflow 最多多幾個連接
  • pool_recycle 連接重置周期
  • pool_timeout 連接超時時間

連接池效果:

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy import Column from sqlalchemy.types import String, Integer from sqlalchemy.ext.declarative import declarative_base engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", pollsize=2,max_overflow=0) DBSession = sessionmaker(engine) session = DBSession() BaseModel = declarative_base() from threading import Thread def f(): print (engine.execute('show databases').fetchall()) p = [] for i in range(3): p.append(Thread(target=f)) for t in p: t.start()

 

 

 

3.SQLAlchemy 模型使用

3.1. 模型定義

對於 Table 的定義, 本來是直接的實例化調用, 通過 declarative 的包裝, 可以像"定義類"這樣的更直觀的方式來完成.

user = Table('user', Column('user_id', Integer, primary_key = True), Column('user_name', String(16), nullable = False), Column('email_address', String(60)), Column('password', String(20), nullable = False) )                                                                                     
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from sqlalchemy.ext.declarative import declarative_base  4 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index  5 from sqlalchemy.orm import sessionmaker, relationship  6 from sqlalchemy import create_engine  7 
 8 engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", max_overflow=5) #創建數據庫連接
 9 DBSession=sessionmaker(engine)#創建了一個自定義了的 Session類
10 session=DBSession() 11 Base = declarative_base()#創建對象的基類
12 
13 
14 class Blog(Base): 15     __tablename__ = 'blog'
16 
17     id = Column(Integer, primary_key=True) 18     title = Column(String(64), server_default='', nullable=False) 19     text = Column(String, server_default='', nullable=False) 20     user = Column(String(32), index=True, server_default='', nullable=False) 21     create = Column(String(32), index=True, server_default='0', nullable=False) 22 
23 
24 class User(Base): 25     __tablename__ = 'user'
26 
27     id = Column(Integer, primary_key=True) 28     name = Column(String(32), server_default='', nullable=False) 29     username = Column(String(32), index=True, server_default='', nullable=False) 30     password = Column(String(64), server_default='', nullable=False) 31 
32 
33 def init_db(): 34  Base.metadata.create_all(engine) 35 
36 def drop_db(): 37  Base.metadata.drop_all(engine) 38 
39 if __name__ == '__main__': 40  init_db() 41     #drop_db()
42     #Base.metadata.tables['user'].create(engine, checkfirst=True)
43     #Base.metadata.tables['user'].drop(engine, checkfirst=False)
44     #pass

3.2. 創建

1 session = Session() 2 session.add(User(id=1)) #創建一個 3 session.add(Blog(id=1)) 4 session.add_all([ User(id=2),Blog(id=2)]) #創建多個 5 session.commit()

執行的順序並不一定會和代碼順序一致, SQLAlchemy 自己會整合邏輯再執行.

3.3. 查詢

SQLAlchemy 實現的查詢非常強大, 寫起來有一種隨心所欲的感覺.

查詢的結果, 有幾種不同的類型, 這個需要注意, 像是:

  • instance
  • instance of list
  • keyed tuple of list
  • value of list

基本查詢:

 1 session.query(User).filter_by(username='abc').all()#查詢用戶User創建表中username字段等於abc的內容  2 session.query(User).filter(User.username=='abc').all()
#平時使用的時候,兩者區別主要就是當使用filter的時候條件之間是使用“==",fitler_by使用的是"="

3 session.query(Blog).filter(Blog.create >= 0).all() #查詢Blog對象中創建表中create字段大於等於0的所有內容 4 session.query(Blog).filter(Blog.create >= 0).first() 5 session.query(Blog).filter(Blog.create >= 0 | Blog.title == 'A').first()#條件或 6 session.query(Blog).filter(Blog.create >= 0 & Blog.title == 'A').first()#條件與 7 session.query(Blog).filter(Blog.create >= 0).offset(1).limit(1).scalar()# offset(N)從第N條開始返回 limit(N)最多返回 N 條記錄 scalar() 如果有記錄 8 session.query(User).filter(User.username == 'abc').scalar() 9 session.query(User.id).filter(User.username == 'abc').scalar() 10 session.query(Blog.id).filter(Blog.create >= 0).all() 11 session.query(Blog.id).filter(Blog.create >= 0).all()[0].id 12 dict(session.query(Blog.id, Blog.title).filter(Blog.create >= 0).all()) 13 session.query(Blog.id, Blog.title).filter(Blog.create >= 0).first().title #記錄不存在時,first() 會返回 None 14 session.query(User.id).order_by('id desc').all()#對結果集進行排序 15 session.query(User.id).order_by('id').first() 16 session.query(User.id).order_by(User.id).first() 17 session.query(User.id).order_by(-User.id).first() 18 session.query('id', 'username').select_from(User).all()#與 19 session.query(User).get('16e19a64d5874c308421e1a835b01c69')# 以主鍵獲取

多表查詢:

session.query(Blog, User).filter(Blog.user == User.id).first().User.username session.query(Blog, User.id, User.username).filter(Blog.user == User.id).first().id session.query(Blog.id,User.id,User.username).filter(Blog.user == User.id).first().keys()

條件查詢:

from sqlalchemy import or_, not_ session.query(User).filter(or_(User.id == '', User.id == '1')).all() session.query(User).filter(not_(User.id == '1')).all() session.query(User).filter(User.id.in_(['1'])).all() session.query(User).filter(User.id.like('1%')).all() session.query(User).filter(User.id.startswith('1')).all() dir(User.id)

函數:

from sqlalchemy import func session.query(func.count('1')).select_from(User).scalar() session.query(func.count('1'), func.max(User.username)).select_from(User).first() session.query(func.count('1')).select_from(User).scalar() session.query(func.md5(User.username)).select_from(User).all() session.query(func.current_timestamp()).scalar() session.query(User).count()

3.4. 修改

還是通常的兩種方式:

1 session.query(User).filter(User.username == 'abc').update({'name': '123'}) 2 session.commit() 3 
4 user=session.query(User).filter_by(username='abc').scalar() 5 user.name = '223'
6 session.commit()

如果涉及對屬性原值的引用, 則要考慮 synchronize_session 這個參數.

  • 'evaluate' 默認值, 會同時修改當前 session 中的對象屬性.
  • 'fetch' 修改前, 會先通過 select 查詢條目的值.
  • False 不修改當前 session 中的對象屬性.

在默認情況下, 因為會有修改當前會話中的對象屬性, 所以如果語句中有 SQL 函數, 或者"原值引用", 那是無法完成的操作, 自然也會報錯, 比如:

from sqlalchemy import func session.query(User).update({User.name: func.trim('123 ')}) session.query(User).update({User.name: User.name + 'x'})

這種情況下, 就不能要求 SQLAlchemy 修改當前 session 的對象屬性了, 而是直接進行數據庫的交互, 不管當前會話值:

session.query(User).update({User.name: User.name + 'x'}, synchronize_session=False)

是否修改當前會話的對象屬性, 涉及到當前會話的狀態. 如果當前會話過期, 那么在獲取相關對象的屬性值時, SQLAlchemy 會自動作一次數據庫查詢, 以便獲取正確的值:

user = session.query(User).filter_by(username='abc').scalar() print user.name session.query(User).update({User.name: 'new'}, synchronize_session=False) print (user.name) session.commit() print (user.name)

執行了 update 之后, 雖然相關對象的實際的屬性值已變更, 但是當前會話中的對象屬性值並沒有改變. 直到 session.commit() 之后, 當前會話變成"過期"狀態, 再次獲取 user.name 時, SQLAlchemy 通過 user 的 id 屬性, 重新去數據庫查詢了新值. (如果 user 的 id 變了呢? 那就會出事了啊.)

synchronize_session 設置成 'fetch' 不會有這樣的問題, 因為在做 update 時已經修改了當前會話中的對象了.

不管 synchronize_session 的行為如何, commit 之后 session 都會過期, 再次獲取相關對象值時, 都會重新作一次查詢.

3.5. 刪除

session.query(User).filter_by(username='abc').delete() user = session.query(User).filter_by(username='abc').first() session.delete(user)

刪除同樣有像修改一樣的 synchronize_session 參數的問題, 影響當前會話的狀態.

3.6. JOIN

SQLAlchemy 可以很直觀地作 join 的支持:

1 r = session.query(Blog, User).join(User, Blog.user == User.id).all() 2 for blog, user in r: 3     print (blog.id, blog.user, user.id) 
4 r = session.query(Blog, User.name, User.username).join(User, Blog.user == User.id).all() 5 print (r)

表操作總結:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from sqlalchemy.ext.declarative import declarative_base  4 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index  5 from sqlalchemy.orm import sessionmaker, relationship  6 from sqlalchemy import create_engine  7 
 8 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)  9 
10 Base = declarative_base() 11 
12 # 創建單表
13 class Users(Base): 14     __tablename__ = 'users'
15     id = Column(Integer, primary_key=True) 16     name = Column(String(32)) 17     extra = Column(String(16)) 18 
19     __table_args__ = ( 20     UniqueConstraint('id', 'name', name='uix_id_name'), 21         Index('ix_id_name', 'name', 'extra'), 22  ) 23 
24     def __repr__(self): 25         return "%s-%s" %(self.id, self.name) 26 
27 # 一對多
28 class Favor(Base): 29     __tablename__ = 'favor'
30     nid = Column(Integer, primary_key=True) 31     caption = Column(String(50), default='red', unique=True) 32 
33     def __repr__(self): 34         return "%s-%s" %(self.nid, self.caption) 35 
36 class Person(Base): 37     __tablename__ = 'person'
38     nid = Column(Integer, primary_key=True) 39     name = Column(String(32), index=True, nullable=True) 40     favor_id = Column(Integer, ForeignKey("favor.nid")) 41     # 與生成表結構無關,僅用於查詢方便
42     favor = relationship("Favor", backref='pers') 43 
44 # 多對多
45 class ServerToGroup(Base): 46     __tablename__ = 'servertogroup'
47     nid = Column(Integer, primary_key=True, autoincrement=True) 48     server_id = Column(Integer, ForeignKey('server.id')) 49     group_id = Column(Integer, ForeignKey('group.id')) 50     group = relationship("Group", backref='s2g') 51     server = relationship("Server", backref='s2g') 52 
53 class Group(Base): 54     __tablename__ = 'group'
55     id = Column(Integer, primary_key=True) 56     name = Column(String(64), unique=True, nullable=False) 57     port = Column(Integer, default=22) 58     # group = relationship('Group',secondary=ServerToGroup,backref='host_list')
59 
60 
61 class Server(Base): 62     __tablename__ = 'server'
63 
64     id = Column(Integer, primary_key=True, autoincrement=True) 65     hostname = Column(String(64), unique=True, nullable=False) 66 
67 
68 
69 
70 def init_db(): 71  Base.metadata.create_all(engine) 72 
73 
74 def drop_db(): 75  Base.metadata.drop_all(engine) 76 
77 
78 Session = sessionmaker(bind=engine) 79 session = Session()
表結構+連接數據庫

 

1.增

1 obj = Users(name="alex0", extra='sb') 2 session.add(obj) 3 session.add_all([ 4     Users(name="alex1", extra='sb'), 5     Users(name="alex2", extra='sb'), 6 ]) 7 session.commit()

2.刪

session.query(Users).filter(Users.id > 2).delete() session.commit()

3.改

1 session.query(Users).filter(Users.id > 2).update({"name" : "099"}) 2 session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) 3 session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") 4 session.commit()

4.查

1 ret = session.query(Users).all() 2 ret = session.query(Users.name, Users.extra).all() 3 ret = session.query(Users).filter_by(name='alex').all() 4 ret = session.query(Users).filter_by(name='alex').first()

5.其他

 1 # 條件
 2 ret = session.query(Users).filter_by(name='alex').all()  3 ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()  4 ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()  5 ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()  6 ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()  7 ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()  8 from sqlalchemy import and_, or_  9 ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() 10 ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() 11 ret = session.query(Users).filter( 12  or_( 13         Users.id < 2, 14         and_(Users.name == 'eric', Users.id > 3), 15         Users.extra != ""
16  )).all() 17 
18 
19 # 通配符
20 ret = session.query(Users).filter(Users.name.like('e%')).all() 21 ret = session.query(Users).filter(~Users.name.like('e%')).all() 22 
23 # 限制
24 ret = session.query(Users)[1:2] 25 
26 # 排序
27 ret = session.query(Users).order_by(Users.name.desc()).all() 28 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() 29 
30 # 分組
31 from sqlalchemy.sql import func 32 
33 ret = session.query(Users).group_by(Users.extra).all() 34 ret = session.query( 35  func.max(Users.id), 36  func.sum(Users.id), 37  func.min(Users.id)).group_by(Users.name).all() 38 
39 ret = session.query( 40  func.max(Users.id), 41  func.sum(Users.id), 42     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() 43 
44 # 連表
45 
46 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() 47 
48 ret = session.query(Person).join(Favor).all() 49 
50 ret = session.query(Person).join(Favor, isouter=True).all() 51 
52 
53 # 組合
54 q1 = session.query(Users.name).filter(Users.id > 2) 55 q2 = session.query(Favor.caption).filter(Favor.nid < 2) 56 ret = q1.union(q2).all() 57 
58 q1 = session.query(Users.name).filter(Users.id > 2) 59 q2 = session.query(Favor.caption).filter(Favor.nid < 2) 60 ret = q1.union_all(q2).all()
View Code

 

 

 

 

 

 

 

4. SQLAlchemy外鍵和關系

4.1. 外鍵約束

使用 ForeignKey 來定義一個外鍵約定:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from sqlalchemy.ext.declarative import declarative_base  4 from sqlalchemy import Column, Integer, String, CHAR,BIGINT,ForeignKey, UniqueConstraint, Index  5 from sqlalchemy.orm import sessionmaker, relationship  6 from sqlalchemy import create_engine  7 
 8 engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", max_overflow=5) #創建數據庫連接
 9 DBSession=sessionmaker(engine)#創建了一個自定義了的 Session類
10 session=DBSession() 11 BaseModel = declarative_base()#創建對象的基類
12 class Blog(BaseModel): 13     __tablename__='blog'
14 
15     id=Column(BIGINT,primary_key=True,autoincrement=True) 16     title=Column(String(64),server_default='',nullable=False) 17     text=Column(String(256),server_default='',nullable=False) 18     user=Column(BIGINT,ForeignKey('user.id'),index=True,nullable=False)#將user表中id字段作為外鍵,使用ForeignKey指定 19 
20 class User(BaseModel): 21     __tablename__='user'
22 
23     id=Column(BIGINT,primary_key=True,autoincrement=True) 24     name=Column(String(32),server_default='',nullable=False) 25     username=Column(String(32),index=True,server_default='',nullable=True) 26     passwd=Column(String(64),server_default='',nullable=False)

創建時:

1 BaseModel.metadata.create_all(engine) 2 user=User(name='first',username='pan',passwd='123456') 3 session.add(user) 4 session.flush() 5 blog = Blog(title='frist', user=user.id) 6 session.add(blog) 7 session.commit()

session.flush() 是進行數據庫交互, 但是事務並沒有提交. 進行數據庫交互之后, user.id 才有值.

定義了外鍵, 對查詢來說, 並沒有影響. 外鍵只是單純的一條約束而已. 當然, 可以在外鍵上定義一些關聯的事件操作, 比如當外鍵條目被刪除時, 字段置成 null , 或者關聯條目也被刪除等.

4.2. 關系定義

要定義關系, 必有使用 ForeignKey 約束. 當然, 這里說的只是在定義模型時必有要有, 至於數據庫中是否真有外鍵約定, 這並不重要.

接下來我們來了解幾個關於外鍵(Foreign Key)的小知識:

1. FOREIGN KEY 約束是大多數(但不是所有)的關系型數據庫中可以鏈接到主鍵列,或者擁有UNIQUE約束的列。

2. FOREIGN KEY 能夠引用多重列主鍵,並且其自身擁有多重列,被稱為“復合外鍵”(composite foreign key)。其也能夠引用這些列的子集(subset)。(注:這地方不太明白)

3. FOREIGN KEY 列作為對於其引用的列或者行的變化的響應能夠自動更新其自身,比如CASCADE引用操作,這些都是內置於關系型數據庫的功能之一。

4. FOREIGN KEY 能夠引用其自身的表,這個就涉及到“自引用”(self-referential)的外鍵了。

5. 更多關於外鍵的資料可以參考Foreign Key – Wikipedia

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from sqlalchemy.ext.declarative import declarative_base  4 from sqlalchemy import Column, Integer, String, CHAR,BIGINT,ForeignKey, UniqueConstraint, Index  5 from sqlalchemy.orm import sessionmaker, relationship  6 from sqlalchemy import create_engine  7 
 8 engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", max_overflow=5) #創建數據庫連接
 9 DBSession=sessionmaker(engine)#創建了一個自定義了的 Session類
10 session=DBSession() 11 BaseModel = declarative_base()#創建對象的基類
12 class Blog(BaseModel): 13     __tablename__='blog'
14 
15     id=Column(BIGINT,primary_key=True,autoincrement=True) 16     title=Column(String(64),server_default='',nullable=False) 17     text=Column(String(256),server_default='',nullable=False) 18     user=Column(BIGINT,ForeignKey('user.id'),index=True,nullable=False) 19     user_obj=relationship('User') 20 
21 class User(BaseModel): 22     __tablename__='user'
23 
24     id=Column(BIGINT,primary_key=True,autoincrement=True) 25     name=Column(String(32),server_default='',nullable=False) 26     username=Column(String(32),index=True,server_default='',nullable=True) 27     passwd=Column(String(64),server_default='',nullable=False) 28     blog_list=relationship('Blog',backref='Blog.user')  #relationship函數是sqlalchemy對關系之間提供的一種便利的調用方式, backref參數則對關系提供反向引用的聲明。
    #大致原理應該就是sqlalchemy在運行時對Blog對象動態的設置了一個指向所屬User對象的屬性,這樣就能在實際開發中使邏輯關系更加清晰,代碼更加簡潔了。
31 32 BaseModel.metadata.create_all(engine) 33 user=User(name='first',username='pan',passwd='123456') 34 session.add(user) 35 session.flush() 36 blog = Blog(title='frist', user=user.id) 37 session.add(blog) 38 session.commit()

關系只是 SQLAlchemy 提供的工具, 與數據庫無關, 所以任何時候添加都是可以的.

上面的 User-Blog 是一個"一對多"關系, 通過 Blog 的 user 這個 ForeignKey , SQLAlchemy 可以自動處理關系的定義. 在查詢時, 返回的結果自然也是, 一個是列表, 一個是單個對象:

1 ret=session.query(Blog).get(1).user_obj 2 ret1=session.query(User).get(1).blog_list 3 print(ret.passwd,ret1[0].id)#ret.passwd=User.passwd  ret1[0].id=Blog.user列表中第一個值

#blog_list=relationship('Blog',backref='Blog.user')
#user_obj=relationship('User')
#relationship指定對象不同引用方法不同

這種關系的定義, 並不影響查詢並獲取對象的行為, 不會添加額外的 join 操作. 在對象上取一個user_obj 或者取 blog_list 都是發生了一個新的查詢操作.

上面的關系定義, 對應的屬性是實際查詢出的實例列表, 當條目數多的時候, 這樣可能會有問題. 比如用戶名下有成千上萬的文章, 一次全取出就太暴力了. 關系對應的屬性可以定義成一個 Query :

1 class User(BaseModel): 2     __tablename__ = 'user'
3 
4     id = Column(BIGINT, primary_key=True, autoincrement=True) 5     name = Column(String(32), server_default='', nullable=False) 6 
7     blog_list = relationship('Blog', order_by='Blog.user', lazy="dynamic")

這樣在獲取實例時就可以自由控制了:

1 session.query(User).get(1).blog_list.all() 2 session.query(User).get(1).blog_list.filter(Blog.title == '1').first()

 

 

4.3. 關系的查詢

關系定義之后, 除了在查詢時會有自動關聯的效果, 在作查詢時, 也可以對定義的關系做操作:

 1 class Blog(BaseModel):  2     __tablename__ = 'blog'
 3 
 4     id = Column(Integer, autoincrement=True, primary_key=True)  5     title = Column(Unicode(32), server_default='')  6     user = Column(Integer, ForeignKey('user.id'), index=True)  7 
 8     user_obj = relationship('User')  9 
10 
11 class User(BaseModel): 12     __tablename__ = 'user'
13 
14     id = Column(Integer, autoincrement=True, primary_key=True) 15     name = Column(Unicode(32), server_default='') 16 
17     blogs = relationship('Blog')

對於 一對多 的關系, 使用 any() 函數查詢:

user = session.query(User).filter(User.blogs.any(Blog.title == 'first')).first()

SQLAlchemy 會使用 exists 條件, 類似於:

1 SELECT *FROM user WHERE EXISTS 2     (SELECT 1 FROM blog  WHERE user.id = blog.user AND blog.title = ?)  LIMIT ? OFFSET ?

反之, 如果是 多對一 的關系, 則使用 has() 函數查詢:

blog = session.query(Blog).filter(Blog.user_obj.has(User.name == 'pan')).first()

最后的 SQL 語句都是一樣的.

4.4. 關系的獲取形式

前面介紹的關系定義中, 提到了兩種關系的獲取形式, 一種是:

user_obj = relationship('User')

這種是在對象上獲取關系對象時, 再去查詢.

另一種是:

blog_list = relationship('Blog', lazy="dynamic")

這種的結果, 是在對象上獲取關系對象時, 只返回 Query , 而查詢的細節由人為來控制.

總的來說, 關系的獲取分成兩種, Lazy 或 Eager . 在直接查詢層面, 上面兩種都屬於 Lazy 的方式, 而 Eager 的一種, 就是在獲取對象時的查詢語句, 是直接帶 join 的, 這樣關系對象的數據在一個查詢語句中就直接獲取到了:

 1 class Blog(BaseModel):  2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True)  5     title = Column(String(64), server_default='', nullable=False)  6     user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)  7 
 8     user_obj = relationship('User', lazy='joined', cascade='all')  9 
10 
11 class User(BaseModel): 12     __tablename__ = 'user'
13 
14     id = Column(BIGINT, primary_key=True, autoincrement=True) 15     name = Column(String(32), server_default='', nullable=False)

這樣在查詢時:

blog = session.query(Blog).first() print( blog.user_obj)

便會多出 LEFT OUTER JOIN 的語句, 結果中直接獲取到對應的 User 實例對象.

也可以把 joined 換成子查詢, subquery :

class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='all', lazy='subquery') if __name__ == '__main__': session = Session() user = session.query(User).first() session.commit()

子查詢會用到臨時表.

上面定義的:

1 blog_list = relationship('Blog', lazy="dynamic") 2 user_obj = relationship('User', lazy='joined') 3 blog_list = relationship('Blog', lazy='subquery')

都算是一種默認方式. 在具體使用查詢時, 還可以通過 options() 方法定義關聯的獲取方式:

from sqlalchemy.orm import lazyload, joinedload, subqueryload user = session.query(User).options(lazyload('blog_list')).first() print (user.blog_list)

更多的用法:

 1 session.query(Parent).options(  2     joinedload('foo').joinedload('bar').joinedload('bat')  3  ).all()  4 
 5 session.query(A).options(  6     defaultload("atob").joinedload("btoc")  7  ).all()  8 
 9 session.query(MyClass).options(lazyload('*')) 10 
11 session.query(MyClass).options( 12     lazyload('*'), joinedload(MyClass.widget) 13  ) 14 
15 session.query(User, Address).options(Load(Address).lazyload('*'))

如果關聯的定義之前是 Lazy 的, 但是實際使用中, 希望在手工 join 之后, 把關聯對象直接包含進結果實例, 可以使用 contains_eager() 來包裝一下:

1 from sqlalchemy.orm import contains_eager 2 
3 blog = session.query(Blog).join(Blog.user_obj)\ 4  .options(contains_eager(Blog.user_obj)).first() 5 print blog.user_obj

4.5. 關系的表現形式

關系在對象屬性中的表現, 默認是列表, 但是, 這不是唯一的形式. 根據需要, 可以作成 dictionary , set 或者其它你需要的對象.

 1 class Blog(BaseModel):  2     __tablename__ = 'blog'
 3 
 4     id = Column(Integer, autoincrement=True, primary_key=True)  5     title = Column(Unicode(32), server_default='')  6     user = Column(Integer, ForeignKey('user.id'), index=True)  7 
 8     user_obj = relationship('User')  9 
10 
11 class User(BaseModel): 12     __tablename__ = 'user'
13 
14     id = Column(Integer, autoincrement=True, primary_key=True) 15     name = Column(Unicode(32), server_default='') 16 
17     blogs = relationship('Blog')

對於上面的兩個模型:

user = session.query(User).first() print (user.blogs)

現在 user.blogs 是一個列表. 我們可以在 relationship() 調用時通過 collection_class 參數指定一個類, 來重新定義關系的表現形式:

1 user = User(name='XXX') 2 session.add_all([Blog(title='A', user_obj=user), Blog(title='B', user_obj=user)]) 3 session.commit() 4 
5 user = session.query(User).first() 6 print (user.blogs)

set , 集合:

blogs = relationship('Blog', collection_class=set) #InstrumentedSet([<__main__.Blog object at 0x1a58710>, <__main__.Blog object at 0x1a587d0>])

attribute_mapped_collection , 字典, 鍵值從屬性取:

1 from sqlalchemy.orm.collections import attribute_mapped_collection 2 
3 blogs = relationship('Blog', collection_class=attribute_mapped_collection('title')) 4 
5 #{'A': <__main__.Blog object at 0x20ed810>, 'B': <__main__.Blog object at 0x20ed8d0>}
如果 title 重復的話, 結果會覆蓋.
mapped_collection , 字典, 鍵值自定義:
1 from sqlalchemy.orm.collections import mapped_collection 2 
3 blogs = relationship('Blog', collection_class=mapped_collection(lambda blog: blog.title.lower()))

4.6. 多對多關系

先考慮典型的多對多關系結構:

 1 class Blog(BaseModel):  2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True)  5     title = Column(String(64), server_default='', nullable=False)  6 
 7     tag_list = relationship('Tag')  8     tag_list = relationship('BlogAndTag')  9 
10 
11 class Tag(BaseModel): 12     __tablename__ = 'tag'
13 
14     id = Column(BIGINT, primary_key=True, autoincrement=True) 15     name = Column(String(16), server_default='', nullable=False) 16 
17 
18 class BlogAndTag(BaseModel): 19     __tablename__ = 'blog_and_tag'
20 
21     id = Column(BIGINT, primary_key=True, autoincrement=True) 22     blog = Column(BIGINT, ForeignKey('blog.id'), index=True) 23     tag = Column(BIGINT, ForeignKey('tag.id'), index=True) 24     create = Column(BIGINT, index=True, server_default='0')

在 Blog 中的:

tag_list = relationship('Tag')

顯示是錯誤的, 因為在 Tag 中並沒有外鍵. 而:

tag_list = relationship('BlogAndTag')

這樣雖然正確, 但是 tag_list 的關系只是到達 BlogAndTag 這一層, 並沒有到達我們需要的 Tag.

這種情況下, 一個多對多關系是有三張表來表示的, 在定義 relationship 時, 就需要一個secondary 參數來指明關系表:

 1 class Blog(BaseModel):  2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True)  5     title = Column(String(64), server_default='', nullable=False)  6 
 7     tag_list = relationship('Tag', secondary=lambda: BlogAndTag.__table__)
#是用lambda可以使后面跟的類無需提前定義,如果直接secondary=classname,則class需天氣定義
8 9 10 class Tag(BaseModel): 11 __tablename__ = 'tag' 12 13 id = Column(BIGINT, primary_key=True, autoincrement=True) 14 name = Column(String(16), server_default='', nullable=False) 15 16 17 class BlogAndTag(BaseModel): 18 __tablename__ = 'blog_and_tag' 19 20 id = Column(BIGINT, primary_key=True, autoincrement=True) 21 blog = Column(BIGINT, ForeignKey('blog.id'), index=True) 22 tag = Column(BIGINT, ForeignKey('tag.id'), index=True) 23 create = Column(BIGINT, index=True, server_default='0')

這樣, 在操作時可以直接獲取到對應的實例列表:

blog = session.query(Blog).filter(Blog.title == 'a').one() print (blog.tag_list)

訪問 tag_list 時, SQLAlchemy 做的是一個普通的多表查詢.

tag_list 屬性同時支持賦值操作:

session = Session() blog = session.query(Blog).filter(Blog.title == 'a').one() blog.tag_list = [Tag(name='t1')] session.commit()

提交時, SQLAlchemy 總是會創建 Tag , 及對應的關系 BlogAndTag .

而如果是:

1 session = Session() 2 blog = session.query(Blog).filter(Blog.title == 'a').one() 3 blog.tag_list = [] 4 session.commit() 5 
6 tag = session.query(Tag).filter(Tag.name == 'x').one() 7 blog.tag_list.remove(tag) 8 session.commit()

那么 SQLAlchemy 只會刪除對應的關系 BlogAndTag , 不會刪除實體 Tag .

如果你直接刪除實體, 那么對應的關系是不會自動刪除的:

1 session = Session() 2 blog = session.query(Blog).filter(Blog.title == 'a').one() 3 tag = Tag(name='ok') 4 blog.tag_list = [tag] 5 session.commit() 6 
7 tag = session.query(Tag).filter(Tag.name == 'ok').one() 8 session.delete(tag) 9 session.commit()

 

4.7. Cascades 自動關系處理

前面提到的, 當操作關系, 實體時, 與其相關聯的關系, 實體是否會被自動處理的問題, 在 SQLAlchemy 中是通過 Cascades 機制來定義和解決的. ( Cascades 這個詞是來源於Hibernate .)

cascade 是一個 relationship 的參數, 其值是逗號分割的多個字符串, 以表示不同的行為. 默認值是 " save-updatemerge" , 稍后會介紹每個詞項的作用.

這里的所有規則介紹, 只涉及從 Parent 到 Child , Parent 即定義 relationship的類. 不涉及backref .

cascade 所有的可選字符串項是:

  • all , 所有操作都會自動處理到關聯對象上.
  • save-update , 關聯對象自動添加到會話.
  • delete , 關聯對象自動從會話中刪除.
  • delete-orphan , 屬性中去掉關聯對象, 則會話中會自動刪除關聯對象.
  • merge , session.merge() 時會處理關聯對象.
  • refresh-expire , session.expire() 時會處理關聯對象.
  • expunge , session.expunge() 時會處理關聯對象.
save-update
當一個對象被添加進 session 后, 此對象標記為 save-update 的 relationship 關系對象也會同時添加進這個 session .
 1 class Blog(BaseModel): 2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True) 5     title = Column(String(64), server_default='', nullable=False) 6     user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) 7 
 8 
 9 class User(BaseModel): 10     __tablename__ = 'user'
11 
12     id = Column(BIGINT, primary_key=True, autoincrement=True) 13     name = Column(String(32), server_default='', nullable=False) 14 
15     blog_list = relationship('Blog', cascade='') 16     blog_list_auto = relationship('Blog', cascade='save-update') 17 
18 
19 if __name__ == '__main__': 20 
21     session = Session() 22 
23     user = User(name=u'哈哈') 24     blog = Blog(title=u'第一個') 25     user.blog_list = [blog] 26     #user.blog_list_auto = [blog]
27 session.add(user) 28     print (blog in session) 29     session.commit()

 

delete
當一個對象在 session 中被標記為刪除時, 其屬性中 relationship 關聯的對象也會被標記成刪除, 否則, 關聯對象中的對應外鍵字段會被改成 NULL , 不能為 NULL 則報錯.
 1 class Blog(BaseModel): 2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True) 5     title = Column(String(64), server_default='', nullable=False) 6     user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) 7 
 8 
 9 class User(BaseModel): 10     __tablename__ = 'user'
11 
12     id = Column(BIGINT, primary_key=True, autoincrement=True) 13     name = Column(String(32), server_default='', nullable=False) 14 
15     blog_list = relationship('Blog', cascade='save-update, delete') 16 
17 
18 if __name__ == '__main__': 19     session = Session() 20 
21     #user = User(name=u'用戶')
22     #user.blog_list = [Blog(title=u'哈哈')]
23     #session.add(user)
24     user = session.query(User).first() 25 session.delete(user) 26     session.commit()

 

delete-orphan
當 relationship 屬性變化時, 被 "去掉" 的對象會被自動刪除. 比如之前是:
user.blog_list = [blog, blog2]
現在變成:
user.blog_list = [blog2]
那么 blog 這個關聯實體是會自動刪除的. 這各機制只適用於 "一對多" 的關系中, "多對多" 和反過來的 "多對一" 都不適用. 在relationship 定義時, 可以添加 single_parent = True 參數來強制約束. 當然, 在實現上 SQLAlchemy 是會先查出所有關聯實體, 然后計算差集確認哪些需要被刪除.
 1 class Blog(BaseModel): 2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True) 5     title = Column(String(64), server_default='', nullable=False) 6     user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) 7 
 8 
 9 class User(BaseModel): 10     __tablename__ = 'user'
11 
12     id = Column(BIGINT, primary_key=True, autoincrement=True) 13     name = Column(String(32), server_default='', nullable=False) 14 
15     blog_list = relationship('Blog', cascade='save-update, delete-orphan') 16 
17 
18 if __name__ == '__main__': 19 
20     session = Session() 21 
22     #user = User(name=u'用戶')
23     #blog = Blog(title=u'一')
24     #blog2 = Blog(title=u'二')
25     #user.blog_list = [blog, blog2]
26     #session.add(user)
27     user = session.query(User).first() 28     blog2 =  session.query(Blog).filter(Blog.title == u'').first() 29     user.blog_list = [blog2] 30     #session.delete(user)
31     session.commit()

 

merge
這個選項是標識在 session.merge() 時處理關聯對象. session.merge() 的作用, 是把一個會話外的實例, "整合"進會話, 比如 "有則修改, 無則創建" 就是典型的一種 "整合":
 1 user = User(id=1, name="1") 2 session.add(user) 3 session.commit() 4 
 5 user = User(id=1) 6 user = session.merge(user) 7 print user.name 8 
 9 user = User(id=1, name="2") 10 user = session.merge(user) 11 session.commit()

 

cascade 中的 merge 作用:
1 class Blog(BaseModel): 2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True) 5     title = Column(String(64), server_default='', nullable=False) 6     user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) 7 
 8 
 9 class User(BaseModel): 10     __tablename__ = 'user'
11 
12     id = Column(BIGINT, primary_key=True, autoincrement=True) 13     name = Column(String(32), server_default='', nullable=False) 14 
15     blog_list = relationship('Blog', 16                              cascade='save-update, delete, delete-orphan, merge') 17 
18 
19 if __name__ == '__main__': 20 
21     session = Session() 22 
23     user = User(id=1, name='1') 24 session.add(user) 25 session.commit(user) 26 
27     user = User(id=1, blog_list=[Blog(title='哈哈')]) 28 session.merge(user) 29 
30     session.commit()

 

refresh-expire
 1 當使用 session.expire() 標識一個對象過期時, 此對象的關聯對象是否也被標識為過期(訪問屬性會重新查詢數據庫). 2 
 3 class Blog(BaseModel): 4     __tablename__ = 'blog'
 5 
 6     id = Column(BIGINT, primary_key=True, autoincrement=True) 7     title = Column(String(64), server_default='', nullable=False) 8     user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) 9 
10 
11 class User(BaseModel): 12     __tablename__ = 'user'
13 
14     id = Column(BIGINT, primary_key=True, autoincrement=True) 15     name = Column(String(32), server_default='', nullable=False) 16 
17     blog_list = relationship('Blog', 18             cascade='save-update, delete, delete-orphan, merge, refresh-expire') 19 
20 
21 if __name__ == '__main__': 22 
23     session = Session() 24 
25     #user = User(id=1, name='1')
26     #blog = Blog(title="abc")
27     #user.blog_list = [blog]
28     #session.add(user)
29 
30     user = session.query(User).first() 31     blog = user.blog_list[0] 32     print (user.name) 33     print (blog.title) 34 session.expire(user) 35     print ('EXPIRE') 36     print (user.name) 37     print (blog.title) 38 
39     session.commit()

 

expunge
與 merge 相反, 當 session.expunge() 把對象從會話中去除的時候, 此對象的關聯對象也同時從會話中消失.
 1 class Blog(BaseModel):  2     __tablename__ = 'blog'
 3 
 4     id = Column(BIGINT, primary_key=True, autoincrement=True)  5     title = Column(String(64), server_default='', nullable=False)  6     user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)  7 
 8 
 9 class User(BaseModel): 10     __tablename__ = 'user'
11 
12     id = Column(BIGINT, primary_key=True, autoincrement=True) 13     name = Column(String(32), server_default='', nullable=False) 14 
15     blog_list = relationship('Blog', cascade='delete, delete-orphan, expunge') 16 
17 
18 if __name__ == '__main__': 19 
20     session = Session() 21     user = User(name=u'用戶') 22     blog = Blog(title=u'第一個') 23     user.blog_list = [blog] 24 
25  session.add(user) 26  session.add(blog) 27 
28  session.expunge(user) 29     print (blog in session) 30 
31     #session.commit() 

 

4.8. 屬性代理
考慮這樣的情況, 關系是關聯的整個模型對象的, 但是, 有時我們對於這個關系, 並不關心整個對象, 只關心其中的某個屬性. 考慮下面的場景:
 1 from sqlalchemy.ext.associationproxy import association_proxy 2 
 3 class Blog(BaseModel): 4     __tablename__ = 'blog'
 5 
 6     id = Column(Integer, autoincrement=True, primary_key=True) 7     title = Column(Unicode(32), nullable=False, server_default='') 8     user = Column(Integer, ForeignKey('user.id'), index=True) 9 
10 
11 class User(BaseModel): 12     __tablename__ = 'user'
13 
14     id = Column(Integer, autoincrement=True, primary_key=True) 15     name = Column(Unicode(32), nullable=False, server_default='') 16 
17     blog_list = relationship('Blog') 18     blog_title_list = association_proxy('blog_list', 'title')
blog_list 是一個正確的一對多關系. 下面的 blog_title_list 就是這個關系上的一個屬性代理.blog_title_list 只處理 blog_list 這個關系中對應的對象的 title 屬性, 包括獲取和設置兩個方向.
1 session = Session() 2 
3 user = User(name='xxx') 4 user.blog_list = [Blog(title='ABC')] 5 session.add(user) 6 session.commit() 7 
8 user = session.query(User).first() 9 print (user.blog_title_list)
上面是獲取屬性的示例. 在"設置", 或者說"創建"時, 直接操作是有錯的:
1 user = session.query(User).first() 2 user.blog_title_list = ['NEW'] 3 session.add(user) 4 session.commit()
原因在於, 對於類 Blog 的初始化形式. association_proxy('blog_list', 'title') 中的 title只是獲取時的屬性定義, 而在上面的設置過程中, 實際上的調用形式為:
Blog('NEW')
Blog 類沒有明確定義 __init__() 方法, 所有這種形式的調用會報錯. 可以把 __init__() 方法補上:
這樣調用就沒有問題了.
1 class Blog(BaseModel): 2     __tablename__ = 'blog'
3 
4     id = Column(Integer, autoincrement=True, primary_key=True) 5     title = Column(Unicode(32), nullable=False, server_default='') 6     user = Column(Integer, ForeignKey('user.id'), index=True) 7 
8     def __init__(self, title): 9         self.title = title
另一個方法, 是在調用 association_proxy() 時使用 creator 參數明確定義"值"和"實例"的關系:
1 class User(BaseModel): 2     __tablename__ = 'user'
3 
4     id = Column(Integer, autoincrement=True, primary_key=True) 5     name = Column(Unicode(32), nullable=False, server_default='') 6 
7     blog_list = relationship('Blog') 8     blog_title_list = association_proxy('blog_list', 'title', 9                                         creator=lambda t: User(title=t))
creator 定義的方法, 返回的對象可以被對應的 blog_list 關系接收即可.
在查詢方面, 多對一 的關系代理上, 可以直接使用屬性:、
1 class Blog(BaseModel): 2     __tablename__ = 'blog'
3 
4     id = Column(Integer, autoincrement=True, primary_key=True) 5     title = Column(Unicode(32), server_default='') 6     user = Column(Integer, ForeignKey('user.id'), index=True) 7 
8     user_obj = relationship('User') 9     user_name = association_proxy('user_obj', 'name')
查詢:
blog = session.query(Blog).filter(Blog.user_name == 'XX').first()
反過來的 一對多 關系代理上, 可以使用 contains() 函數:
user = session.query(User).filter(User.blogs_title.contains('A')).first()

 

5. SQLAlchemy會話與事務控制

5.1. 基本使用

SQLAlchemy 的 session 是用於管理數據庫操作的一個像容器一樣的東西. 模型實例對象本身獨立存在, 而要讓其修改(創建)生效, 則需要把它們加入某個 session . 同時你也可以把模型實例對象從 session 中去除. 被 session 管理的實例對象, 在 session.commit() 時被提交到數據庫. 同時 session.rollback() 是回滾變更.

session.flush() 的作用是在事務管理內與數據庫發生交互, 對應的實例狀態被反映到數據庫. 比如自增 ID 被填充上值.

1 user = User(name='名字') 2 session.add(user) 3 session.commit()
1 try: 2     user = session.Query(User).first() 3     user.name = u'改名字
4  session.commit() 5 except: 6     session.rollback()

5.2. for update

SQLAlchemy 的 Query 支持 select ... for update / share .

session.Query(User).with_for_update().first() session.Query(User).with_for_update(read=True).first()

完整形式是:

with_for_update(read=False, nowait=False, of=None)
read
是標識加互斥鎖還是共享鎖. 當為 True 時, 即 for share 的語句, 是共享鎖. 多個事務可以獲取共享鎖, 互斥鎖只能一個事務獲取. 有"多個地方"都希望是"這段時間我獲取的數據不能被修改, 我也不會改", 那么只能使用共享鎖.
nowait
其它事務碰到鎖, 是否不等待直接"報錯".
of
指明上鎖的表, 如果不指明, 則查詢中涉及的所有表(行)都會加鎖.

5.3. 事務嵌套

SQLAlchemy 中的事務嵌套有兩種情況. 一是在 session 中管理的事務, 本身有層次性. 二是 session 和原始的 connection 之間, 是一種層次關系, 在這 session , connection 兩個概念之中的事務同樣具有這樣的層次.

session 中的事務, 可能通過 begin_nested() 方法做 savepoint :

1 session.add(u1) 2 session.add(u2) 3 
4 session.begin_nested() 5 session.add(u3) 6 session.rollback() # rolls back u3, keeps u1 and u2
7 
8 session.commit()

或者使用上下文對象:

1 for record in records: 2     try: 3  with session.begin_nested(): 4  session.merge(record) 5     except: 6         print "Skipped record %s" % record 7 session.commit()

嵌套的事務的一個效果, 是最外層事務提交整個變更才會生效.

1 user = User(name='2') 2 
3 session.begin_nested() 4 session.add(user) 5 session.commit() 6 
7 session.rollback()

於是, 前面說的第二種情況有一種應用方式, 就是在 connection 上做一個事務, 最終也在 connection 上回滾這個事務, 如果 session 是 bind 到這個連接上的, 那么 session 上所做的更改全部不會生效:

 1 conn = Engine.connect()  2 session = Session(bind=conn)  3 trans = conn.begin()  4 
 5 user = User(name='2')  6 session.begin_nested()  7 session.add(user)  8 session.commit()  9 
10 session.commit() 11 
12 trans.rollback()

在測試中這種方式可能會有用.

5.4. 二段式提交

二段式提交, Two-Phase, 是為解決分布式環境下多點事務控制的一套協議.

與一般事務控制的不同是, 一般事務是 begin, 之后 commit 結束.

而二段式提交的流程上, begin 之后, 是 prepare transaction 'transaction_id' , 這時相關事務數據已經持久化了. 之后, 再在任何時候(哪怕重啟服務), 作 commit prepared 'transaction_id' 或者 rollback prepared 'transaction_id' .

從多點事務的控制來看, 應用層要做的事是, 先把任務分發出去, 然后收集"事務准備"的狀態(prepare transaction 的結果). 根據收集的結果決定最后是 commit 還是 rollback .

簡單來說, 就是事務先保存, 再說提交的事.

SQLAlchemy 中對這個機制的支持, 是在構建會話類是加入 twophase 參數:

Session = sessionmaker(twophase=True)

然后會話類可以根據一些策略, 綁定多個 Engine , 可以是多個數據庫連接, 比如:

Session = sessionmaker(twophase=True) Session.configure(binds={User: Engine, Blog: Engine2})

這樣, 在獲取一個會話實例之后, 就處在二段式提交機制的支持之下, SQLAlchemy 自己會作多點的協調了. 完整的流程:

 1 Engine = create_engine('postgresql://test@localhost:5432/test', echo=True)  2 Engine2 = create_engine('postgresql://test@localhost:5432/test2', echo=True)  3 
 4 Session = sessionmaker(twophase=True)  5 
 6 Session.configure(binds={User: Engine, Blog: Engine2})  7 session = Session()  8 
 9 user = User(name='名字') 10 session.add(user) 11 session.commit()

對應的 SQL 大概就是:

1 begin; 2 insert into "user" (name) values (?); 3 prepare transaction 'xx'; 4 commit prepared 'xx';

使用時, Postgresql 數據庫需要把 max_prepared_transactions 這個配置項的值改成大於 0 

 

 

6. SQLAlchemy字段類型

6.1. 基本類型

字段類型是在定義模型時, 對每個 Column 的類型約定. 不同類型的字段類型在輸入輸出上, 及支持的操作方面, 有所區別.

這里只介紹 sqlalchemy.types.* 中的類型, SQL 標准類型方面, 是寫什么最后生成的 DDL 語句就是什么, 比如 BIGINTBLOG 這些, 但是這些類型並不一定在所有數據庫中都有支持. 除此而外, SQLAlchemy 也支持一些特定數據庫的特定類型, 這些需要從具體的 dialects 實現里導入.

Integer/BigInteger/SmallInteger
整形.
Boolean
布爾類型. Python 中表現為 True/False , 數據庫根據支持情況, 表現為 BOOLEAN 或SMALLINT . 實例化時可以指定是否創建約束(默認創建).
Date/DateTime/Time (timezone=False)
日期類型, Time 和 DateTime 實例化時可以指定是否帶時區信息.
Interval
時間偏差類型. 在 Python 中表現為 datetime.timedelta() , 數據庫不支持此類型則存為日期.
Enum (*enums, **kw)
枚舉類型, 根據數據庫支持情況, SQLAlchemy 會使用原生支持或者使用 VARCHAR 類型附加約束的方式實現. 原生支持中涉及新類型創建, 細節在實例化時控制.
Float
浮點小數.
Numeric (precision=None, scale=None, decimal_return_scale=None, ...)
定點小數, Python 中表現為 Decimal .
LargeBinary (length=None)
字節數據. 根據數據庫實現, 在實例化時可能需要指定大小.
PickleType
Python 對象的序列化類型.
String (length=None, collation=None, ...)
字符串類型, Python 中表現為 Unicode , 數據庫表現為 VARCHAR , 通常都需要指定長度.
Unicode
類似與字符串類型, 在某些數據庫實現下, 會明確表示支持非 ASCII 字符. 同時輸入輸出也強制是 Unicode 類型.
Text
長文本類型, Python 表現為 Unicode , 數據庫表現為 TEXT .
UnicodeText
參考 Unicode .

 

 

7. SQLAlchemy混合屬性機制

7.1. 直接行為

混合屬性, 官方文檔中稱之為 Hybrid Attributes . 這種機制表現為, 一個屬性, 在類和層面, 和實例的層面, 其行為是不同的. 之所以需要關注這部分的差異, 原因源於 Python 上下文和 SQL 上下文的差異.

類 層面經常是作為 SQL 查詢時的一部分, 它面向的是 SQL 上下文. 而實例是已經得到或者創建的結果, 它面向的是 Python 上下文.

定義模型的 Column() 就是一個典型的混合屬性. 作為實例屬性時, 是具體的對象值訪問, 而作為類屬性時, 則有構成 SQL 語句表達式的功能.

1 class Interval(BaseModel): 2     __tablename__ = 'interval'
3 
4     id = Column(Integer, autoincrement=True, primary_key=True) 5     start = Column(Integer) 6     end = Column(Integer) 7 
8 session.add(Interval(start=0, end=100)) 9 session.commit()

實例行為:

ins = session.query(Interval).first() print (ins.end - ins.start)

類行為:

ins = session.query(Interval).filter(Interval.end - Interval.start > 10).first()

這種機制其實一直在被使用, 但是可能大家都沒有留意一個屬性在類和實例上的區別.

如果屬性需要被進一步封裝, 那么就需要明確聲明 Hybrid Attributes 了:

 1 from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method  2 
 3 class Interval(BaseModel):  4     __tablename__ = 'interval'
 5 
 6     id = Column(Integer, autoincrement=True, primary_key=True)  7     start = Column(Integer)  8     end = Column(Integer)  9 
10  @hybrid_property 11     def length(self): 12         return self.end - self.start 13 
14  @hybrid_method 15     def bigger(self, i): 16         return self.length > i 17 
18 
19 session.add(Interval(start=0, end=100)) 20 session.commit() 21 
22 ins = session.query(Interval).filter(Interval.length > 10).first() 23 ins = session.query(Interval).filter(Interval.bigger(10)).first() 24 print( ins.bigger(1))

setter 的定義同樣使用對應的裝飾器即可:

 1 class Interval(BaseModel):  2     __tablename__ = 'interval'
 3 
 4     id = Column(Integer, autoincrement=True, primary_key=True)  5     start = Column(Integer)  6     end = Column(Integer)  7 
 8  @hybrid_property  9     def length(self): 10         return abs(self.end - self.start) 11 
12  @length.setter 13     def length(self, l): 14         self.end = self.start + l

7.2. 表達式行為

前面說的屬性, 在類和實例上有不同行為, 可以看到, 在類上的行為, 其實就是生成 SQL 表達式時的行為. 上面的例子只是簡單的運算, SQLAlchemy 可以自動處理好 Python 函數和 SQL 函數的區別. 但是如果是一些特性更強的 SQL 函數, 就需要手動指定了. 於時, 這時的情況變成, 實例行為是 Python 范疇的調用行為, 而類行為則是生成 SQL 函數的相關表達式.

同時是前面的例子, 對於 length 的定義, 更嚴格上來說, 應該是取絕對值的.

 1 class Interval(BaseModel):  2     __tablename__ = 'interval'
 3 
 4     id = Column(Integer, autoincrement=True, primary_key=True)  5     start = Column(Integer)  6     end = Column(Integer)  7 
 8  @hybrid_property  9     def length(self): 10         return abs(self.end - self.start)

但是, 如果使用了 Python 的 abs() 函數, 在生成 SQL 表達式時顯示有無法處理了. 所以, 需要手動定義:

 1 from sqlalchemy import func  2 
 3 class Interval(BaseModel):  4     __tablename__ = 'interval'
 5 
 6     id = Column(Integer, autoincrement=True, primary_key=True)  7     start = Column(Integer)  8     end = Column(Integer)  9 
10  @hybrid_property 11     def length(self): 12         return abs(self.end - self.start) 13 
14  @length.expression 15     def length(self): 16         return func.abs(self.end - self.start)

這樣查詢時就可以直接使用:

ins = session.query(Interval).filter(Interval.length > 1).first()

7.3. 應用於關系

總體上沒有特別之處:

 1 class Account(BaseModel):  2     __tablename__ = 'account'
 3 
 4     id = Column(Integer, autoincrement=True, primary_key=True)  5     user = Column(Integer, ForeignKey('user.id'), index=True)  6     balance = Column(Integer, server_default='0')  7 
 8 
 9 class User(BaseModel): 10     __tablename__ = 'user'
11 
12     id = Column(Integer, autoincrement=True, primary_key=True) 13     name = Column(Unicode(32), nullable=False, server_default='') 14 
15     accounts = relationship('Account') 16     #balance = association_proxy('accounts', 'balance')
17 
18  @hybrid_property 19     def balance(self): 20         return sum(x.balance for x in self.accounts)

查詢時:

user = session.query(User).first() print (user.balance)

這里涉及的東西都是 Python 自己的, 包括那個 sum() 函數, 和 SQL 沒有關系.

如果想實現的是, 使用 SQL 的 sum() 函數, 取出指定用戶的總賬戶金額數, 那么就要考慮把balance 作成表達式的形式:

1 from sqlalchemy import select 2 
3 @hybrid_property 4 def balance(self): 5     return select([func.sum(Account.balance)]).where(Account.user == self.id).label('balance_v') 6     #return func.sum(Account.balance)

這樣的話, User.balance 只是單純的一個表達式了, 查詢時指定字段:

user = session.query(User, User.balance).first() print (user.balance_v)

注意, 如果寫成:

session.query(User.balance).first()

意義就不再是"獲取第一個用戶的總金額", 而變成"獲取總金額的第一個". 這里很坑吧.

像上面這樣改, 實例層面就無法使用 balance 屬性. 所以, 還是先前介紹的, 表達式可以單獨處理:

1 @hybrid_property 2 def balance(self): 3     return sum(x.balance for x in self.accounts) 4 
5 @balance.expression 6 def balance(self): 7     return select([func.sum(Account.balance)]).where(Account.user == self.id).label('balance_v')

定義了表達式的 balance , 這部分作為查詢條件上當然也是可以的:

user = session.query(User).filter(User.balance > 1).first()

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM