查询
-
# -*- coding: utf-8 -*-
-
from sqlalchemy.orm import sessionmaker
-
from SQLAlchemy.create import engine,User
-
Session = sessionmaker(engine)
-
db_session = Session()
-
-
query = db_session.query(User).filter(User.name.like('%2%')).order_by(User.id)
-
-
# ------> 查询 all list
-
print(query.all())
-
-
# ------> first() 第一个
-
print(query.first())
-
-
# ------> one() 如果这个结果集少于或者多于一条数据, 结论有且只有一条数据的时候才会正常的返回,否则抛出异常
-
print(query.one())
-
-
# ------> one_or_none() 在结果集中没有数据的时候也不会抛出异常
-
-
# ------> scalar() 底层调用one()方法,并且如果one()方法没有抛出异常,会返回查询到的第一列的数据
-
query2 = db_session.query(User).filter(User.name=="2").scalar()
-
print(query2)
-
-
# ------> text
-
from sqlalchemy import text
-
query3 = db_session.query(User).filter(text("id<3")).order_by(text('id'))
-
print(query3.all())
-
-
# text 带变量方式 用 :传入变量,用params接收
-
query4 = db_session.query(User).filter(text("id<:value")).params(value=4)
-
print(query4.all())
-
-
# from_statement 原生sql语句
-
query5 = db_session.query(User).from_statement(text("select * from user where id>:value")).params(value=2).all()
-
print(query5)
-
-
# ------> and_ or_ 普通的表达式在这里面不好使
-
from sqlalchemy.sql import and_, or_
-
query6 = db_session.query(User).filter(and_(User.id==1, User.name=="ewqe")).first()
-
print(query6.id)
-
# 结果 1
-
-
query7 = db_session.query(User).filter(or_(User.id==1, User.name=="rre")).all()
-
print([i.id for i in query7])
-
# 结果 [1, 3]
-
-
# ------> between 大于多少小于多少
-
query8 = db_session.query(User).filter(User.id.between(1, 3)).all()
-
print(query8)
-
-
# ------> in_ 在里面对应还有not in等
-
query9 = db_session.query(User).filter(User.id.in_([1])).all()
-
print(query9)
-
-
# ------> synchronize_session
-
# synchronize_session=False 在原有值的基础上增加和删除 099 str
-
db_session.query(User).filter(User.id > 0).update({User.name: User.name + '099'}, synchronize_session=False)
-
db_session.commit()
-
-
# synchronize_session=evaluate 在原有值的基础上增加和减少 11, 必须是整数类型
-
db_session.query(User).filter(User.id > 0).update({"name": User.name + 1}, synchronize_session="evaluate")
-
db_session.commit()
-
-
# 如果查询条件里有in_,需要在delete()中加如下参数: fetch 删除的更快
-
query10 = db_session.query(User).filter(User.id.in_([1, 2])).delete(synchronize_session='fetch')
-
print(query10) # 返回找到的数量
-
db_session.commit()
-
计数(Count)
-
有时候你想明确的计数,比如要统计users表中有多少个不同的姓名,
-
那么简单粗暴的采用以上count是不行的,因为姓名有可能会重复,
-
但是处于两条不同的数据上,如果在原生数据库中,可以使用distinct关键字,
-
那么在SQLAlchemy中,可以通过func.count()方法来实现
-
from sqlalchemy import func
-
query1 = db_session.query(func.count(User.name)).first() # (4,)
-
-
# 如果想实现select count(*) from users,可以通过以下方式来实现:
-
query2 = db_session.query(func.count("*")).select_from(User).scalar() # 4
-
-
# 如果指定了要查找的表的字段,可以省略select_from()方法:
-
query3 = db_session.query(func.count(User.id)).scalar() # 4
排序
-
order_by:可以指定根据这个表中的某个字段进行排序,如果在前面加了一个-,代表的是降序排序
-
正向排序和反向排序:默认情况是从小到大,从前到后排序的,如果想要反向排序,可以调用排序的字段的desc方法。
一. relationship的order_by参数
teachers = relationship('Teacher',secondary=association_table, back_populates='classes', order_by=Teacher.name)
二. 在模型定义中,添加以下代码:
-
__mapper_args__ = {
-
"order_by": name
-
}
limit、offset和切片
-
limit:可以限制每次查询的时候只查询几条数据。
-
offset:可以限制查找数据的时候过滤掉前面多少条。
-
切片:可以对Query对象使用切片操作,来获取想要的数据。
-
-
all = db_session.query(Teacher).limit(2).all()
-
print([i.id for i in all ])
-
# 结果 [1, 2]
-
-
all2 = db_session.query(Teacher).limit(2).offset(1).all()
-
print([i.id for i in all2])
-
# 结果 [2, 3]
懒加载
-
在一对多,或者多对多的时候,如果想要获取多的这一部分的数据的时候,往往能通过一个属性就可以全部获取了。
-
比如有一个作者,想要或者这个作者的所有文章,那么可以通过user.articles就可以获取所有的。
-
但有时候我们不想获取所有的数据,比如只想获取这个作者今天发表的文章,
-
那么这时候我们可以给relationship传递一个lazy='dynamic',
-
以后通过user.articles获取到的就不是一个列表,而是一个AppendQuery对象了。
-
这样就可以对这个对象再进行一层过滤和排序等操作
group_by
比如我想根据名字来分组, 统计每个名字分组里面有多少人
-
from sqlalchemy import func
-
# 我想根据名字来分组, 统计每个名字分组里面有多少人
-
all = db_session.query(Teacher.name, func.count(Teacher.id)).group_by(Teacher.name).all()
having
-
having是对查找结果进一步过滤。比如只想要看未成年人的数量,那么可以首先对年龄进行分组统计人数,然后再对分组进行having过滤
-
-
result = session.query(User.age,func.count(User.id)).group_by(User.age).having(User.age >= 18).all()
join方法
表结构
-
class Address(Base):
-
__tablename__ = 'address'
-
id = Column(Integer, primary_key=True)
-
name = Column(String(32))
-
user_id = Column(Integer,ForeignKey("user.id")) # 外键关系 Fk
-
-
-
class User(Base):
-
__tablename__ = 'user'
-
id = Column(Integer, primary_key=True)
-
name = Column(String(32))
-
---> 普通查询
-
all = db_session.query(User, Address).filter(User.id == Address.user_id).all()
-
for u, a in all:
-
print(u.name, a.name)
-
-
输出结果
-
peach 地址1
-
peach 地址2
-
peach 地址3
-
-
---> join查询
-
all = db_session.query(User, Address).join(Address).all()
-
for u, a in all:
-
print(u.name, a.name)
-
-
输出结果
-
peach 地址1
-
peach 地址2
-
peach 地址3
-
-
---> outerjoin查询 left join是以左边为准 right 是以右边为准
-
all = db_session.query(User, Address).outerjoin(Address).all()
-
for u in all:
-
print(u)
-
-
输出结果
-
(<__main__.User object at 0x7fb9e3160518>, <__main__.Address object at 0x7fb9e3160588>)
-
(<__main__.User object at 0x7fb9e3160518>, <__main__.Address object at 0x7fb9e31605f8>)
-
(<__main__.User object at 0x7fb9e3160518>, <__main__.Address object at 0x7fb9e3160668>)
-
(<__main__.User object at 0x7fb9e31606d8>, None)
别名
-
# 当多表查询的时候,有时候同一个表要用到多次,这时候用别名就可以方便的解决命名冲突的问题了
-
from sqlalchemy.orm import aliased
-
adalias1 = aliased(Address)
-
adalias2 = aliased(Address)
-
-
for username, ad1name, ad2name in db_session.query(User.name, adalias1.name, adalias2.name).join(adalias1).all():
-
print(username, ad1name, ad2name)
子查询
-
# 构造子查询
-
# 首先查询用户有多少地址
-
from sqlalchemy.sql import func
-
addr = db_session.query(Address.user_id.label('user_id'), func.count("*").label('address_count')).group_by(Address.user_id).all()
-
# 结果
-
# [(1, 3)] 查出用户id1 有3个地址
-
.label('user_id') 是取的别名
-
# 将子查询放到父查询中
-
addrs = db_session.query(Address.user_id.label('user_id'), func.count("*").label('address_count')).group_by(Address.user_id).subquery() # 子查询语句
-
for u,count in db_session.query(User,addrs.c.address_count).outerjoin(addrs,User.id==addrs.c.user_id).order_by(User.id):
-
print(u.name,count)
-
-
# 结果
-
# peach 3
-
# peach2 None
-
-
# 一个查询如果想要变为子查询,则是通过subquery()方法实现
-
# 变成子查询后,通过子查询.c属性来访问查询出来的列
-
# 以上方式只能查询某个对象的具体字段
以上方式只能查询某个对象的具体字段,如果要查找整个实体,则需要通过aliased方法