pymysql模塊+ Python ORM框架之SQLAlchemy


前言:

Django的ORM雖然強大,但是畢竟局限在Django,而SQLAlchemy是Python中的ORM框架;

SQLAlchemy的作用是:類/對象--->SQL語句--->通過pymysql/MySQLdb模塊--->提交到數據庫執行;

 

 

 

組成部分:

  • Engine,框架的引擎
  • Connection Pooling ,數據庫連接池
  • Dialect,選擇連接數據庫的DB API種類
  • Schema/Types,架構和類型
  • SQL Exprression Language,SQL表達式語言

SQLAlchemy本身無法操作數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作,如:

 

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
pip3 install sqlalchemy  #安裝sqlalchemy模塊 

 

 

一、 基本使用

1.原生SQL

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

conn_pool=create_engine( #創建sqlalchemy引擎
     "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
     max_overflow=2, #超過連接池大小之后,允許最大擴展連接數;
     pool_size=5,   #連接池大小
     pool_timeout=30,#連接池如果沒有連接了,最長等待時間
     pool_recycle=-1,#多久之后對連接池中連接進行一次回收

)


#單線程操作線程池

conn = conn_pool.raw_connection()  #從連接池中獲取1個連接,開始連接
cursor = conn.cursor()
cursor.execute(
    "select * from cmdb_worker_order"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
單線程操作線程池
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

conn_pool=create_engine( #創建sqlalchemy引擎
     "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
     max_overflow=2, #超過連接池大小之后,允許最大擴展連接數;
     pool_size=5,   #連接池大小
     pool_timeout=30,#連接池如果沒有連接了,最長等待時間
     pool_recycle=-1,#多久之后對連接池中連接進行一次回收

)



#多線程操作線程池
def task(arg):
    conn = conn_pool.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        #"select * from cmdb_worker_order"
        "select sleep(2)"
    )
    result = cursor.fetchall()
    cursor.close()
    conn.close()


for i in range(20):
    t = threading.Thread(target=task, args=(i,)) #5個線程 執行2秒 然后5個線程在去執行2秒
    t.start()
多線程操作線程池

 

mysql> show status like 'Threads%';
+-------------------+-------+
| Variable_name     | Value |
+-------------------+-------+
| Threads_cached    | 1     |
| Threads_connected | 8     |
| Threads_created   | 11    |
| Threads_running   | 8     |
+-------------------+-------+
4 rows in set (0.00 sec)

 

2.ORM

2.1單表

a. 創建數據庫單表

#創建單表
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'                #表名稱
    id = Column(Integer, primary_key=True) # primary_key=True設置主鍵
    name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。

def init_db(): #根據類創建數據庫表
    engine = create_engine(
        "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
        max_overflow=0,   # 超過連接池大小外最多創建的連接
        pool_size=5,      # 連接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,否則報錯
        pool_recycle=-1   # 多久之后對線程池中的線程進行一次連接的回收(重置)
    )

    Base.metadata.create_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行創建

if __name__ == '__main__':
    init_db()                           #執行創建
創建單表

b.刪除表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'                #表名稱
    id = Column(Integer, primary_key=True) # primary_key=True設置主鍵
    name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。

def drop_db(): #根據類 刪除數據庫表
    engine = create_engine(
        "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
        max_overflow=0,   # 超過連接池大小外最多創建的連接
        pool_size=5,      # 連接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,否則報錯
        pool_recycle=-1   # 多久之后對線程池中的線程進行一次連接的回收(重置)
    )

    Base.metadata.drop_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行刪除表

if __name__ == '__main__':
    drop_db()                          #執行創建
刪除表

 c.添加1條記錄

# import time
# import threading
# import sqlalchemy
# from sqlalchemy import create_engine
# from sqlalchemy.engine.base import Engine
# #
# conn_pool=create_engine( #創建sqlalchemy引擎
#      "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
#      max_overflow=2, #超過連接池大小之后,允許最大擴展連接數;
#      pool_size=5,   #連接池大小
#      pool_timeout=30,#連接池如果沒有連接了,最長等待時間
#      pool_recycle=-1,#多久之后對連接池中連接進行一次回收
#
# )

