sqlalchemy所依賴的模塊
import sqlalchemy import sqlalchemy.orm import sqlalchemy.ext.declarative
連接數據庫,利用數據庫字符串構造engine, echo為True將打印所有的sql語句
engine = sqlalchemy.create_engine("mysql+pymysql://dba_0:mimadba_0@101.200.174.172/data_secret", encoding="utf8", echo=False)
with engine.connect() as conn: # 最基礎的用法 result = conn.execute("select * from tablename limit 10;") for item in result: print(item) # execute的幾種用法,這里具體還是得參考pymysql的用法,不需要執行commit操作 conn.execute("insert into tablename(id, url, title) values(1, 'url1', 'title1');") conn.execute("insert into tablename(id, url, title) values(%s, %s, %s);", 2, "url2", "title2") conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", (3, "url3", "title3")) conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", [(31, "url31", "title31"), (32, "url32", "title32")]) # 使用事務可以進行批量提交和回滾 trans = conn.begin() try: conn.execute("insert into tablename(id, url, title) values(%s, %s, %s)", [(4, "url4", "title4"), (5, "url5", "title5")]) trans.commit() except Exception as excep: trans.rollback() raise trans.close()
創建數據庫和刪除數據庫:
def init_db(): """ 根據類創建數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) def drop_db(): """ 根據類刪除數據庫表 :return: """ engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine)
建表:首先需要生成一個BaseModel類,作為所有模型類的基類
BaseModel = sqlalchemy.ext.declarative.declarative_base()
構建數據模型User
class User(BaseModel): __tablename__ = "Users" # 表名 __table_args__ = { "mysql_engine": "InnoDB", # 表的引擎 "mysql_charset": "utf8", # 表的編碼格式 } # 表結構,具體更多的數據類型自行百度 id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True) name = sqlalchemy.Column("name", sqlalchemy.String(50), nullable=False) age = sqlalchemy.Column("age", sqlalchemy.Integer, default=0) # 添加角色id外鍵,關聯到表Roles的id屬性 role_id = sqlalchemy.Column("role_id", sqlalchemy.Integer, sqlalchemy.ForeignKey("Roles.id")) # 添加關系屬性,關聯到本實例的role_id外鍵屬性上 role = sqlalchemy.orm.relationship("Role", foreign_keys="User.role_id") # 添加關系屬性,關聯到本實例的role_id外鍵屬性上,如果使用了這種方式,Role模型中的users可以省略 # role = sqlalchemy.orm.relationship("Role", foreign_keys="User.role_id", backref=sqlalchemy.orm.backref("users"))
### 留意,ForeignKey字段,backref,relationship方法。后面是怎么用到的。relationship不添加額外字段,只是為了方便查詢。
構建數據模型Role:
class Role(BaseModel): __tablename__ = "Roles" # 表名 __table_args__ = { "mysql_engine": "InnoDB", # 表的引擎 "mysql_charset": "utf8", # 表的編碼格式 } # 表結構,具體更多的數據類型自行百度 id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True) name = sqlalchemy.Column("name", sqlalchemy.String(50), unique=True) # 添加關系屬性,關聯到實例User的role_id外鍵屬性上 users = sqlalchemy.orm.relationship("User", foreign_keys="User.role_id")
利用Session對象連接數據庫
DBSessinon = sqlalchemy.orm.sessionmaker(bind=engine) # 創建會話類 session = DBSessinon() # 創建會話對象
# 刪除所有表 BaseModel.metadata.drop_all(engine) # 創建所有表,如果表已經存在,則不會創建 BaseModel.metadata.create_all(engine)
常用查詢:
try: # 清空數據,不需要commit操作 session.query(User).filter(User.id != -1).delete() session.query(Role).filter(Role.id != -1).delete() # 刪除數據的另外一種形式:session.delete() # 插入數據,這里的一個實例只插入一次,第二次插入不生效 session.add(Role(id=1, name="student")) session.add(Role(id=2, name="teacher")) session.commit() session.add(User(name="James", age=20, role_id=1)) session.add(User(name="Wade", age=40, role_id=2)) # 另外一種插入方法,一次插入多項 session.add_all([ User(name='alex1',age=30,role_id=3), User(name='alex2',age=32,role_id=5), ]) user = User(name="Kobe", age=24, role_id=1) session.add(user) session.commit() # 刪除數據 session.query(User).filter(Users.id > 2).delete() session.commit() # 修改數據 user.name = "Allen" session.merge(user) # 使用merge方法,如果存在則修改,如果不存在則插入 session.query(User).filter(User.id == 3).update({User.name: "Allen"}) # 使用update方法 session.query(User).filter(User.id == 4).update({User.age: User.age + 1}) # 使用update方法,自增操作 # 查詢數據 roles = session.query(Role) # 返回全部結果 for role in roles: print("Role:", role.id, role.name) users = session.query(User) # 返回全部結果 for user in users: print("User:", user.id, user.name, user.age, user.role_id) # 其他獲取數據的方式 print("get(id):", session.query(User).get(1)) # 返回結果集中id為1的項 print("get[1:3]:", session.query(User)[1:3]) # 返回結果集中的第2-3項 # 其他高級查詢,這里以Users表為例 users = session.query(User).filter(User.id > 6) # 條件查詢 users = session.query(User).filter(User.id > 6).all() # 條件查詢,返回查詢的全部數據 user = session.query(User).filter(User.id > 6).first() # 條件查詢,返回查詢數據的第一項 users = session.query(User).filter(User.id > 6).limit(10) # 條件查詢,返回最多10條數據 users = session.query(User).filter(User.id > 6).offset(2) # 條件查詢,從第3條數據開始返回 users = session.query(User).filter(User.id > 6, User.name == "Kobe") # 條件查詢,and操作 users = session.query(User).filter(User.id > 6).filter(User.name == "Kobe") # 條件查詢,and操作 users = session.query(User).filter(sqlalchemy.or_(User.id > 6, User.name == "Kobe")) # 條件查詢,or操作 users = session.query(User).filter(User.id.in_((1, 2))) # 條件查詢,in操作 users = session.query(User).filter(sqlalchemy.not_(User.name)) # 條件查詢,not操作 user_count = session.query(User.id).count() # 統計全部user的數量 user_count = session.query(sqlalchemy.func.count(User.id)).scalar() # scalar操作返回第一行數據的第一個字段 session.query(sqlalchemy.func.count("*")).select_from(User).scalar() # scalar操作返回第一行數據的第一個字段 session.query(sqlalchemy.func.count(1)).select_from(User).scalar() # scalar操作返回第一行數據的第一個字段 session.query(sqlalchemy.func.count(User.id)).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表 session.query(sqlalchemy.func.sum(User.age)).scalar() # 求和運算,運用scalar函數 session.query(sqlalchemy.func.avg(User.age)).scalar() # 求均值運算,運用scalar函數 session.query(sqlalchemy.func.md5(User.name)).filter(User.id == 1).scalar() # 運用md5函數 users = session.query(sqlalchemy.distinct(User.name)) # 去重查詢,根據name進行去重 users = session.query(User).order_by(User.name) # 排序查詢,正序查詢 users = session.query(User).order_by(User.name.desc()) # 排序查詢,倒序查詢 users = session.query(User).order_by(sqlalchemy.desc(User.name)) # 排序查詢,倒序查詢的另外一種形式 users = session.query(User.id, User.name) # 只查詢部分屬性 users = session.query(User.name.label("user_name")) # 結果集的列取別名 for user in users: print("label test:", user.user_name) # 這里使用別名 users = session.query(sqlalchemy.func.count(User.name).label("count"), User.age).group_by(User.age) # 分組查詢 for user in users: print("age:{0}, count:{1}".format(user.age, user.count)) # 多表查詢 result = session.query(User, Role).filter(User.role_id == Role.id) for user, role in result: print("user %s's role is %s" % (user.name, role.name)) users = session.query(User).join(Role, User.role_id == Role.id) for user in users: print("user join, name:", user.name) # 關聯屬性的用法 roles = session.query(Role) for role in roles: print("role:%s users:" % role.name) for user in role.users: print("\t%s" % user.name) users = session.query(User) for user in users: print("user %s's role is %s" % (user.name, user.role.name)) except Exception as excep: session.rollback() raise session.close()
relationship的應用:
class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='籃球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) hobby_id = Column(Integer, ForeignKey("hobby.id")) # 與生成表結構無關,僅用於查詢方便 hobby = relationship("Hobby", backref='pers')
# 使用relationship正向查詢 """ v = session.query(Person).first() print(v.name) print(v.hobby.caption) """ # 使用relationship反向查詢 """ v = session.query(Hobby).first() print(v.caption) print(v.pers) """
relationship不會產生額外的字段,他會像django ORM中的雙下划線一樣,可以跨表查詢,backref相當於反向查。
多對多:
SQLAlchemy不會像django那樣指定了manytomany就自動生成第三張表相關聯,而是要我們手動添加第三張表:
class Server2Group(Base): __tablename__ = 'server2group' id = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 與生成表結構無關,僅用於查詢方便 servers = relationship('Server', secondary='server2group', backref='groups') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False)