一.SQLAlchemy介紹
SQLAlchemy是一個基於Python實現的ORM框架。該框架建立在 DB API之上,使用關系對象映射進行數據庫操作,簡言之便是:將類和對象轉換成SQL,然后使用數據API執行SQL並獲取執行結果。
1
|
pip3 install sqlalchemy
|
組成部分:
- Engine,框架的引擎
- Connection Pooling ,數據庫連接池
- Dialect,選擇連接數據庫的DB API種類
- Schema/Types,架構和類型
- SQL Exprression Language,SQL表達式語言
SQLAlchemy本身無法操作數據庫,其本質上是依賴pymysql.MySQLdb,mssql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作,如:
SQLAlchemy用一個字符串表示連接信息:
'數據庫類型+數據庫驅動名稱://用戶名:口令@機器地址:端口號/數據庫名'
1
2
3
4
5
6
7
8
9
10
11
12
13
|
MySQL
-
Python
mysql
+
mysqldb:
/
/
<user>:<password>@<host>[:<port>]
/
<dbname>
pymysql
mysql
+
pymysql:
/
/
<username>:<password>@<host>
/
<dbname>[?<options>]
MySQL
-
Connector
mysql
+
mysqlconnector:
/
/
<user>:<password>@<host>[:<port>]
/
<dbname>
cx_Oracle
oracle
+
cx_oracle:
/
/
user:
pass
@host:port
/
dbname[?key
=
value&key
=
value...]
更多:http:
/
/
docs.sqlalchemy.org
/
en
/
latest
/
dialects
/
index.html
|
底層處理
使用 Engine/ConnectionPooling/Dialect 進行數據庫操作,Engine使用ConnectionPooling連接數據庫,然后再通過Dialect執行SQL語句。
#!/usr/bin/env python # -*- coding: utf-8 -*- # auth : pangguoping from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) # 執行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)" # ) # 新插入行自增ID # cur.lastrowid # 執行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),] # ) # 執行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)", # host='1.1.1.99', color_id=3 # ) # 執行SQL # cur = engine.execute('select * from hosts') # 獲取第一行數據 # cur.fetchone() # 獲取第n行數據 # cur.fetchmany(3) # 獲取所有數據 # cur.fetchall()
說白了就是使用pymysql的方法一樣.
二. 使用
1. 執行原生SQL語句
import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程時,最多等待的時間,超時報錯,默認30秒 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置),-1代表永遠不回收,即一直被重用 ) def task(arg): conn = engine.raw_connection() #拿到的是一個原生的pymysql連接對象 cursor = conn.cursor() cursor.execute( "select * from t1" ) result = cursor.fetchall() cursor.close() conn.close() for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()

#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5) def task(arg): conn = engine.contextual_connect() with conn: cur = conn.execute( "select * from t1" ) result = cur.fetchall() print(result) for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()

#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine from sqlalchemy.engine.result import ResultProxy engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5) def task(arg): cur = engine.execute("select * from t1") result = cur.fetchall() cur.close() print(result) for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()
注意: 查看連接,進程cmd,mysql中>輸入 show status like 'Threads%';
2. ORM
a. 創建數據庫表
創建單表
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() # 創建對象的基類: # 定義User對象: class Users(Base): # 表的名字: __tablename__ = 'users' # 表的結構: id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False,default='xx') # index指定是否是索引,nullable是否能為空 email = Column(String(32), unique=True) # 指定唯一 ctime = Column(DateTime, default=datetime.datetime.now) #注意,此處設置時datetime.datetime.now若加了括號,則時間永遠是程序啟動時的時間,后面創建數據時,不會變化 extra = Column(Text, nullable=True) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), # 聯合唯一索引 Index('ix_id_name', 'name', 'email'), #給name和email創建普通索引,索引名為ix_id_name ) def init_db(): """ 根據類創建數據庫表 :return: """ # 初始化數據庫連接: engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) #找到所有繼承了Base的類,按照其結構建表 def drop_db(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()
默認建的表的引擎是MyISAM,如果要設置成InnoDB(支持事務),該怎么設置呢?
__table_args__ = { 'mysql_engine': 'InnoDB', # 指定表的引擎 'mysql_charset': 'utf8' # 指定表的編碼格式 }
FK,M2M關系的創建
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime,UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() # 創建對象的基類: # ##################### 一對多示例 ######################### class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='籃球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) hobby_id = Column(Integer, ForeignKey("hobby.id")) #建FK關系 # 與生成表結構無關,僅用於查詢方便 hobby = relationship("Hobby", backref='pers') #反向關聯的名字 # ##################### 多對多示例 ######################### # 這里多對多需要自己建第三張表,並綁定關系 class Server2Group(Base): __tablename__ = 'server2group' id = Column(Integer, primary_key=True, autoincrement=True) #autoincrement 設置自增 server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 與生成表結構無關,僅用於查詢方便 servers = relationship('Server', secondary='server2group', backref='groups') #反向關聯的名字 class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): """ 根據類創建數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/userinfo?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) def drop_db(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/userinfo?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()
SQLALchemy不同於Django的ORM,當創建多對多關聯事,不會自動創建第三張表,需要我們自己定義關系表,進行關聯
b. 操作數據庫表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) #創建Session類 # 每次執行數據庫操作時,都需要創建一個session session = Session() # 創建session對象: # ############# 執行ORM操作 ############# # 創建新User對象 obj1 = Users(name="alex1") # 添加到session: session.add(obj1) # 提交即保存到數據庫: session.commit() # 關閉session session.close()
c.通過原生SQL語句執行