#
# #單線程操作線程池
#
# conn = conn_pool.raw_connection()  #從連接池中獲取1個連接,開始連接
# cursor = conn.cursor()
# cursor.execute(
#     "select * from cmdb_worker_order"
# )
# result = cursor.fetchall()
# print(result)
# cursor.close()
# conn.close()


#多線程操作線程池
# def task(arg):
#     conn = conn_pool.raw_connection()
#     cursor = conn.cursor()
#     cursor.execute(
#         #"select * from cmdb_worker_order"
#         "select sleep(2)"
#     )
#     result = cursor.fetchall()
#     cursor.close()
#     conn.close()
#
#
# for i in range(20):
#     t = threading.Thread(target=task, args=(i,)) #5個線程 執行2秒 然后5個線程在去執行2秒
#     t.start()
#
#


#創建單表
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'                #表名稱
    id = Column(Integer, primary_key=True) # primary_key=True設置主鍵
    name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。

def create_db(): #根據類 刪除數據庫表
    engine = create_engine(
        "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
        max_overflow=0,   # 超過連接池大小外最多創建的連接
        pool_size=5,      # 連接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,否則報錯
        pool_recycle=-1   # 多久之后對線程池中的線程進行一次連接的回收(重置)
    )

    Base.metadata.create_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行刪除表

if __name__ == '__main__':
    create_db()                          #執行創建
models.py
from SqlALchemy.models import Users
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8")
Session = sessionmaker(bind=engine)

# 每次執行數據庫操作時,都需要創建一個會話
session = Session()

# ############# 執行ORM操作 #############
obj1 = Users(name="張根") #創建Users對象=1行數據
session.add(obj1)          #添加到表中

# 提交事務
session.commit()
# 關閉session
session.close()
app01.py

 

2.2.多表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

Base = declarative_base()

class Users(Base):
    __tablename__ = 'users'                #表名稱
    id = Column(Integer, primary_key=True) # primary_key=True設置主鍵
    name = Column(String(32), index=True, nullable=False) #index=True創建索引, nullable=False不為空。
    age = Column(Integer, default=18)        #數字字段
    email = Column(String(32), unique=True)  #設置唯一索引
    ctime = Column(DateTime, default=datetime.datetime.now) #設置默認值為當前時間(注意千萬不要datetime.datetime.now())
    extra = Column(Text, nullable=True)         #文本內容字段
    __table_args__ = (
        # UniqueConstraint('id', 'name', name='uix_id_name'), #設置聯合唯一索引
        # Index('ix_id_name', 'name', 'extra'),                #設置聯合索引
    )




class Hosts(Base):
    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True)
    ctime = Column(DateTime, default=datetime.datetime.now)


# ##################### 一對多示例 #########################
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='籃球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # 與生成表結構無關,僅用於查詢方便
    hobby = relationship("Hobby", backref='pers')


# ##################### 多對多示例 #########################

class Server2Group(Base):
    __tablename__ = 'server2group'
    id = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))


class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    # 與生成表結構無關,僅用於查詢方便
    servers = relationship('Server', secondary='server2group', backref='groups')


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


