前言:
Django的ORM雖然強大,但是畢竟局限在Django,而SQLAlchemy是Python中的ORM框架;
SQLAlchemy的作用是:類/對象--->SQL語句--->通過pymysql/MySQLdb模塊--->提交到數據庫執行;
組成部分:
- Engine,框架的引擎
- Connection Pooling ,數據庫連接池
- Dialect,選擇連接數據庫的DB API種類
- Schema/Types,架構和類型
- SQL Exprression Language,SQL表達式語言
SQLAlchemy本身無法操作數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
pip3 install sqlalchemy #安裝sqlalchemy模塊
一、 基本使用
1.原生SQL

import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine conn_pool=create_engine( #創建sqlalchemy引擎 "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", max_overflow=2, #超過連接池大小之后,允許最大擴展連接數; pool_size=5, #連接池大小 pool_timeout=30,#連接池如果沒有連接了,最長等待時間 pool_recycle=-1,#多久之后對連接池中連接進行一次回收 ) #單線程操作線程池 conn = conn_pool.raw_connection() #從連接池中獲取1個連接,開始連接 cursor = conn.cursor() cursor.execute( "select * from cmdb_worker_order" ) result = cursor.fetchall() print(result) cursor.close() conn.close()

import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine conn_pool=create_engine( #創建sqlalchemy引擎 "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", max_overflow=2, #超過連接池大小之后,允許最大擴展連接數; pool_size=5, #連接池大小 pool_timeout=30,#連接池如果沒有連接了,最長等待時間 pool_recycle=-1,#多久之后對連接池中連接進行一次回收 ) #多線程操作線程池 def task(arg): conn = conn_pool.raw_connection() cursor = conn.cursor() cursor.execute( #"select * from cmdb_worker_order" "select sleep(2)" ) result = cursor.fetchall() cursor.close() conn.close() for i in range(20): t = threading.Thread(target=task, args=(i,)) #5個線程 執行2秒 然后5個線程在去執行2秒 t.start()
mysql> show status like 'Threads%'; +-------------------+-------+ | Variable_name | Value | +-------------------+-------+ | Threads_cached | 1 | | Threads_connected | 8 | | Threads_created | 11 | | Threads_running | 8 | +-------------------+-------+ 4 rows in set (0.00 sec)
2.ORM
2.1單表
a. 創建數據庫單表

#創建單表 #!/usr/bin/env python # -*- coding:utf-8 -*- import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' #表名稱 id = Column(Integer, primary_key=True) # primary_key=True設置主鍵 name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。 def init_db(): #根據類創建數據庫表 engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行創建 if __name__ == '__main__': init_db() #執行創建
b.刪除表

#!/usr/bin/env python # -*- coding:utf-8 -*- import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' #表名稱 id = Column(Integer, primary_key=True) # primary_key=True設置主鍵 name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。 def drop_db(): #根據類 刪除數據庫表 engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行刪除表 if __name__ == '__main__': drop_db() #執行創建
c.添加1條記錄

# import time # import threading # import sqlalchemy # from sqlalchemy import create_engine # from sqlalchemy.engine.base import Engine # # # conn_pool=create_engine( #創建sqlalchemy引擎 # "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", # max_overflow=2, #超過連接池大小之后,允許最大擴展連接數; # pool_size=5, #連接池大小 # pool_timeout=30,#連接池如果沒有連接了,最長等待時間 # pool_recycle=-1,#多久之后對連接池中連接進行一次回收 # # ) # # #單線程操作線程池 # # conn = conn_pool.raw_connection() #從連接池中獲取1個連接,開始連接 # cursor = conn.cursor() # cursor.execute( # "select * from cmdb_worker_order" # ) # result = cursor.fetchall() # print(result) # cursor.close() # conn.close() #多線程操作線程池 # def task(arg): # conn = conn_pool.raw_connection() # cursor = conn.cursor() # cursor.execute( # #"select * from cmdb_worker_order" # "select sleep(2)" # ) # result = cursor.fetchall() # cursor.close() # conn.close() # # # for i in range(20): # t = threading.Thread(target=task, args=(i,)) #5個線程 執行2秒 然后5個線程在去執行2秒 # t.start() # # #創建單表 #!/usr/bin/env python # -*- coding:utf-8 -*- import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): __tablename__ = 'users' #表名稱 id = Column(Integer, primary_key=True) # primary_key=True設置主鍵 name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。 def create_db(): #根據類 刪除數據庫表 engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行刪除表 if __name__ == '__main__': create_db() #執行創建

from SqlALchemy.models import Users from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8") Session = sessionmaker(bind=engine) # 每次執行數據庫操作時,都需要創建一個會話 session = Session() # ############# 執行ORM操作 ############# obj1 = Users(name="張根") #創建Users對象=1行數據 session.add(obj1) #添加到表中 # 提交事務 session.commit() # 關閉session session.close()
2.2.多表

#!/usr/bin/env python # -*- coding:utf-8 -*- import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index from sqlalchemy.orm import relationship Base = declarative_base() class Users(Base): __tablename__ = 'users' #表名稱 id = Column(Integer, primary_key=True) # primary_key=True設置主鍵 name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。 age = Column(Integer, default=18) #數字字段 email = Column(String(32), unique=True) #設置唯一索引 ctime = Column(DateTime, default=datetime.datetime.now) #設置默認值為當前時間(注意千萬不要datetime.datetime.now()) extra = Column(Text, nullable=True) #文本內容字段 __table_args__ = ( # UniqueConstraint('id', 'name', name='uix_id_name'), #設置聯合唯一索引 # Index('ix_id_name', 'name', 'extra'), #設置聯合索引 ) class Hosts(Base): __tablename__ = 'hosts' id = Column(Integer, primary_key=True) name = Column(String(32), index=True) ctime = Column(DateTime, default=datetime.datetime.now) # ##################### 一對多示例 ######################### class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='籃球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) hobby_id = Column(Integer, ForeignKey("hobby.id")) # 與生成表結構無關,僅用於查詢方便 hobby = relationship("Hobby", backref='pers') # ##################### 多對多示例 ######################### class Server2Group(Base): __tablename__ = 'server2group' id = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 與生成表結構無關,僅用於查詢方便 servers = relationship('Server', secondary='server2group', backref='groups') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): """ 根據類創建數據庫表 :return: """ engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) def drop_db(): #根據類 刪除數據庫表 engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行刪除表 if __name__ == '__main__': init_db() #執行創建
二、源碼剖析
使用scoped_session(Session) 和Session() 創建的連接的區別?
A.發現問題:
為什么 session =scoped_session(Session) 和 session = Session(),明明是2個沒有繼承關系的類實例化出來的對象,卻有相同的add/commit/...方法?

class A(object): fields=('f1','f2' ) def f1(self): print('A類的f1方法') def f2(self): print('A的f2方法') def __call__(self,*args, **kwargs): getattr(self,*args)() class B(object): def __init__(self,class_A): self.class_a=class_A() def waper(name): def do(self): self.class_a(name) return do for name in A.fields: setattr(B,name,waper(name)) obj=B(A) obj.f1() obj.f2()
B.session =scoped_session(Session) 實例化執行scoped_session的__init__方法,Session參數也就是原session類;
class scoped_session(object): #沒有繼承 session_factory = None def __init__(self, session_factory, scopefunc=None): self.session_factory = session_factory #1.0 :session_factory=原來的session類 if scopefunc: # 1.1:scopefunc=None 走else分支 self.registry = ScopedRegistry(session_factory, scopefunc) else: ''' class ThreadLocalRegistry(ScopedRegistry): def __init__(self, createfunc): self.createfunc = createfunc #源session類 self.registry = threading.local() #封裝了1個可以隔離多線程之間數據的threading.local()對象: ''' self.registry = ThreadLocalRegistry(session_factory) #返回1個封裝了源session類和threading.local對象的ThreadLocalRegistry對象
C.給 scoped_session類設置 屬性 = 1個封裝了閉包函數do,封裝了這些屬性,在用戶app里實例化 scoped_session()之后就可以去執行這些do函數了!
def instrument(name): def do(self, *args, **kwargs): #self=scoped_session對象 return getattr(self.registry(), name)(*args, **kwargs) #self name=add /commit 閉包封裝進來的 ''' 把一下代碼封裝到 scoped_session類中去,接下如果執行self就是scoped_session對象 或者ScopedRegistry對象了
了! self.session_factory = session_factory #session_factory=原來的session類 self.registry = ScopedRegistry(session_factory, scopefunc) #ScopedRegistry對象 name: def do(self, *args, **kwargs): return getattr(self.registry(), add )(*args, **kwargs) ''' return do
D.app中執行session.add(obj1)本質是執行scoped_session類中封裝的add屬性對應的do函數
def do(self, *args, **kwargs): # self.registry()=執行ThreadLocalRegistry 或者 scoped_session對象 的__call__方法 return getattr(self.registry(), name)(*args, **kwargs) #self name=add /commit 閉包封裝進來的
#最后執行下面的代碼!
def __call__(self):
try:
return self.registry.value #去threading.local()獲取
except AttributeError: #如果獲取不到
val = self.registry.value = self.createfunc() #去源session對象中獲取方法
return val
E.得出結論:
scoped_session(Session) 內部使用了threading.local() 實現了對多線程的支持;
F.知識:
__all__ = ['scoped_session'] :1個py文件中使用了__all__=[ ]限制了導入的變量;
threading.local():為每1個線程,另外開辟1塊新內存,來保存local_value,所以每個線程都可以獲取到自己設置的值。
閉包:可以把外部函數數據,傳遞到內部函數中保存;
三、進階操作
注意無論SQLalchemy的增、刪、改、查操作,最后都需要commit,數據庫才會執行SQL;
1.增、刪、改操作

obj1 = Users(name="張根",age=18,email='13220198866@163.com',extra='sss') session.add(obj1)

session.add_all([ Users(name="張根1",age=19,email='645172205@qq.com',extra='sss'), Users(name="張根2",age=20,email='13220198866@139.com',extra='sss') ])

session.query(Users).filter(Users.id==5).delete()

###################修改########################## session.query(Users).filter(Users.id==4).update({'name':'Martin'}) session.query(Users).filter(Users.id==4).update({Users.name: Users.name + "666"}, synchronize_session=False)#在原來的基礎上做字符串 session.query(Users).filter(Users.id==18).update({Users.id: Users.id - 12},synchronize_session="evaluate")#在原來的基礎上做數字+,-操作
2.單表查詢操作

r0=session.query(Users).all() #查詢user表中全部數據; r1=session.query(Users).filter(Users.id > 2) #查詢user表中,id>2的記錄; r2=session.query(Users.age).all() #查詢User表中的 age列; ##[(18,), (19,)] r3=session.query(Users.age.label('cname')).all() #使用別名查詢 r4=session.query(Users).filter(Users.name=='Martin').all() #查詢姓名==Martin的用戶 r5=session.query(Users).filter_by(name='Martin',age=19).all() #filter_by方式查詢 r6=session.query(Users).filter_by(name='Martin',age=19).first() #獲取第 單個對象 print(r6.name) r7=session.query(Users).filter(text("id<:value and name=:name")).params(value=18, name='Martin').order_by(Users.id).all() #查詢 id>18 name=Martin的Users 根據 id排序,params支持傳參數; r8 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()#from_statement,申請使用原生sql

#條件查詢 ret0 = session.query(Users).filter(Users.id.between(0,7), Users.name == 'Martin').all() # #between: 查詢 user id在0--7之間,用戶名為Martin 的數據; ret1= session.query(Users).filter(Users.id.in_([1,6])).all() #查詢user_id in [1,6] 的數據 ret2 = session.query(Users).filter(~Users.id.in_([1,3,4])).all() #查詢user_id not in [1,6] 的數據 ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='Martin'))).all() #1.session.query(Users.id).filter_by(name='Martin') 查詢name=Martin'的id #2.在user表中 按1的結果 查詢

#################################邏輯運算##################################### from sqlalchemy import and_, or_ ret0 = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret1 = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret2 = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all()

###################################字符串符 模糊匹配查詢############################### ret0 = session.query(Users).filter(Users.name.like('M%')).first().name ret1 = session.query(Users).filter(~Users.name.like('%r%')).first().name print(ret0,ret1)

########################### 限制(分頁)############################ ret = session.query(Users)[0:2] print(ret)

##########################排序############################## ret0 = session.query(Users).order_by(Users.id.desc()).all() #根據id,由大到小排序(desc). ret1 = session.query(Users).order_by(Users.id.asc(),Users.age.desc()).all() #根據id,由小到大排序(asc),如id相等,由大到小排序(desc). print([i.id for i in ret0 ] ) print([i.id for i in ret1 ] )

################################分組############################### from sqlalchemy.sql import func ret0 = session.query(Users).group_by(Users.age).all() #根據name字段進行分組 ret1 = session.query( #使用name字段進行分組,求每組中 最大id 、最小id 、id 之和 func.max(Users.id), func.sum(Users.id), func.min(Users.id)) .group_by(Users.name).all() ret2 = session.query( func.max(Users.id), func.sum(Users.id), #對分組之后的數據進行 having篩選, func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() ''' having的作用: 例如:查詢公司中 部門人數大於3人的部門,先按照部門分組,然后求人數,然后再having 大於3的; '''

##################################連表查詢############################################### ret = session.query(models.Users).join(models.Hobby,models.Users.id == models.Hobby.id,isouter=True).filter(models.Users.id >1) #2個沒有外鍵關系的表 做連表查詢 print(ret) ret = session.query(models.Person).join(models.Hobby).all() #inin_join print(ret) ret = session.query(models.Person).join(models.Hobby,isouter=True).all() #left_join 調換位置 更換為 right_join print(ret) '''' left join:以左表為准,顯示符合搜索條件的記錄; aID aNum bID bName 5 a20050115 NULL NULL right join:以右表為准,顯示符合搜索條件的記錄; aID aNum bID bName NULL NULL 8 2006032408 inin_join:並不以誰為基礎,它只顯示符合條件 aID aNum bID bName '''

# 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all()
3.基於relationship的增加、查詢操作
relationship可以幫助我們 快速在存在1對多、多對多關系的表之間做反向連表查詢和數據添加;

#連表方式1:指定字段獲取 persons=session.query(models.Person.name,models.Hobby.caption.label('hobby_caption')).join(models.Hobby) for row in persons: print(row.name,row.hobby_caption,) #連表方式2:獲取所有字段 persons = session.query(models.Person, models.Hobby).join(models.Hobby) for row in persons: #print(row)=2個對象 print(row[0].name,row[1].caption) #連表方式3:relationship 連表查詢 persons = session.query(models.Person).all() for row in persons: print(row.name,row.hobby.caption) ''' hobby = relationship("Hobby", backref='pers') Hobby:正向查詢 backref:反向查詢 ps:查詢喜歡姑娘的所有人 hobby_obj=session.query(models.Hobby).filter(models.Hobby.id==2).first() print(hobby_obj.pers) ''' ################################relationship增加###################### #1.relationship正向增加 person_obj=models.Person(name='Tony',hobby=models.Hobby(caption='any')) #2.relationship 反向增加 hobby_obj=models.Hobby(caption='人妖') hobby_obj.pers=[ models.Person(name='李淵'), models.Person(name='西門慶'), ] session.add(person_obj,hobby_obj)

from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from SqlALchemy import models engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8") Session = sessionmaker(bind=engine) session = Session() #############################多對多添加################################# #添加方式1:同時添加男、女對象,直接添加相親表;前提是 知道新添加男、女對象的ID; session.add_all( [ models.Boy(name='張無忌'), models.Boy(name='宋青書'), models.Girl(name='周芷若'), models.Girl(name='趙敏'), ]) session.commit() s2g =models.G2B(girl_id=1,boy_id=1) session.add(s2g) session.commit() #添加方式2:通過女生對象 添加 相親記錄表 girl_obj = models.Girl(name='滅絕師太') #創建1位女性朋友 滅絕 girl_obj.boys = [models.Boy(name='張三豐'),models.Boy(name='方正大師')] #然后滅絕和 張三豐、方正大師分別相了1次親 session.add(girl_obj) session.commit() ##添加方式3:通過男生對象 添加 相親記錄 boy_obj = session.query(models.Boy).filter(models.Boy.name=='尹志平').first() #創建1位男性朋友 尹志平 boy_obj.girls = [models.Girl(name='小龍女'),models.Girl(name='黃蓉')] #然后尹志平 和小龍女、黃蓉分別 相了一次親 session.add(boy_obj) session.commit() ##################################多對多查詢################################### #boys = relationship('Boy', secondary='g2b', backref='girls') #1.基於 relationship 正向查詢 mie_jue = session.query(models.Girl).filter(models.Girl.name=='滅絕師太').first() print( [i.name for i in mie_jue.boys]) #['方正大師', '張三豐'] #2.基於 relationship 反向查詢 yi_zhi_ping = session.query(models.Boy).filter(models.Boy.name=='尹志平').first() print( [i.name for i in yi_zhi_ping.girls]) #['小龍女', '黃蓉']
以下為相親表表結構:

###################### 相親表多對多示例 ######################### class G2B(Base): __tablename__ = 'g2b' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 與生成表結構無關,僅用於查詢方便 boys = relationship('Boy', secondary='g2b', backref='girls') class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False)
4.關聯子查詢
什么是SQL子查詢?
mysql> select id,name,(select max(id) from girl) as maxgirl from boy; #SQL子查詢 +----+--------------+---------+ | id | name | maxgirl | +----+--------------+---------+ | 2 | 宋青書 | 7 | | 5 | 尹志平 | 7 | | 3 | 張三豐 | 7 | | 1 | 張無忌 | 7 | | 4 | 方正大師 | 7 | +----+--------------+---------+ 5 rows in set (0.00 sec) mysql>
查詢每個學生的平均分!
First, query the SID from Student
Second,with SID query everyone `s socres compute average score。
select id,name,(select avg(score) from 成績表 where 成績表.sid =學生表.id ) as avg_socre from 學生表;

#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text, func from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 關聯子查詢 subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar() result = session.query(Group.name, subqry) """ SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid FROM server WHERE server.id = `group`.id) AS anon_1 FROM `group` """ # 原生SQL """ # 查詢 cursor = session.execute('select * from users') result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) session.commit() print(cursor.lastrowid) """ session.close()
四、Flask-SQLAlchemy組件
FlaskSQLAlchemy是flask和SQLALchemy的管理者,其本質是在flask項目中 通過對文件管理、導入,把Flask和QLAlchemy兩個組件無縫連接在一起,
還可以幫助我們實現自動開啟、關閉連接、配置提升開發效率;
根據一個常見flask項目的目錄結構,梳理一下它的運行流程便知;
程序入口run.py導入sansa包執行__init__.py文件
0.導入sansa包會執行sanas的__init__.py文件導入create_app
1.執行create_app函數

#!/usr/bin/env python # -*- coding:utf-8 -*- """ 生成依賴文件: pipreqs ./ """ from sansa import create_app #0.導入sansa包會執行sanas的__init__.py文件導入create_app app = create_app() #1.執行create_app函數 if __name__ == '__main__': app.run()
執行sansa.__init__.py
0.導入flask_sqlalchemy,注意這里導入的是flask_sqlalchemy不是原始的sqlalchemy
1.讀取、注冊flask的配置文件
2.通過配置文件,將flask_sqlalchemy注冊到app中
3.通過flask藍圖把account.account(路由和視圖) 注冊到app里(導入視圖)

#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask #0、導入flask_sqlalchemy,注意這里導入的是flask_sqlalchemy不是原始的sqlalchemy from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() from .models import * from .views import account def create_app(): app = Flask(__name__) #1、讀取、注冊flask的配置文件 app.config.from_object('settings.DevelopmentConfig') #2、通過配置文件,將flask_sqlalchemy注冊到app中 db.init_app(app) #3、通過藍圖把account.account(路由和視圖) 注冊到app里 app.register_blueprint(account.account) #導入視圖 return app
db對象在執行run.py剛剛啟動調用了sansa\__init__.py程序的時候就實例化好了並封裝好了 配置文件、self.Model = self.make_declarative_base(model_class, metadata)現在就可以使用了db對象創建models文件了。
開始創建model
0.db在執行run.py剛剛啟動調用了sansa\__init__.py程序的時候就實例化好了
1.導入sansa.__init__中的實例化完成的db對象class Users(db.Model):
2.db對象封裝好了 配置信息、ORM基類、create_all方法

#!/usr/bin/env python # -*- coding:utf-8 -* #0.db在執行run.py剛剛啟動調用了sansa\__init__.py程序的時候就實例化好了 from . import db #1.導入sansa.__init__中的實例化完成的db對象 class Users(db.Model): #2.db在啟動的時候 已經封裝好了 配置文件、self.Model = self.make_declarative_base(model_class, metadata) 就可以使用了 """ 用戶表 """ __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) def __repr__(self): return '<User %r>' % self.username
讀取models.py中的映射去執行SQL創建表
0.加載models表映射關系
1.創建app對象
2.使用db對象根據model這種映射關系執行創建表操作

#!/usr/bin/env python # -*- coding:utf-8 -*- from sansa import create_app from sansa import db app = create_app() from sansa.models import * #0.加載models表映射關系 with app.app_context(): #1.創建app對象 db.create_all() #2.使用db對象根據model這種映射關系執行創建表操作
通過視圖操作表
0.導入db對象,包含了engin和 創建連接;
1.導入models;
2. db.session直接獲取連接,開始操作。。。。

#!/usr/bin/env python # -*- coding:utf-8 -*- from sansa import create_app from sansa import db app = create_app() from sansa.models import * #0.加載models表映射關系 with app.app_context(): #1.創建app對象 db.create_all() #2.使用db對象根據model這種映射關系執行創建表操作 #db.drop_all() #3.使用db對象 刪除表
五、pipreqs組件
拿到別人的新項目之后發現缺少 這個、那個....模塊運行不起來,然后根據報錯逐一得去pip到最后發現安裝得版本不一致;
這不是你的問題而是項目開發者的不夠規范;
1.安裝pipreqs組件
pip install pipreqs
2.在項目/目錄下執行pipreqs ./,搜集項目中所有使用得第三方包;
[root@cmdb cmdb_rbac_arya]# pipreqs ./ INFO: Successfully saved requirements file in ./requirements.txt [root@cmdb cmdb_rbac_arya]# ls 123.txt cron_ansible_api.py manage.py requirements.txt webcron ansible_api_runner.py cron_close_order.py Monaco.ttf static web.sql arya cron_writesql.py multitask templates work_order_remind.py cmdb DBshow nohup.out tools cmdb_rbac_arya Get_biying_image.py rbac w8.pid [root@cmdb cmdb_rbac_arya]# cat requirements.txt paramiko==2.4.1 ansible==2.6.3 PyMySQL==0.8.0 pandas==0.22.0 Django==1.11.7 XlsxWriter==1.0.4 redis==2.10.6 requests==2.18.4 Pillow==5.3.0 [root@cmdb cmdb_rbac_arya]#
pymysql
1.批量更新

import pymysql import datetime import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import conf #MySQL基礎配置類 class MySQLHandler(object): def __init__(self,timedelta,mysql_conf,): self.mysql_conf=mysql_conf self.logpath='/data/work_data/logs/{0}.log'.format(__file__.split('/')[-1][:-3]) self.timedelta=timedelta def query(self, sql): self.ret = [] mysql_conn = pymysql.connect(**self.mysql_conf) cursor = mysql_conn.cursor() cursor.execute(sql) columns = [x[0] for x in cursor.description] for row in cursor.fetchall(): d = dict(zip(columns, row)) self.ret.append(d) cursor.execute('commit') cursor.close() mysql_conn.close() return self.ret def update(self, sql): self.ret = [] mysql_conn = pymysql.connect(**self.mysql_conf) cursor = mysql_conn.cursor() cursor.execute(sql) cursor.execute('commit') cursor.close() mysql_conn.close() return self.ret def update_many(self,sql,args): self.ret = [] mysql_conn = pymysql.connect(**self.mysql_conf) cursor = mysql_conn.cursor() cursor.executemany(sql,args) cursor.execute('commit') cursor.close() mysql_conn.close() return self.ret def log(self,record): # print(record) with open(file=self.logpath,mode='a', encoding='utf-8') as f: current_record = record f.write(current_record)
2.事務

class MultipleManualHandler(base_handler.BaseHandler): @base_handler.error_if_not_super def post(self,customer_id): data = json.loads(self.request.body.decode('utf8')) sql = 'select id from customer_product_manual_version where customer_id="%s"' % (customer_id) old_db_records = [str(r['id']) for r in self.query(sql=sql)] import pymysql from conf import mysql_conf mysql_con = pymysql.connect(**mysql_conf) cursor=mysql_con.cursor() try:#事物操作 sql = 'delete from customer_product_manual_version where id in(%s)' % (','.join(old_db_records)) cursor.execute(sql) for p,v in data.items(): value_list=(customer_id,p,cache.version_to_major(v),v,self.user,datetime.datetime.now()) sql = 'insert into customer_product_manual_version (customer_id,product,major_version,version,last_modify_user_id,last_modify_time) values("%s","%s","%s","%s","%s","%s");'%(value_list) cursor.execute(sql) except Exception as e: print(e) mysql_con.rollback() # 出錯事務回滾 else: mysql_con.commit() #沒有出錯提交 finally: cursor.close() mysql_con.close() sql = 'select product,version from customer_product_manual_version where customer_id="%s"' % (customer_id) newest_result=self.query(sql=sql) response = {d['product']: d['version'] for d in newest_result} self.write(self.to_json(response))
參考:銀角大王