#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 查詢 # cursor = session.execute('select * from users') # result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'hc'}) # 注意占位符和傳參的形式 session.commit() print(cursor.lastrowid) session.close() 原生SQL語句
d.基本增刪改查示例
https://www.keakon.net/2012/12/03/SQLAlchemy使用經驗
import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from db import Users, Hosts engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # ################ 添加 ################ obj1 = Users(name="hc") session.add(obj1) #添加一個對象 session.add_all([ Users(name="hc"), Users(name="alex"), Hosts(name="c1.com"), ]) #添加多個對象 session.commit() # ################ 刪除 ################ # filter是where條件,最后調用one()或first()返回唯一行,如果調用all()則返回所有行 session.query(Users).filter(Users.id > 2).delete() #刪除Users表中id大於2的數據 session.commit() # ################ 修改 ################ session.query(Users).filter(Users.id > 0).update({"name" : "099"}) # 將Users表中id>0的數據,把name字段改為099 # 更新user表中id大於2的name列,在原字符串后邊增加099 session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) #synchronize_session設置為False即執行字符串拼接 # 更新user表中id大於2的num列,使最終值在原來數值基礎上加1 session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") #synchronize_session設置為evaluate即執行四則運算 session.commit() # ################ 查詢 ################ r1 = session.query(Users).all() r2 = session.query(Users.name.label('xx'), Users.age).all() #label 取別名的,即在查詢結果中,顯示name的別名'xx' r3 = session.query(Users).filter(Users.name == "alex").one() # one()返回唯一行,類似於django的get,如果返回數據為多個則報錯 r3 = session.query(Users).filter(Users.name == "alex").all() # all()獲取所有數據 r4 = session.query(Users).filter_by(name='alex').all() # 注意filter和filter_by后面括號內條件的寫法 r5 = session.query(Users).filter_by(name='alex').first() # first()獲取返回數據的第一行 r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all() #order_by后面還可以.desc()降序排列,默認為.asc()升序排列 # text(自定義條件,:的功能類似%s占位),params中進行傳參 r7 = session.query(Users).from_statement(text("SELECT * FROM Hosts where name=:name")).params(name='ed').all() # text中還能從另一個表中查詢,前面要用from_statement,而不是filter session.close()
當我們使用in_查詢時,如果進行刪除會更新,會出現如下錯誤
InvalidRequestError: Could not evaluate current criteria in Python. Specify 'fetch' or False for the synchronize_session parameter.
解決辦法:加上 synchronize_session=False
https://segmentfault.com/q/1010000000130368
e.基於relationship操作ForeignKey
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 添加 """ session.add_all([ Hobby(caption='乒乓球'), Hobby(caption='羽毛球'), Person(name='張三', hobby_id=3), Person(name='李四', hobby_id=4), ]) person = Person(name='張九', hobby=Hobby(caption='姑娘')) session.add(person) hb = Hobby(caption='人妖') hb.pers = [Person(name='文飛'), Person(name='博雅')] session.add(hb) # 會同時創建3條數據(1條hobby的數據,2條person的數據) session.commit() """ # 使用relationship正向查詢 """ v = session.query(Person).first() print(v.name) print(v.hobby.caption) """ # 使用relationship反向查詢 """ v = session.query(Hobby).first() print(v.caption) print(v.pers) """ session.close()
f.基於relationship操作m2m

