查詢
-
# -*- coding: utf-8 -*-
-
from sqlalchemy.orm import sessionmaker
-
from SQLAlchemy.create import engine,User
-
Session = sessionmaker(engine)
-
db_session = Session()
-
-
query = db_session.query(User).filter(User.name.like('%2%')).order_by(User.id)
-
-
# ------> 查詢 all list
-
print(query.all())
-
-
# ------> first() 第一個
-
print(query.first())
-
-
# ------> one() 如果這個結果集少於或者多於一條數據, 結論有且只有一條數據的時候才會正常的返回,否則拋出異常
-
print(query.one())
-
-
# ------> one_or_none() 在結果集中沒有數據的時候也不會拋出異常
-
-
# ------> scalar() 底層調用one()方法,並且如果one()方法沒有拋出異常,會返回查詢到的第一列的數據
-
query2 = db_session.query(User).filter(User.name=="2").scalar()
-
print(query2)
-
-
# ------> text
-
from sqlalchemy import text
-
query3 = db_session.query(User).filter(text("id<3")).order_by(text('id'))
-
print(query3.all())
-
-
# text 帶變量方式 用 :傳入變量,用params接收
-
query4 = db_session.query(User).filter(text("id<:value")).params(value=4)
-
print(query4.all())
-
-
# from_statement 原生sql語句
-
query5 = db_session.query(User).from_statement(text("select * from user where id>:value")).params(value=2).all()
-
print(query5)
-
-
# ------> and_ or_ 普通的表達式在這里面不好使
-
from sqlalchemy.sql import and_, or_
-
query6 = db_session.query(User).filter(and_(User.id==1, User.name=="ewqe")).first()
-
print(query6.id)
-
# 結果 1
-
-
query7 = db_session.query(User).filter(or_(User.id==1, User.name=="rre")).all()
-
print([i.id for i in query7])
-
# 結果 [1, 3]
-
-
# ------> between 大於多少小於多少
-
query8 = db_session.query(User).filter(User.id.between(1, 3)).all()
-
print(query8)
-
-
# ------> in_ 在里面對應還有not in等
-
query9 = db_session.query(User).filter(User.id.in_([1])).all()
-
print(query9)
-
-
# ------> synchronize_session
-
# synchronize_session=False 在原有值的基礎上增加和刪除 099 str
-
db_session.query(User).filter(User.id > 0).update({User.name: User.name + '099'}, synchronize_session=False)
-
db_session.commit()
-
-
# synchronize_session=evaluate 在原有值的基礎上增加和減少 11, 必須是整數類型
-
db_session.query(User).filter(User.id > 0).update({"name": User.name + 1}, synchronize_session="evaluate")
-
db_session.commit()
-
-
# 如果查詢條件里有in_,需要在delete()中加如下參數: fetch 刪除的更快
-
query10 = db_session.query(User).filter(User.id.in_([1, 2])).delete(synchronize_session='fetch')
-
print(query10) # 返回找到的數量
-
db_session.commit()
-
計數(Count)
-
有時候你想明確的計數,比如要統計users表中有多少個不同的姓名,
-
那么簡單粗暴的采用以上count是不行的,因為姓名有可能會重復,
-
但是處於兩條不同的數據上,如果在原生數據庫中,可以使用distinct關鍵字,
-
那么在SQLAlchemy中,可以通過func.count()方法來實現
-
from sqlalchemy import func
-
query1 = db_session.query(func.count(User.name)).first() # (4,)
-
-
# 如果想實現select count(*) from users,可以通過以下方式來實現:
-
query2 = db_session.query(func.count("*")).select_from(User).scalar() # 4
-
-
# 如果指定了要查找的表的字段,可以省略select_from()方法:
-
query3 = db_session.query(func.count(User.id)).scalar() # 4
排序
-
order_by:可以指定根據這個表中的某個字段進行排序,如果在前面加了一個-,代表的是降序排序
-
正向排序和反向排序:默認情況是從小到大,從前到后排序的,如果想要反向排序,可以調用排序的字段的desc方法。
一. relationship的order_by參數
teachers = relationship('Teacher',secondary=association_table, back_populates='classes', order_by=Teacher.name)
二. 在模型定義中,添加以下代碼:
-
__mapper_args__ = {
-
"order_by": name
-
}
limit、offset和切片
-
limit:可以限制每次查詢的時候只查詢幾條數據。
-
offset:可以限制查找數據的時候過濾掉前面多少條。
-
切片:可以對Query對象使用切片操作,來獲取想要的數據。
-
-
all = db_session.query(Teacher).limit(2).all()
-
print([i.id for i in all ])
-
# 結果 [1, 2]
-
-
all2 = db_session.query(Teacher).limit(2).offset(1).all()
-
print([i.id for i in all2])
-
# 結果 [2, 3]
懶加載
-
在一對多,或者多對多的時候,如果想要獲取多的這一部分的數據的時候,往往能通過一個屬性就可以全部獲取了。
-
比如有一個作者,想要或者這個作者的所有文章,那么可以通過user.articles就可以獲取所有的。
-
但有時候我們不想獲取所有的數據,比如只想獲取這個作者今天發表的文章,
-
那么這時候我們可以給relationship傳遞一個lazy='dynamic',
-
以后通過user.articles獲取到的就不是一個列表,而是一個AppendQuery對象了。
-
這樣就可以對這個對象再進行一層過濾和排序等操作
group_by
比如我想根據名字來分組, 統計每個名字分組里面有多少人
-
from sqlalchemy import func
-
# 我想根據名字來分組, 統計每個名字分組里面有多少人
-
all = db_session.query(Teacher.name, func.count(Teacher.id)).group_by(Teacher.name).all()
having
-
having是對查找結果進一步過濾。比如只想要看未成年人的數量,那么可以首先對年齡進行分組統計人數,然后再對分組進行having過濾
-
-
result = session.query(User.age,func.count(User.id)).group_by(User.age).having(User.age >= 18).all()
join方法
表結構
-
class Address(Base):
-
__tablename__ = 'address'
-
id = Column(Integer, primary_key=True)
-
name = Column(String(32))
-
user_id = Column(Integer,ForeignKey("user.id")) # 外鍵關系 Fk
-
-
-
class User(Base):
-
__tablename__ = 'user'
-
id = Column(Integer, primary_key=True)
-
name = Column(String(32))
-
---> 普通查詢
-
all = db_session.query(User, Address).filter(User.id == Address.user_id).all()
-
for u, a in all:
-
print(u.name, a.name)
-
-
輸出結果
-
peach 地址1
-
peach 地址2
-
peach 地址3
-
-
---> join查詢
-
all = db_session.query(User, Address).join(Address).all()
-
for u, a in all:
-
print(u.name, a.name)
-
-
輸出結果
-
peach 地址1
-
peach 地址2
-
peach 地址3
-
-
---> outerjoin查詢 left join是以左邊為准 right 是以右邊為准
-
all = db_session.query(User, Address).outerjoin(Address).all()
-
for u in all:
-
print(u)
-
-
輸出結果
-
(<__main__.User object at 0x7fb9e3160518>, <__main__.Address object at 0x7fb9e3160588>)
-
(<__main__.User object at 0x7fb9e3160518>, <__main__.Address object at 0x7fb9e31605f8>)
-
(<__main__.User object at 0x7fb9e3160518>, <__main__.Address object at 0x7fb9e3160668>)
-
(<__main__.User object at 0x7fb9e31606d8>, None)
別名
-
# 當多表查詢的時候,有時候同一個表要用到多次,這時候用別名就可以方便的解決命名沖突的問題了
-
from sqlalchemy.orm import aliased
-
adalias1 = aliased(Address)
-
adalias2 = aliased(Address)
-
-
for username, ad1name, ad2name in db_session.query(User.name, adalias1.name, adalias2.name).join(adalias1).all():
-
print(username, ad1name, ad2name)
子查詢
-
# 構造子查詢
-
# 首先查詢用戶有多少地址
-
from sqlalchemy.sql import func
-
addr = db_session.query(Address.user_id.label('user_id'), func.count("*").label('address_count')).group_by(Address.user_id).all()
-
# 結果
-
# [(1, 3)] 查出用戶id1 有3個地址
-
.label('user_id') 是取的別名
-
# 將子查詢放到父查詢中
-
addrs = db_session.query(Address.user_id.label('user_id'), func.count("*").label('address_count')).group_by(Address.user_id).subquery() # 子查詢語句
-
for u,count in db_session.query(User,addrs.c.address_count).outerjoin(addrs,User.id==addrs.c.user_id).order_by(User.id):
-
print(u.name,count)
-
-
# 結果
-
# peach 3
-
# peach2 None
-
-
# 一個查詢如果想要變為子查詢,則是通過subquery()方法實現
-
# 變成子查詢后,通過子查詢.c屬性來訪問查詢出來的列
-
# 以上方式只能查詢某個對象的具體字段
以上方式只能查詢某個對象的具體字段,如果要查找整個實體,則需要通過aliased方法