一對多:ForeignKey
multitb_models.py
import datetime from sqlalchemy import create_engine # 引入 創建引擎 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index # 引入列和數據類型 from sqlalchemy.orm import relationship Base = declarative_base() # Base 要自己實例化 class Depart(Base): __tablename__ = "depart" # 表名 id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) class Users(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) depart_id = Column(Integer, ForeignKey("depart.id")) # ForeignKey 需要導入;"depart.id" :表名.id (是表名,而不是類名) # 與生成表結構無關,僅用於 跨表 查詢方便(即不會在 users 這張表中生成 dp 這個字段) dp = relationship("Depart", backref='pers') # relationship() 中的 "Depart" 是類名; dp 是與 Depart 這個類做關聯; backref="pers"用於反向查詢(由 Depart 查詢 Users) def init_db(): """ 根據類創建數據庫的表 :return: """ engine = create_engine( # 創建數據庫連接 "mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest?charset=utf8", # mysql 表示要連接的數據庫;pymysql 表示用 pymysql 來連接;用戶名是 root,密碼是123,連接本地的 dbtest 這個數據庫 max_overflow=0, # 超過連接池大小外最多創建的連接;即超過 pool_size 后最多能溢出多少個連接 pool_size=5, # 連接池大小 pool_timeout=10, # 池中沒有線程(連接)最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置);-1表示不重建 ) Base.metadata.create_all(engine) # Base.metadata.create_all() : 找到當前 py 文件下面 繼承了Base的所有的類,在數據庫中生成一張表 def drop_db(): """ 根據類刪除數據庫中的表 :return: """ engine = create_engine( "mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) # Base.metadata.drop_all() : 找到當前 py 文件下面 繼承了Base的所有的類,在數據庫中刪除相應的表 if __name__ == "__main__": init_db() # drop_db()
multitb_crud.py
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from multitb_models import Users,Depart engine = create_engine("mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # ############### ForeignKey ############## # 1. 查詢所有的用戶 + 所屬部門名稱 ret1 = session.query(Users,Depart).join(Depart).all() # .join() 時 默認的 on 是根據 ForeignKey("depart.id"),進行 on users.depart_id = depart.id (默認是通過 ForeignKey 進行連表) for row in ret1: # 此時 row 為 一個元組,里面的元素為 Users 和 Depart 的對象 print(row[0].name,row[1].title) """ ret1 = session.query(Users,Depart).join(Depart).all() 中的 .join() 也可以指定 on ,如下: ret1 = session.query(Users,Depart).join(Depart,Users.depart_id == Depart.id).all() """ ret2 = session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id==Depart.id).all() # 只取 users 的 id name 和 depart 的 title for row in ret2: # row 也是元組的形式 print(row.id,row.name,row.title) """ .join() 默認是 inner join,想要變成 left join 可以在 join()中設置 isouter=True ,如下: session.query(Users.id,Users.name,Depart.title).join(Depart,Users.depart_id==Depart.id,isouter=True).all() 另外,SQLAlchemy 的 join() 沒有 right join,想要 right join 可以在 query() 中將 Users 和 Depart 調換下位置 注: .join() 后面可以繼續 .join() , 即可以 連很多張表 """ # 2. relationship 字段:查詢所有的用戶 + 所屬部門名稱 (類似 正向查詢) ret3 = session.query(Users).all() for row in ret3: print(row.id,row.name,row.dp.title) # row.dp 是 該row對象(Users對象) 對應的 depart這張表中 所關聯的外鍵 對象記錄,所以 row.dp.title 即為 Depart.title # 打印結果: # 2 neo 開發部 # 3 alex 開發部 # 4 egon 市場部 # 5 wu 運維部 # 3. relationship 字段:查詢開發部的所有人員 (類似 反向查詢) obj = session.query(Depart).filter(Depart.title=="開發部").first() # obj 是 title 為 "開發部" 的一個 Depart對象 print(obj.pers) # 打印結果:列表的形式;列表中的元素其為所關聯的 Users對象 # [<multitb_models.Users object at 0x0000018899C76940>, <multitb_models.Users object at 0x0000018899C76A20>] for row in obj.pers: print(row.id,row.name,obj.title) # 4. relationship 字段:創建一個名為 “銷售部” 的部門,並在該部門中添加一個名為 “maple” 的員工 (一次性創建所有的關聯數據) user1 = Users(name="maple",dp=Depart(title="銷售部")) # 通過這種寫法,能在 users 表中創建一個name 為 "maple"的記錄,由於其關聯的 depart_id 是新添加的,其也能在 depart表中創建一個 title為"銷售部"的記錄,並自動將 title=="銷售部"的depart的記錄id添加到 users表中的 depart_id字段(每次都會創建一個 Depart 的實例對象) session.add(user1) session.commit() # 5. relationship 字段:創建一個名為“IT部”的部門,並在該部門中添加多個員工 depart1 = Depart(title="IT部") # 創建 Depart 的一個對象 depart1.pers = [ # 為 depart1 的反向字段 添加一個列表,列表中是 depart1 所對應的 Users 對象 Users(name="neo1"), Users(name="neo2"), Users(name="neo3") ] session.add(depart1) # 只需要把 depart1 添加;因為 既有 title,又有 其所對應的 Users 對象 session.commit() session.close()
多對多:m2m
m2m_models.py
from sqlalchemy import create_engine # 引入 創建引擎 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index # 引入列和數據類型 from sqlalchemy.orm import relationship Base = declarative_base() # Base 要自己實例化 class Student(Base): __tablename__ = "student" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) # 表中不會額外增加字段,只是為了方便跨表操作 course_list = relationship("Course", secondary="student2course", backref="student_list") # 第一個參數表示 和哪張表作關聯,第二個參數表示 通過哪張表和 "Course"做關聯,第三個參數表示反向字段名 class Course(Base): __tablename__ = "course" id = Column(Integer, primary_key=True) title = Column(String(32), index=True, nullable=False) # SQLAlchemy 的多對多要自己創建第三張表 class Student2Course(Base): __tablename__ = "student2course" id = Column(Integer, primary_key=True, autoincrement=True) student_id = Column(Integer, ForeignKey("student.id")) course_id = Column(Integer, ForeignKey("course.id")) # 建立聯合唯一 __table_args__ = ( UniqueConstraint("student_id", "course_id", name="uix_stu_cou"), # UniqueConstraint:聯合唯一索引;該索引的名字是 uix_stu_cou # Index() 可用於 聯合索引(沒有唯一的要求) ) def init_db(): """ 根據類創建數據庫的表 :return: """ engine = create_engine( # 創建數據庫連接 "mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest?charset=utf8", # mysql 表示要連接的數據庫;pymysql 表示用 pymysql 來連接;用戶名是 root,密碼是123,連接本地的 dbtest 這個數據庫 max_overflow=0, # 超過連接池大小外最多創建的連接;即超過 pool_size 后最多能溢出多少個連接 pool_size=5, # 連接池大小 pool_timeout=10, # 池中沒有線程(連接)最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置);-1表示不重建 ) Base.metadata.create_all(engine) # Base.metadata.create_all() : 找到當前 py 文件下面 繼承了Base的所有的類,在數據庫中生成一張表 def drop_db(): """ 根據類刪除數據庫中的表 :return: """ engine = create_engine( "mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) # Base.metadata.drop_all() : 找到當前 py 文件下面 繼承了Base的所有的類,在數據庫中刪除相應的表 if __name__ == "__main__": init_db() # drop_db()
m2m_crud.py
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from m2m_models import Student, Course, Student2Course engine = create_engine("mysql+pymysql://root:tj037778@127.0.0.1:3306/dbtest", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # ####################### 多對多:m2m ########################### # 一、基本用法(不使用 relationship) # 1. 錄入數據 # session.add_all([ # Student(name="neo"), # Student(name="alex"), # Course(title="生物"), # Course(title="體育"), # ]) # session.commit() # 錄入 關系表 的數據 # session.add_all([ # Student2Course(student_id=1,course_id=1), # Student2Course(student_id=1,course_id=2), # Student2Course(student_id=2,course_id=1), # ]) # session.commit() # 2. 三張表關聯:查詢每個學生對應的課程 ret1 = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc()) for row in ret1: print(row) # 打印結果: # (2, 'neo', '生物') # (3, 'neo', '體育') # (4, 'alex', '生物') # 3. 查詢 neo 對應的所有課 ret1 = session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=="neo").order_by(Student2Course.id.asc()).all() for row in ret1: print(row) # 二、 使用 relationship # 4. 查詢 neo 對應的所有課 (正向查詢) obj = session.query(Student).filter(Student.name=="neo").first() # name=="neo" 的 Student 的對象 for item in obj.course_list: # item 是 obj.course_list 這個列表中的一個個對象 print(item.id,item.title) # 5. 查詢選了“生物”的所有的人 (反向查詢) course_obj = session.query(Course).filter(Course.title=="生物").first() for item in course_obj.student_list: # course_obj.student_list print(item.id,item.name) # 6. 創建一個新的課程“英語”,並創建兩個學生並讓這兩個學生對應新創建的“英語”課程 new_course_obj = Course(title="英語") new_course_obj.student_list = [ # 內部會自動創建關系 Student(name="alina"), Student(name="mike") ] session.add(new_course_obj) session.commit() session.close()
SQLAlchemy 的兩種連接方式
方式一:將獲取連接的操作放到線程函數里面
import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from db import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) def task(arg): session = Session() # 多線程的情況下,獲取數據庫連接的操作要放到線程函數里面,而不能放到全局 obj1 = Users(name="alex1") session.add(obj1) session.commit() session.close() # 將連接交還給連接池 for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
方式二:利用 scoped_session (推薦使用這種:寫法簡單)
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = scoped_session(Session) # scoped_session() 的原理是 threading.local():為每個線程獲取一個連接 def task(arg): obj1 = Users(name="alex1") session.add(obj1) session.commit() session.remove() # session.remove() : 將連接交還給連接池 import threading for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
SQLAlchemy 執行原生SQL
方式一:
import time import threading from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine from sqlalchemy.sql import text from sqlalchemy.engine.result import ResultProxy from db import Users, Hosts engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = Session() # 查詢 # cursor = session.execute('select * from users') # 和 pymysql 的用法一樣 # result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'}) # 此處的 字符串格式化 不是利用 "%",而是 :value session.commit() print(cursor.lastrowid) session.close()
方式二:
import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) def task(arg): conn = engine.raw_connection() # 獲取連接 cursor = conn.cursor() # 創建 游標 cursor.execute( "select * from t1" ) result = cursor.fetchall() cursor.close() conn.close() for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()