def init_db():
    """
    根據類創建數據庫表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
        max_overflow=0,  # 超過連接池大小外最多創建的連接
        pool_size=5,  # 連接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,否則報錯
        pool_recycle=-1  # 多久之后對線程池中的線程進行一次連接的回收(重置)
    )

    Base.metadata.create_all(engine)





def drop_db(): #根據類 刪除數據庫表
    engine = create_engine(
        "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8",
        max_overflow=0,   # 超過連接池大小外最多創建的連接
        pool_size=5,      # 連接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,否則報錯
        pool_recycle=-1   # 多久之后對線程池中的線程進行一次連接的回收(重置)
    )

    Base.metadata.drop_all(engine) #這行代碼很關鍵哦!! 讀取繼承了Base類的所有表在數據庫中進行刪除表

if __name__ == '__main__':
    init_db()                      #執行創建
創建多表

 

 

二、源碼剖析

使用scoped_session(Session) 和Session() 創建的連接的區別?

 

A.發現問題:

為什么 session =scoped_session(Session)  和 session = Session(),明明是2個沒有繼承關系的類實例化出來的對象,卻有相同的add/commit/...方法?

 

class A(object):
    fields=('f1','f2' )
    def f1(self):
        print('A類的f1方法')
    def f2(self):
        print('A的f2方法')

    def __call__(self,*args, **kwargs):
        getattr(self,*args)()

class B(object):
    def __init__(self,class_A):
        self.class_a=class_A()


def waper(name):
    def do(self):
       self.class_a(name)
    return do


for name in A.fields:
    setattr(B,name,waper(name))


obj=B(A)
obj.f1()
obj.f2()
源碼流程分析偽代碼

B.session =scoped_session(Session) 實例化執行scoped_session的__init__方法,Session參數也就是原session類;

class scoped_session(object): #沒有繼承
    session_factory = None
    def __init__(self, session_factory, scopefunc=None):
        self.session_factory = session_factory   #1.0 :session_factory=原來的session類
        if scopefunc:                           # 1.1:scopefunc=None 走else分支
            self.registry = ScopedRegistry(session_factory, scopefunc)
        else:
            '''
       class ThreadLocalRegistry(ScopedRegistry):
            def __init__(self, createfunc):
                self.createfunc = createfunc        #源session類
                self.registry = threading.local()   #封裝了1個可以隔離多線程之間數據的threading.local()對象:  
          '''
            self.registry = ThreadLocalRegistry(session_factory)  #返回1個封裝了源session類和threading.local對象的ThreadLocalRegistry對象

 

C.給 scoped_session類設置 屬性 = 1個封裝了閉包函數do,封裝了這些屬性,在用戶app里實例化 scoped_session()之后就可以去執行這些do函數了! 

def instrument(name):

    def do(self, *args, **kwargs): #self=scoped_session對象

        return getattr(self.registry(), name)(*args, **kwargs) #self    name=add /commit 閉包封裝進來的
        '''
        把一下代碼封裝到 scoped_session類中去,接下如果執行self就是scoped_session對象 或者ScopedRegistry對象了 
了! self.session_factory = session_factory #session_factory=原來的session類 self.registry = ScopedRegistry(session_factory, scopefunc) #ScopedRegistry對象 name: def do(self, *args, **kwargs): return getattr(self.registry(), add )(*args, **kwargs) ''' return do

 

D.app中執行session.add(obj1)本質是執行scoped_session類中封裝的add屬性對應的do函數

    def do(self, *args, **kwargs): 
        # self.registry()=執行ThreadLocalRegistry 或者 scoped_session對象 的__call__方法 
        return getattr(self.registry(), name)(*args, **kwargs) #self    name=add /commit 閉包封裝進來的



#最后執行下面的代碼!  
def __call__(self):
try:
return self.registry.value #去threading.local()獲取
except AttributeError: #如果獲取不到
val = self.registry.value = self.createfunc() #去源session對象中獲取方法
return val
 

 

E.得出結論:

scoped_session(Session) 內部使用了threading.local() 實現了對多線程的支持;

 

F.知識:

__all__ = ['scoped_session']  :1個py文件中使用了__all__=[ ]限制了導入的變量;

threading.local():為每1個線程,另外開辟1塊新內存,來保存local_value,所以每個線程都可以獲取到自己設置的值。

閉包:可以把外部函數數據,傳遞到內部函數中保存;

 

 

三、進階操作

注意無論SQLalchemy的增、刪、改、查操作,最后都需要commit,數據庫才會執行SQL;

1.增、刪、改操作

obj1 = Users(name="張根",age=18,email='13220198866@163.com',extra='sss')
session.add(obj1)
單條增加
session.add_all([
Users(name="張根1",age=19,email='645172205@qq.com',extra='sss'),
Users(name="張根2",age=20,email='13220198866@139.com',extra='sss')

])
批量增加
session.query(Users).filter(Users.id==5).delete()
刪除操作
###################修改##########################
session.query(Users).filter(Users.id==4).update({'name':'Martin'})
session.query(Users).filter(Users.id==4).update({Users.name: Users.name + "666"}, synchronize_session=False)#在原來的基礎上做字符串
session.query(Users).filter(Users.id==18).update({Users.id: Users.id - 12},synchronize_session="evaluate")#在原來的基礎上做數字+,-操作
修改操作

 

2.單表查詢操作

r0=session.query(Users).all()                             #查詢user表中全部數據;
r1=session.query(Users).filter(Users.id > 2)              #查詢user表中,id>2的記錄;
r2=session.query(Users.age).all()                         #查詢User表中的 age列; ##[(18,), (19,)]
r3=session.query(Users.age.label('cname')).all()          #使用別名查詢

r4=session.query(Users).filter(Users.name=='Martin').all()      #查詢姓名==Martin的用戶
r5=session.query(Users).filter_by(name='Martin',age=19).all()   #filter_by方式查詢
r6=session.query(Users).filter_by(name='Martin',age=19).first() #獲取第 單個對象 print(r6.name)
r7=session.query(Users).filter(text("id<:value and name=:name")).params(value=18, name='Martin').order_by(Users.id).all()
                                                                    #查詢 id>18 name=Martin的Users 根據 id排序,params支持傳參數;
r8 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()#from_statement,申請使用原生sql
基本查詢操作
#條件查詢
ret0 = session.query(Users).filter(Users.id.between(0,7), Users.name == 'Martin').all()
# #between: 查詢 user id在0--7之間,用戶名為Martin 的數據;
ret1= session.query(Users).filter(Users.id.in_([1,6])).all()
#查詢user_id in [1,6] 的數據
ret2 = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
#查詢user_id  not in [1,6] 的數據

ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='Martin'))).all()
#1.session.query(Users.id).filter_by(name='Martin') 查詢name=Martin'的id
#2.在user表中 按1的結果 查詢
條件查詢
#################################邏輯運算#####################################

from sqlalchemy import and_, or_
ret0 = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret1 = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret2 = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()
多查詢條件 邏輯運算、嵌套查詢
###################################字符串符 模糊匹配查詢###############################

ret0 = session.query(Users).filter(Users.name.like('M%')).first().name
ret1 = session.query(Users).filter(~Users.name.like('%r%')).first().name
print(ret0,ret1)
字符串符 模糊匹配查詢
########################### 限制(分頁)############################
ret = session.query(Users)[0:2]
print(ret)
限制分頁
##########################排序##############################
ret0 = session.query(Users).order_by(Users.id.desc()).all()                   #根據id,由大到小排序(desc).
ret1 = session.query(Users).order_by(Users.id.asc(),Users.age.desc()).all()   #根據id,由小到大排序(asc),如id相等,由大到小排序(desc).
print([i.id for i in ret0 ]  )
print([i.id for i in ret1 ]  )
排序
################################分組###############################

from sqlalchemy.sql import func

ret0 = session.query(Users).group_by(Users.age).all() #根據name字段進行分組

ret1 = session.query(     #使用name字段進行分組,求每組中 最大id 、最小id 、id 之和
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)) .group_by(Users.name).all()


ret2 = session.query(
    func.max(Users.id),
    func.sum(Users.id),                          #對分組之后的數據進行 having篩選,
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
'''
having的作用:
例如:查詢公司中 部門人數大於3人的部門,先按照部門分組,然后求人數,然后再having 大於3的;

'''
group_by分組查詢和.having二次篩選
##################################連表查詢###############################################

ret = session.query(models.Users).join(models.Hobby,models.Users.id == models.Hobby.id,isouter=True).filter(models.Users.id >1)
#2個沒有外鍵關系的表 做連表查詢
print(ret)

ret = session.query(models.Person).join(models.Hobby).all()              #inin_join
print(ret)
ret = session.query(models.Person).join(models.Hobby,isouter=True).all() #left_join 調換位置 更換為 right_join
print(ret)

''''
left join:以左表為准,顯示符合搜索條件的記錄;

aID     aNum     bID     bName
5     a20050115    NULL     NULL


right join:以右表為准,顯示符合搜索條件的記錄;

aID     aNum     bID     bName
NULL     NULL     8     2006032408


inin_join:並不以誰為基礎,它只顯示符合條件
aID     aNum     bID     bName


'''
連表查詢
# 組合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
組合

 

3.基於relationship的增加、查詢操作

relationship可以幫助我們 快速在存在1對多、多對多關系的表之間做反向連表查詢和數據添加

#連表方式1:指定字段獲取
persons=session.query(models.Person.name,models.Hobby.caption.label('hobby_caption')).join(models.Hobby)
for row in  persons:
    print(row.name,row.hobby_caption,)

#連表方式2:獲取所有字段
persons = session.query(models.Person, models.Hobby).join(models.Hobby)
for row in persons:
    #print(row)=2個對象
    print(row[0].name,row[1].caption)

#連表方式3:relationship 連表查詢
persons = session.query(models.Person).all()
for row in persons:
    print(row.name,row.hobby.caption)

'''
 hobby = relationship("Hobby", backref='pers')
 Hobby:正向查詢
 backref:反向查詢

ps:查詢喜歡姑娘的所有人
hobby_obj=session.query(models.Hobby).filter(models.Hobby.id==2).first()
print(hobby_obj.pers)
'''

################################relationship增加######################
#1.relationship正向增加
person_obj=models.Person(name='Tony',hobby=models.Hobby(caption='any'))


#2.relationship 反向增加
hobby_obj=models.Hobby(caption='人妖')
hobby_obj.pers=[
        models.Person(name='李淵'),
        models.Person(name='西門慶'),
    ]
session.add(person_obj,hobby_obj)
基於relationship 做1對多操作
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from SqlALchemy import models

engine = create_engine( "mysql+pymysql://webproject:web@192.168.1.18:3306/web?charset=utf8")
Session = sessionmaker(bind=engine)
session = Session()

#############################多對多添加#################################

#添加方式1:同時添加男、女對象,直接添加相親表;前提是 知道新添加男、女對象的ID;
session.add_all(
    [
    models.Boy(name='張無忌'),
    models.Boy(name='宋青書'),
    models.Girl(name='周芷若'),
    models.Girl(name='趙敏'),
])
session.commit()

s2g =models.G2B(girl_id=1,boy_id=1)
session.add(s2g)
session.commit()

#添加方式2:通過女生對象 添加 相親記錄表
girl_obj = models.Girl(name='滅絕師太')                                     #創建1位女性朋友 滅絕
girl_obj.boys = [models.Boy(name='張三豐'),models.Boy(name='方正大師')]    #然后滅絕和 張三豐、方正大師分別相了1次親
session.add(girl_obj)
session.commit()

##添加方式3:通過男生對象 添加 相親記錄
boy_obj = session.query(models.Boy).filter(models.Boy.name=='尹志平').first()  #創建1位男性朋友 尹志平
boy_obj.girls = [models.Girl(name='小龍女'),models.Girl(name='黃蓉')]         #然后尹志平 和小龍女、黃蓉分別 相了一次親
session.add(boy_obj)
session.commit()

##################################多對多查詢###################################

#boys = relationship('Boy', secondary='g2b', backref='girls')
#1.基於 relationship 正向查詢
mie_jue = session.query(models.Girl).filter(models.Girl.name=='滅絕師太').first()
print( [i.name for i in mie_jue.boys]) #['方正大師', '張三豐']

#2.基於 relationship 反向查詢
yi_zhi_ping = session.query(models.Boy).filter(models.Boy.name=='尹志平').first()
print( [i.name for i in yi_zhi_ping.girls]) #['小龍女', '黃蓉']
基於relationship 多對多操作

以下為相親表表結構:

###################### 相親表多對多示例 #########################
class G2B(Base):
    __tablename__ = 'g2b'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))

class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    # 與生成表結構無關,僅用於查詢方便
    boys = relationship('Boy', secondary='g2b', backref='girls')


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)
models

 

4.關聯子查詢

什么是SQL子查詢?

mysql> select id,name,(select max(id) from girl) as maxgirl from boy;      #SQL子查詢 +----+--------------+---------+
| id | name         | maxgirl |
+----+--------------+---------+
|  2 | 宋青書       |       7 |
|  5 | 尹志平       |       7 |
|  3 | 張三豐       |       7 |
|  1 | 張無忌       |       7 |
|  4 | 方正大師     |       7 |
+----+--------------+---------+
5 rows in set (0.00 sec)

mysql> 

 

 

 

查詢每個學生的平均分!

First, query the SID from Student

Second,with SID query everyone `s socres  compute average score。

select id,name,(select avg(score) from 成績表 where 成績表.sid =學生表.id ) as avg_socre from 學生表;
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()

# 關聯子查詢
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
FROM server 
WHERE server.id = `group`.id) AS anon_1 
FROM `group`
"""


