1. join 查詢
假設這樣一個業務場景,知道一個郵箱地址,要查詢這個地址所屬的用戶,第一個辦法是用連接多個 filter() 來查詢。
for u, a in session.query(User, Address).filter(User.id==Address.user_id).filter(Address.email_address=='jack@google.com').all(): print(u) print(a) # 執行結果 jack jack@google.com
更簡便的方法是使用 join() 方法:
u = session.query(User).join(Address).filter(Address.email_address=='jack@google.com').one() print(u) # 執行結果 jack
Query.join() 知道如何在 User 和 Address 之間進行連接,因為我們設定了外鍵。假如我們沒有指定外鍵,比如這樣:
class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(50)) fullname = Column(String(50)) password = Column(String(12)) class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True) email_address = Column(String, nullable=False) user_id = Column(Integer)
我們可以用下面方法來讓 join 生效:
query.join(Address, User.id==Address.user_id) # explicit condition query.join(User.addresses) # specify relationship from left to right query.join(Address, User.addresses) # same, with explicit target query.join('addresses') # same, using a string
例子:
session.query(User).join(Address, User.id==Address.user_id).filter(Address.email_address=='jack@google.com').all()
2. 子查詢(subquery)
現在需要查詢每個用戶所擁有的郵箱地址數量,思路是先對 addresses 表按用戶 ID 分組,統計各組數量,這樣我們得到一張新表;然后用 JOIN 連接新表和 users 兩個表,在這里,我們應該使用 LEFT OUTER JOIN,因為使用 INTER JOIN 所得出的新表只包含兩表的交集。
如果上面的暫時看不懂,我們先來看看第一個 stmt 的情況。
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).all() for i in stmt: print(i) # 執行結果 (5, 2)
可以理解成 group_by() 方法生成了一張新的表,該表有兩列,第一列是 user_id ,第二列是該 user_id 所擁有的 addresses 的數量,這個值由 func() 跟着的方法產生,我們可以使用 c() 方法來訪問這個值。
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery() q = session.query(User, stmt.c.address_count).outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id).all() for i in q: print(i) # 執行結果 (ed, None) (wendy, None) (mary, None) (fred, None) (jack, 2)
如果不用 outerjoin() 而使用 join(),就等於使用 SQL 中的 INTER JOIN,所得出的表只為兩者交集,不會包含 None 值的列。
from sqlalchemy.sql import func stmt = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery() q = session.query(User, stmt.c.address_count).join(stmt, User.id==stmt.c.user_id).order_by(User.id).all() for i in q: print(i) # 執行結果 (jack, 2)
3.使用別名(aliased)
SQLAlchemy 使用 aliased() 方法表示別名,當我們需要把同一張表連接多次的時候,常常需要用到別名。
from sqlalchemy.orm import aliased # 把 Address 表分別設置別名 adalias1 = aliased(Address) adalias2 = aliased(Address) for username, email1, email2 in session.query(User.name, adalias1.email_address, adalias2.email_address).join(adalias1, User.addresses).join(adalias2, User.addresses).filter(adalias1.email_address=='jack@google.com').filter(adalias2.email_address=='j25@yahoo.com'): print(username, email1, email2) # 執行結果 jack jack@google.com j25@yahoo.com
上述代碼查詢同時擁有兩個名為:"jack@google.com" 和 "j25@yahoo.com" 郵箱地址的用戶。
別名也可以在子查詢里使用:
from sqlalchemy.orm import aliased stmt = session.query(Address).filter(Address.email_address != 'j25@yahoo.com').subquery() adalias = aliased(Address, stmt) for user, address in session.query(User, adalias).join(adalias, User.addresses): print(user) print(address) # 執行結果 jack jack@google.com
4. EXISTS 關鍵字
EXISTS 關鍵字可以在某些場景替代 JOIN 的使用。
from sqlalchemy.sql import exists stmt = exists().where(Address.user_id==User.id) for name, in session.query(User.name).filter(stmt): print(name) # 執行結果 jack
使用 any() 方法也能得到同意的效果:
for name, in session.query(User.name).filter(User.addresses.any()): print(name)
使用 any() 方法時也可加上查詢條件:
for name, in session.query(User.name).filter(User.addresses.any(Address.email_address.like('%google%'))): print(name)
使用 has() 方法也能起到 JOIN 的作用:
session.query(Address).filter(~Address.user.has(User.name=='jack')).all()
注意:這里的 ~ 符號是 “不” 的意思。
關系運算符
1. 等於、不等於
query = session.query(Address) jack = session.query(User).filter(User.name == 'jack').one() # 篩選 user 為 jack 的郵箱 query.filter(Address.user == jack) # 篩選 user 不為 jack 的郵箱 query.filter(Address.user != jack)
2. 為空、不為空
# 篩選 user 為空的郵箱 query.filter(Address.user == None) # 篩選 user 不為空的郵箱 query.filter(Address.user != None)
3. 包含
query = session.query(User) address = session.query(Address).filter(Address.id == 1).one() # 篩選包含某地址的用戶 query.filter(User.addresses.contains(address))
