准備數據

1 from sqlalchemy.ext.declarative import declarative_base 2 from sqlalchemy import Column 3 from sqlalchemy import Integer, String, Text, Date, DateTime, ForeignKey, UniqueConstraint, Index 4 from sqlalchemy import create_engine 5 from sqlalchemy.orm import relationship 6 7 Base = declarative_base() 8 9 10 class Depart(Base): 11 __tablename__ = 'depart' 12 id = Column(Integer, primary_key=True) 13 title = Column(String(32), index=True, nullable=False) 14 15 16 class Users(Base): 17 __tablename__ = 'users' 18 19 id = Column(Integer, primary_key=True) 20 name = Column(String(32), index=True, nullable=False) 21 depart_id = Column(Integer, ForeignKey("depart.id")) 22 23 # 用於鏈表操作 與表的創建無關 24 dp = relationship("Depart", backref='pers') 25 26 27 class Student(Base): 28 __tablename__ = 'student' 29 id = Column(Integer, primary_key=True) 30 name = Column(String(32), index=True, nullable=False) 31 32 course_list = relationship('Course', secondary='student2course', backref='student_list') 33 34 35 class Course(Base): 36 __tablename__ = 'course' 37 id = Column(Integer, primary_key=True) 38 title = Column(String(32), index=True, nullable=False) 39 40 41 class Student2Course(Base): 42 __tablename__ = 'student2course' 43 id = Column(Integer, primary_key=True, autoincrement=True) 44 student_id = Column(Integer, ForeignKey('student.id')) 45 course_id = Column(Integer, ForeignKey('course.id')) 46 47 __table_args__ = ( 48 UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 聯合唯一索引 49 # Index('ix_id_name', 'name', 'extra'), # 聯合索引 50 ) 51 52 53 def create_all(): 54 engine = create_engine( 55 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 56 max_overflow=0, # 超過連接池大小外最多創建的連接 57 pool_size=5, # 連接池大小 58 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 59 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 60 ) 61 62 Base.metadata.create_all(engine) 63 64 65 def drop_all(): 66 engine = create_engine( 67 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 68 max_overflow=0, # 超過連接池大小外最多創建的連接 69 pool_size=5, # 連接池大小 70 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 71 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 72 ) 73 Base.metadata.drop_all(engine) 74 75 76 if __name__ == '__main__': 77 # drop_all() 78 create_all()
基本操作
1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from models import Users, Student, Depart 4 5 engine = create_engine( 6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 7 max_overflow=0, # 超過連接池大小外最多創建的連接 8 pool_size=5, # 連接池大小 9 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 10 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 11 ) 12 SessionFactory = sessionmaker(bind=engine) 13 14 # 從連接池獲取一個連接 15 session = SessionFactory() 16 17 # ############################## 基本增刪改查 ############################### 18 # 1. 增加 19 obj = Users(name='tang') 20 session.add(obj) 21 session.commit() 22 23 # 批量增加 24 session.add_all([ 25 Users(name='tang'), 26 Users(name='chen') 27 ]) 28 session.commit() 29 30 # 2. 查 31 result = session.query(Users).all() 32 for row in result: 33 print(row.id,row.name) 34 35 # sqlalchemy 的語法跟Python很相似 36 result = session.query(Users).filter(Users.id >= 2) 37 for row in result: 38 print(row.id,row.name) 39 40 41 # 獲取第一個 42 result = session.query(Users).filter(Users.id >= 2).first() 43 print(result) 44 45 # 3.刪 46 session.query(Users).filter(Users.id >= 2).delete() 47 session.commit() 48 49 # 4.改 通過字典 50 session.query(Users).filter(Users.id == 4).update({Users.name:'tang'}) 51 session.query(Users).filter(Users.id == 4).update({'name':'tang'}) 52 session.query(Users).filter(Users.id == 4).update({'name':Users.name+"_lao"},synchronize_session=False) 53 session.commit() 54 55 # ############################## 其他常用 ############################### 56 # 1. 指定列 去別名 57 # 對應原生SQL:select id,name as cname from users; 58 result = session.query(Users.id,Users.name.label('cname')).all() 59 for item in result: 60 print(item[0],item.id,item.cname) 61 62 63 # 2. 默認條件and 64 session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() 65 66 # 3. between 67 session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() 68 69 # 4. in 70 session.query(Users).filter(Users.id.in_([1,3,4])).all() 71 # not in 72 session.query(Users).filter(~Users.id.in_([1,3,4])).all() 73 74 # 5. 子查詢 75 session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='tang'))).all() 76 77 # 6. and 和 or 78 from sqlalchemy import and_, or_ 79 session.query(Users).filter(Users.id > 3, Users.name == 'tang').all() 80 session.query(Users).filter(and_(Users.id > 3, Users.name == 'tang')).all() 81 session.query(Users).filter(or_(Users.id < 2, Users.name == 'tang')).all() 82 session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all() 83 84 # 7. filter_by 只需字段名 85 session.query(Users).filter_by(name='alex').all() 86 87 # 8. 通配符 88 ret = session.query(Users).filter(Users.name.like('e%')).all() 89 ret = session.query(Users).filter(~Users.name.like('e%')).all() 90 91 # 9. 切片 92 result = session.query(Users)[1:2] 93 94 # 10.排序 95 ret = session.query(Users).order_by(Users.name.desc()).all() 96 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() 97 98 # 11. group by 99 from sqlalchemy.sql import func 100 101 ret = session.query(Users.depart_id,func.count(Users.id),).group_by(Users.depart_id).all() 102 for item in ret: 103 print(item) 104 # 105 # from sqlalchemy.sql import func 106 # 分組之后再進行查詢 107 ret = session.query( 108 Users.depart_id, 109 func.count(Users.id), 110 ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all() 111 for item in ret: 112 print(item) 113 114 # 12.union 和 union all 115 """ 116 select id,name from users 117 UNION 118 select id,name from users; 119 """ 120 """ 121 select id,name from users 122 UNION ALL 123 select id,name from users; 124 """ 125 q1 = session.query(Depart.title).filter(Depart.id > 2) 126 q2 = session.query(Student.name).filter(Student.id < 2) 127 ret = q1.union(q2).all() 128 # 129 # q1 = session.query(Users.name).filter(Users.id > 2) 130 # q2 = session.query(Favor.caption).filter(Favor.nid < 2) 131 # ret = q1.union_all(q2).all() 132 133 """ 134 union 和 union_all 的區別 135 union 去重 136 union_all 不去重 137 138 相同點:合並的兩張表的列要相同 139 """ 140 141 """ 142 union 和 join的區別 143 union是垂直合並成一張表 144 join是水平合並成一張表 145 """ 146 147 """ 148 查看原生sql 打印不獲取結果的語句就可以 149 sql = session.query(Users).filter(Users.id==1) 150 print(sql) 151 """ 152 153 session.close()
鏈表操作 與 外鍵relation字段的使用

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from models import Users,Depart 4 5 6 engine = create_engine( 7 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 8 max_overflow=0, # 超過連接池大小外最多創建的連接 9 pool_size=5, # 連接池大小 10 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 11 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 12 ) 13 14 SessionFactory = sessionmaker(bind=engine) 15 session = SessionFactory() 16 17 18 # 單表操作 19 ret = session.query(Users).all() 20 for row in ret: 21 print(row.id,row.name, row.depart_id) 22 23 24 # 鏈表操作 25 ret = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id==Depart.id).all() 26 for row in ret: 27 print(row.id, row.name, row.title) 28 29 # isouter 表示 left join 沒有right join 只能調換查詢順序 30 ret = session.query(Users.id, Users.name, Depart.title).join(Users,isouter=True).all() 31 # print(ret) 32 for row in ret: 33 print(row.id, row.name, row.title) 34 35 36 # 3. relation字段:查詢所有用戶+所屬部門名稱 37 ret = session.query(Users).all() 38 for row in ret: 39 # relation dp的作用 40 print(row.id,row.name,row.depart_id, row.dp.title) 41 42 # 4. relation字段:查詢銷售部所有的人員 43 ret = session.query(Depart).filter(Depart.title=='銷售部').first() 44 for row in ret.pers: 45 print(row.id, row.name, ret.title) 46 47 # 5. 創建一個名稱叫:IT部門,再在該部門中添加一個員工:tanglaoer 48 u1 = Users(name='tanglaoer',dp=Depart(title='IT')) 49 session.add(u1) 50 session.commit() 51 52 # 6. 創建一個名稱叫:技術部,再在該部門中添加一個員工:tang lao san 53 d1 = Depart(title='技術部') 54 d1.pers = [Users(name='tang'),Users(name='lao'), Users(name='san')] 55 session.add(d1) 56 session.commit() 57 58 # 在已存在的技術部 添加幾名員工 59 d1 = session.query(Depart).filter(Depart.title == '技術部').first() 60 d1.pers = [Users(name='LIN'), Users(name='WU'),Users(name='SEN')] 61 session.add(d1) 62 session.commit() 63 64 session.close()
多對多操作

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from models import Student, Course, Student2Course 4 5 engine = create_engine( 6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 7 max_overflow=0, # 超過連接池大小外最多創建的連接 8 pool_size=5, # 連接池大小 9 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 10 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 11 ) 12 13 SessionFactory = sessionmaker(bind=engine) 14 15 session = SessionFactory() 16 # 1. 錄入數據 17 session.add_all([ 18 Student(name='tang'), 19 Student(name='chen'), 20 Course(title='生物'), 21 Course(title='體育'), 22 ]) 23 session.commit() 24 25 # 可批量增加多對多外鍵 26 session.add_all([ 27 Student2Course(student_id=2,course_id=1), 28 Student2Course(student_id=1,course_id=1), 29 Student2Course(student_id=1,course_id=2), 30 ]) 31 32 # 2. 三張表關聯 33 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).order_by(Course.id.asc()).all() 34 print(ret) 35 session.commit() 36 37 # 3. “tang”選的所有課 38 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).filter(Student.name=='tang').all() 39 print(ret) 40 41 # relation 字段的使用 42 ret = session.query(Student).filter(Student.name== 'tang').first() 43 for row in ret.course_list: 44 print(row.title) 45 46 47 # 4. 選了“生物”的所有人 48 # relation 字段的方向使用 49 ret = session.query(Course).filter(Course.title == '生物').first() 50 for row in ret.student_list: 51 print(row.name, ret.title) 52 53 # 5. 創建一個課程,創建2學生,兩個學生選新創建的課程。 54 obj = Course(title='英語') 55 obj.student_list = [Student(name='lin'), Student(name='wu')] 56 session.add(obj) 57 session.commit() 58 59 # 創建一個學生,加入多門新創建課程 60 stu = Student(name='tang') 61 stu.course_list = [Course(title='數學'), Course(title='地理')] 62 session.add(stu) 63 session.commit() 64 65 # 把tang添加到已存在的課程中 66 from sqlalchemy import or_ 67 stu = session.query(Student).filter(Student.name=='tang').first() 68 stu.course_list = session.query(Course).filter(or_(Course.id == 1, Course.id ==3)).all() 69 print(stu.course_list) 70 session.add(stu) 71 session.commit() 72 73 session.close()
sqlalchemy 連接與多線程的操作

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from models import Student 4 engine = create_engine( 5 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 6 max_overflow=0, # 超過連接池大小外最多創建的連接 7 pool_size=5, # 連接池大小 8 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 9 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 10 ) 11 SessionFactory = sessionmaker(bind=engine) 12 13 def task(): 14 # 去連接池中獲取一個連接 15 # 第一版本 16 session = SessionFactory() 17 18 ret = session.query(Student).all() 19 print(ret) 20 # 將連接交還給連接池 21 session.close() 22 23 24 from threading import Thread 25 26 for i in range(20): 27 t = Thread(target=task) 28 t.start()

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from sqlalchemy.orm import scoped_session 4 from models import Student,Course,Student2Course 5 6 engine = create_engine( 7 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 8 max_overflow=0, # 超過連接池大小外最多創建的連接 9 pool_size=5, # 連接池大小 10 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 11 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 12 ) 13 SessionFactory = sessionmaker(bind=engine) 14 session = scoped_session(SessionFactory) 15 # scoped_session 里面有threading.local 16 # 為每個線程賦予一個連接 17 18 def task(): 19 ret = session.query(Student).all() 20 print(ret) 21 # 將連接交還給連接池 22 session.remove() 23 24 25 from threading import Thread 26 27 for i in range(20): 28 t = Thread(target=task) 29 t.start()
sqlalchemy 寫原生SQL語句

1 from sqlalchemy.orm import sessionmaker 2 from sqlalchemy import create_engine 3 from sqlalchemy.orm import scoped_session 4 5 engine = create_engine( 6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8", 7 max_overflow=0, # 超過連接池大小外最多創建的連接 8 pool_size=5, # 連接池大小 9 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 10 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) 11 ) 12 SessionFactory = sessionmaker(bind=engine) 13 session = scoped_session(SessionFactory) 14 15 16 def task(): 17 """""" 18 # 方式一: 19 # 查詢 20 cursor = session.execute('select * from users') 21 result = cursor.fetchall() 22 print(result) 23 24 # 添加 參數通過"冒號" 25 cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'tanglaoer'}) 26 session.commit() 27 print(cursor.lastrowid) 28 29 # 方式二: 30 # 與pymysql的鏈接一模一樣 31 conn = engine.raw_connection() 32 cursor = conn.cursor() 33 cursor.execute( 34 "select * from users" 35 ) 36 result = cursor.fetchall() 37 print(result) 38 cursor.close() 39 conn.close() 40 41 # 將連接交還給連接池 42 session.remove() 43 44 45 from threading import Thread 46 47 for i in range(20): 48 t = Thread(target=task) 49 t.start()