# 原生SQL
"""
# 查詢
cursor = session.execute('select * from users')
result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""

session.close()
SQLALchemy關聯子查詢操作

 

 

 

四、Flask-SQLAlchemy組件

FlaskSQLAlchemy是flask和SQLALchemy的管理者,其本質是在flask項目中 通過對文件管理、導入,把Flask和QLAlchemy兩個組件無縫連接在一起,

還可以幫助我們實現自動開啟、關閉連接、配置提升開發效率

 

根據一個常見flask項目的目錄結構,梳理一下它的運行流程便知;

sansa項目

 

 

程序入口run.py導入sansa包執行__init__.py文件

0.導入sansa包會執行sanas的__init__.py文件導入create_app

1.執行create_app函數

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
生成依賴文件:
    pipreqs ./

"""
from sansa import create_app  #0.導入sansa包會執行sanas的__init__.py文件導入create_app

app = create_app()              #1.執行create_app函數

if __name__ == '__main__':
    app.run()
run.py

 

執行sansa.__init__.py

0.導入flask_sqlalchemy,注意這里導入的是flask_sqlalchemy不是原始的sqlalchemy

1.讀取、注冊flask的配置文件

2.通過配置文件,將flask_sqlalchemy注冊到app中

3.通過flask藍圖把account.account(路由和視圖) 注冊到app里(導入視圖)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
#0、導入flask_sqlalchemy,注意這里導入的是flask_sqlalchemy不是原始的sqlalchemy
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