#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 添加 """ session.add_all([ Server(hostname='c1.com'), Server(hostname='c2.com'), Group(name='A組'), Group(name='B組'), ]) session.commit() s2g = Server2Group(server_id=1, group_id=1) session.add(s2g) session.commit() gp = Group(name='C組') gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')] session.add(gp) session.commit() ser = Server(hostname='c6.com') ser.groups = [Group(name='F組'),Group(name='G組')] session.add(ser) session.commit() """ # 使用relationship正向查詢 """ v = session.query(Group).first() print(v.name) print(v.servers) """ # 使用relationship反向查詢 """ v = session.query(Server).first() print(v.hostname) print(v.groups) """ session.close() 基於relationship操作m2m
g.進階操作
in_、notin_、and、or、like、limit、排序、分組、連表、組合
# 條件 ret = session.query(Users).filter_by(name='alex').all() # ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() # 且的關系 ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() # ~表示非。就是not in的意思 ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() # 聯表查詢 from sqlalchemy import and_, or_ # 且和or的關系 ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() # 條件以and方式排列 ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() # 條件以or方式排列 ret = session.query(Users).filter( or_( #這部分表示括號中的條件都以or的形式匹配 Users.id < 2, # 或者 or User.id < 2 and_(Users.name == 'eric', Users.id > 3),# 表示括號中這部分進行and匹配 Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 表示not like # 限制 limit用法 ret = session.query(Users)[1:2] # 等於limit ,具體功能需要自己測試 # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 按照name從大到小排列,如果name相同,按照id從小到大排列 # 分組 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # having對聚合的內容再次進行過濾 # 連表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() # 默認是inner join ret = session.query(Person).join(Favor, isouter=True).all() # isouter表示是left join # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() #union默認會去重 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all() # union_all不去重
去重
https://segmentfault.com/a/1190000006949536
關聯子查詢