from .models import *
from .views import account

def create_app():
    app = Flask(__name__)
    #1、讀取、注冊flask的配置文件
    app.config.from_object('settings.DevelopmentConfig')
    #2、通過配置文件,將flask_sqlalchemy注冊到app中
    db.init_app(app)
    #3、通過藍圖把account.account(路由和視圖) 注冊到app里
    app.register_blueprint(account.account)  #導入視圖
    return app
項目\__init__.py

 

db對象在執行run.py剛剛啟動調用了sansa\__init__.py程序的時候就實例化好了並封裝好了 配置文件、self.Model = self.make_declarative_base(model_class, metadata)現在就可以使用了db對象創建models文件了。

 

開始創建model

0.db在執行run.py剛剛啟動調用了sansa\__init__.py程序的時候就實例化好了

1.導入sansa.__init__中的實例化完成的db對象class Users(db.Model):

2.db對象封裝好了 配置信息、ORM基類、create_all方法

#!/usr/bin/env python
# -*- coding:utf-8 -*      #0.db在執行run.py剛剛啟動調用了sansa\__init__.py程序的時候就實例化好了
from . import db         #1.導入sansa.__init__中的實例化完成的db對象
class Users(db.Model):    #2.db在啟動的時候 已經封裝好了 配置文件、self.Model = self.make_declarative_base(model_class, metadata) 就可以使用了
    """
    用戶表
    """
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    def __repr__(self):
        return '<User %r>' % self.username
model.py

 

讀取models.py中的映射去執行SQL創建表

0.加載models表映射關系

1.創建app對象

2.使用db對象根據model這種映射關系執行創建表操作

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sansa import create_app
from sansa import db
app = create_app()
from sansa.models import * #0.加載models表映射關系
with app.app_context():     #1.創建app對象
    db.create_all()          #2.使用db對象根據model這種映射關系執行創建表操作
create_table.py

 

通過視圖操作表

0.導入db對象,包含了engin和 創建連接;

1.導入models;

2. db.session直接獲取連接,開始操作。。。。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sansa import create_app
from sansa import db
app = create_app()
from sansa.models import * #0.加載models表映射關系
with app.app_context():     #1.創建app對象
    db.create_all()          #2.使用db對象根據model這種映射關系執行創建表操作
    #db.drop_all()           #3.使用db對象 刪除表
views\account.py

 

 

五、pipreqs組件

拿到別人的新項目之后發現缺少 這個、那個....模塊運行不起來,然后根據報錯逐一得去pip到最后發現安裝得版本不一致;

這不是你的問題而是項目開發者的不夠規范;

 

1.安裝pipreqs組件

pip install pipreqs

 

2.在項目/目錄下執行pipreqs ./,搜集項目中所有使用得第三方包;