#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text, func from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 關聯子查詢 subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() result = session.query(Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """ # 原生SQL """ # 查詢 cursor = session.execute('select * from users') result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) session.commit() print(cursor.lastrowid) """ session.close()
子查詢: session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() """ select * from users where id in (select id from xxx) """ subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() #第一步: session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id) #這句的sql語句為 select count(id) as sid from server where server.id = group.id 如果直接運行,則會報錯 # 第二步:.correlate(Group).as_scalar() ==> 代表此時不執行查詢操作,將其當作條件,在group表中查詢時,才執行查詢 result = session.query(Group.name, subqry) # sql語句為:select group.name subqry from group #第三步:將subqry替換為上面的條件,則此句的SQL為: # select group.name,(select count(id) as sid from server where server.id = group.id) as xx from group
class User(db.Model): __tablename__ = "user" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), unique=True, nullable=False) pwd = db.Column(db.String(100), nullable=False) role_id = db.Column(db.Integer, default=0) email = db.Column(db.String(100), nullable=True) addtime = db.Column(db.DateTime, index=True, default=datetime.now) is_active = db.Column(db.Boolean, default=True) uid = db.Column(db.String(24), nullable=False, default=uuid, unique=True, server_default=uuid()) class Userlog(db.Model): __tablename__ = "userlog" id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, default=0) db.ForeignKey('user.id') ip = db.Column(db.String(100)) addtime = db.Column(db.DateTime, index=True, default=datetime.now)
orm語句
subqry = db.session.query(Userlog).order_by(Userlog.id.desc()).subquery() s = aliased(Userlog,subqry) rs = db.session.query(User, s.ip.label('last_ip'), s.addtime.label('last_time')).outerjoin(s, User.id == s.user_id).group_by( s.user_id) print rs
對應的sql語句
SELECT user.id AS user_id, user.name AS user_name, user.pwd AS user_pwd, user.role_id AS user_role_id, user.email AS user_email, user.addtime AS user_addtime, user.is_active AS user_is_active, user.uid AS user_uid, anon_1.ip AS last_ip, anon_1.addtime AS last_time FROM user LEFT OUTER JOIN (SELECT userlog.id AS id, userlog.user_id AS user_id, userlog.ip AS ip, userlog.addtime AS addtime FROM userlog ORDER BY userlog.id DESC) AS anon_1 ON user.id = anon_1.user_id GROUP BY anon_1.user_id
1.多條件組合,可以用and_,or_實現。最外層時,and_可以省略,默認用逗號分開條件。
db.session.query(User).filter( and_( or_(User.name==name1,User.name==name2), or_(User.status==1,User.status==2) ), User.active==1 ).first()
2.動態組合條件。針對不同的場景,可能需要不同的查詢條件,類似動態的拼接SQL 語句。
if filter_type == 1: search = and_(GameRoom.status ==1,or_( and_(GameRoom.white_user_id == user_id, GameRoom.active_player == 1), and_(GameRoom.black_user_id == user_id, GameRoom.active_player == 0))) elif filter_type == 2: search = and_(GameRoom.status ==1,or_( and_(GameRoom.white_user_id == user_id, GameRoom.active_player == 0), and_(GameRoom.black_user_id == user_id, GameRoom.active_player == 1))) elif filter_type == 3: search = GameRoom.create_by == user_id db.session.query(GameRoom).filter(search).all()
3.關聯查詢。對應SQL的join和left join等。
session.query(User, Address).filter(User.id == Address.user_id).all()
session.query(User).join(User.addresses).all()
session.query(User).outerjoin(User.addresses).all()
4.使用別名用aliased,aliased在orm包中。當要對同一個表使用多次關聯時,可能需要用到別名。同時,如果查詢的結果有多個同名的字段,可以使用label重命名。
black_user = orm.aliased(User) white_user = orm.aliased(User) db.session.query( GameRoom, black_user.score.label("black_score"), white_user.score.label("white_score") ).outerjoin(black_user,GameRoom.black_user_id==black_user.user_id).outerjoin( white_user,GameRoom.white_user_id==white_user.user_id).filter( GameRoom.id==room_id ).all()
5.聚合查詢和使用數據庫函數。func可以調用各種聚合函數,和當前數據庫支持的其它函數。
session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all() session.query(User.name, func.sum(User.id).label("user_id_sum")).filter(func.to_days(User.create_date)==func.to_days(func.now())).group_by(User.name).all()
6.子查詢
stmt = db.session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery() db.session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all()
7.直接運行SQL語句查詢。如果查詢實在太復雜,覺得用SQLAlchemy查詢方式很難實現,或者要通過存儲過程實現查詢,可以讓SQLAlchemy直接運行SQL語句返回結果。
sql ="""select b.user_id,b.user_name,b.icon,b.score,a.add_score from (select user_id, sum(score_new - score_old) as add_score from user_score_log where year(create_date)=year(now()) and month(create_date)=month(now()) group by user_id) a join users b on a.user_id=b.user_id order by a.add_score desc limit 50""" list_top = db.session.execute(sql).fetchall()
8.分頁查詢。sqlalchemy中分頁用到pagination,先不說性能怎么樣,使用起來是真的非常方便。
pagination = GameMessage.query.filter(GameMessage.game_id==game_id).\ order_by(GameMessage.id.desc()).\ paginate(page, per_page=20, error_out=True) pages = pagination.pages total = pagination.total items = pagination.items
h.session對象如何實現線程安全?
session有兩種創建方式
方式一:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:123@47.93.4.198:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Session = sessionmaker(bind=engine) # 方式一: # 由於無法提供線程共享功能,所有在開發時要注意,在每個線程中自己創建 session。 # from sqlalchemy.orm.session import Session # 具有操作數據庫的:'close', 'commit', 'connection', 'delete', 'execute', 'expire',..... session = Session() # 創建普通的session print('原生session',session) # 操作數據庫 session.close()
由於無法提供線程共享功能,所有在開發時要注意,在每個線程中自己創建 session
解決辦法如下:

#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from db import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) def task(arg): session = Session() obj1 = Users(name="alex1") session.add(obj1) session.commit() for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
方式二(推薦):
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session engine = create_engine( "mysql+pymysql://root:123@47.93.4.198:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Session = sessionmaker(bind=engine) # 方式二:支持線程安全,自動為每個線程創建一個session,單線程時,只創建一個 # - threading.Local # - 唯一標識 # ScopedSession對象 # self.registry(), 加括號 創建session # self.registry(), 加括號 創建session # self.registry(), 加括號 創建session from greenlet import getcurrent as get_ident #本地線程的唯一標識的函數,加括號則執行函數 session = scoped_session(Session,get_ident) # session.add # 操作數據庫 session.remove()
支持線程安全,自動為每個線程創建一個session,單線程時,只創建一個
I.sqlalchemy-utils給SqlAlchemy提供choice功能
SqlAlchemy本身沒有chocie,需要安裝這個才能提供choice功能
pip install sqlalchemy-utils

from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column,Integer,String from sqlalchemy_utils import ChoiceType from sqlalchemy import create_engine Base = declarative_base() class User(Base): __tablename__ = 'users' type_choices=( (1,'北京'), (2,'上海'), ) id = Column(Integer, primary_key=True) #必須要有主鍵 name =Column(String(64)) types=Column(ChoiceType(type_choices,Integer())) # 注意:Integer后面要有括號 __table_args__ = { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8' } def init_db(): """ 根據類創建數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) def drop_db(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': drop_db() init_db()

#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "HuChong" # Date: 2018/1/12 from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from ru import User engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/db1", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() obj1 = User(name="xz",types=1) obj2 = User(name="zz",types=2) session.add_all([obj1,obj2]) session.commit() session.close()

#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "HuChong" # Date: 2018/1/12 from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from ru import User engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/db1", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() result_list=session.query(User).all() print(result_list) for item in result_list: print(item.types) print(item.types.code,item.types.value) session.close() #######打印結果如下######## ''' [<ru.User object at 0x0386D770>, <ru.User object at 0x0386D7D0>] Choice(code=1, value=北京) 1 北京 Choice(code=2, value=上海) 2 上海 '''
三、Flask-SQLAlchemy及Flask-Migrate組件
1.Flask-SQLAlchemy
用於將Flask和SQLAlchemy聯系起來,使用之前需要裝下面這個模塊
pip install flask-sqlalchemy
如果使用Flask-sqlalchemy組件,則在使用時有一點變化
# 1. 引入Flask-SQLAlchemy from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() #實例化SQLAlchemy對象
# 2. 注冊 Flask-SQLAlchemy # SQLAlchemy(app) # 由於這個對象在其他地方想要使用,所有用以下方式注冊 db.init_app(app) #讀取配置文件,配置文件中寫以前在create_engine里面的鏈接數據
#settings.py中,加上配置
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@47.93.4.198:3306/s6?charset=utf8"
SQLALCHEMY_POOL_SIZE = 2
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1
# 追蹤對象的修改並且發送信號
SQLALCHEMY_TRACK_MODIFICATIONS = False
# 3. 導入models中的表 from .models import *
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from app import db # 4. 寫類繼承db.Model class Users(db.Model): #再不是繼承Base,而且繼承db.Model __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) pwd = Column(String(32)) __table_args__ = { 'mysql_engine': 'InnoDB', # 指定表的引擎 'mysql_charset': 'utf8' # 指定表的編碼格式 } class Group(db.Model): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) __table_args__ = { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8' }
# 5. 創建和刪除表 # 以后執行db.create_all() # 以后執行db.drop_all() 但是這樣不好,我們引入 Flask-Migrate
2.Flask-Migrate
可以通過類似Django里的命令,進行數據遷移,創建表,刪除表,更新表
安裝 pip install Flask-Migrate
# 5.1 導入 from flask_migrate import Migrate, MigrateCommand from app import create_app, db app = create_app() manager = Manager(app) # 5.2 創建migrate實例 migrate = Migrate(app, db)
#執行命令: 初次:python manage.py db init python manage.py db migrate python manage.py db upgrade
以后執行SQL時: 方式一: result = db.session.query(models.User.id,models.User.name).all() db.session.remove() 方式二: result = models.Users.query.all()
3.代碼規范之生成requestments.txt文件
pip freeze # 獲取環境中所有安裝的模塊以及其對應的版本 pip freeze > requirements.txt # 生成對應的文本文件
由於獲取的是所有,我們還得自己手動在文本里刪除一些不必要的,所有這個方法不好,我們使用下面的方法
pip install pipreqs
首先安裝模塊,安裝完成以后,我們就可以在終端,執行pipreqs命令
# 獲取當前所在程序目錄中涉及到的所有模塊,並自動生成 requirements.txt 且寫入內容。 pipreqs ./
建議在Linux系統下使用,windows環境下會報錯
以后使用別人的程序,進入程序目錄:
安裝requirements.txt依賴
pip install -r requirements.txt
會自動安裝文件里,所有對應版本模塊
https://segmentfault.com/a/1190000003050954
http://www.cnblogs.com/huchong/p/8797516.html
http://docs.jinkan.org/docs/flask/patterns/sqlalchemy.html
SQLAlchemy外鍵和關系
http://www.codexiu.cn/python/SQLAlchemy%E5%9F%BA%E7%A1%80%E6%95%99%E7%A8%8B/73/530/
lazy的用法
http://shomy.top/2016/08/11/flask-sqlalchemy-relation-lazy/