[root@cmdb cmdb_rbac_arya]# pipreqs ./
INFO: Successfully saved requirements file in ./requirements.txt
[root@cmdb cmdb_rbac_arya]# ls
123.txt                cron_ansible_api.py  manage.py   requirements.txt  webcron
ansible_api_runner.py  cron_close_order.py  Monaco.ttf  static            web.sql
arya                   cron_writesql.py     multitask   templates         work_order_remind.py
cmdb                   DBshow               nohup.out   tools
cmdb_rbac_arya         Get_biying_image.py  rbac        w8.pid
[root@cmdb cmdb_rbac_arya]# cat requirements.txt 
paramiko==2.4.1
ansible==2.6.3
PyMySQL==0.8.0
pandas==0.22.0
Django==1.11.7
XlsxWriter==1.0.4
redis==2.10.6
requests==2.18.4
Pillow==5.3.0
[root@cmdb cmdb_rbac_arya]# 

 

 

pymysql

1.批量更新

import pymysql
import datetime
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf

#MySQL基礎配置類
class MySQLHandler(object):
    def __init__(self,timedelta,mysql_conf,):
        self.mysql_conf=mysql_conf
        self.logpath='/data/work_data/logs/{0}.log'.format(__file__.split('/')[-1][:-3])
        self.timedelta=timedelta
    def query(self, sql):
        self.ret = []
        mysql_conn = pymysql.connect(**self.mysql_conf)
        cursor = mysql_conn.cursor()
        cursor.execute(sql)
        columns = [x[0] for x in cursor.description]
        for row in cursor.fetchall():
            d = dict(zip(columns, row))
            self.ret.append(d)
        cursor.execute('commit')
        cursor.close()
        mysql_conn.close()
        return self.ret
    def update(self, sql):
        self.ret = []
        mysql_conn = pymysql.connect(**self.mysql_conf)
        cursor = mysql_conn.cursor()
        cursor.execute(sql)
        cursor.execute('commit')
        cursor.close()
        mysql_conn.close()
        return self.ret
    def update_many(self,sql,args):
        self.ret = []
        mysql_conn = pymysql.connect(**self.mysql_conf)
        cursor = mysql_conn.cursor()
        cursor.executemany(sql,args)
        cursor.execute('commit')
        cursor.close()
        mysql_conn.close()
        return self.ret


    def log(self,record):
        # print(record)
        with open(file=self.logpath,mode='a', encoding='utf-8') as f:
            current_record = record
            f.write(current_record)
批量更新

 

2.事務

class MultipleManualHandler(base_handler.BaseHandler):
    @base_handler.error_if_not_super
    def post(self,customer_id):
        data = json.loads(self.request.body.decode('utf8'))
        sql = 'select id from customer_product_manual_version where customer_id="%s"' % (customer_id)
        old_db_records = [str(r['id']) for r in  self.query(sql=sql)]
        import pymysql
        from conf import mysql_conf
        mysql_con = pymysql.connect(**mysql_conf)
        cursor=mysql_con.cursor()
        try:#事物操作
            sql = 'delete from customer_product_manual_version where id in(%s)' % (','.join(old_db_records))
            cursor.execute(sql)
            for p,v in data.items():
                value_list=(customer_id,p,cache.version_to_major(v),v,self.user,datetime.datetime.now())
                sql = 'insert into customer_product_manual_version (customer_id,product,major_version,version,last_modify_user_id,last_modify_time) values("%s","%s","%s","%s","%s","%s");'%(value_list)
                cursor.execute(sql)
        except Exception as e:
            print(e)
            mysql_con.rollback()  # 出錯事務回滾
        else:
            mysql_con.commit()    #沒有出錯提交
        finally:
            cursor.close()
            mysql_con.close()
        sql = 'select product,version from customer_product_manual_version where customer_id="%s"' % (customer_id)
        newest_result=self.query(sql=sql)
        response = {d['product']: d['version'] for d in newest_result}
        self.write(self.to_json(response))
pymysql事務

 

 

 

 

 

參考:銀角大王


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM