ORM
ORM 全拼Object-Relation Mapping
,中文意为 对象-关系映射。主要实现模型对象到关系数据库数据的映射
优点 :
-
只需要面向对象编程, 不需要面向数据库编写代码.
-
对数据库的操作都转化成对类属性和方法的操作.
-
不用编写各种数据库的
sql语句
.
-
-
实现了数据模型与数据库的解耦, 屏蔽了不同数据库操作上的差异.
-
不再需要关注当前项目使用的是哪种数据库。
-
通过简单的配置就可以轻松更换数据库, 而不需要修改代码.
-
缺点 :
-
相比较直接使用SQL语句操作数据库,有性能损失.
-
根据对象的操作转换成SQL语句,根据查询的结果转化成对象, 在映射过程中有性能损失.
flask默认提供模型操作,但是并没有提供ORM,所以一般开发的时候我们会采用flask-SQLAlchemy模块来实现ORM操作。
SQLAlchemy是一个关系型数据库框架,它提供了高层的 ORM 和底层的原生数据库的操作。flask-sqlalchemy 是一个简化了 SQLAlchemy 操作的flask扩展。
SQLAlchemy: https://www.sqlalchemy.org/
中文文档: https://www.osgeo.cn/sqlalchemy/index.html
安装 flask-sqlalchemy【清华源】
pip3 install flask-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install flask-mysqldb -i https://pypi.tuna.tsinghua.edu.cn/simple
安装flask-mysqldb时,注意
安装 flask-mysqldb的时候,python底层依赖于一个底层的模块 mysql-client模块
如果没有这个模块,则会报错如下:
Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-21hysnd4/mysqlclient/
解决方案:sudo apt-get install libmysqlclient-dev python3-dev
运行上面的安装命令如果再次报错如下:
dpkg 被中断,您必须手工运行 ‘sudo dpkg --configure -a’ 解决此问题。
则根据提示执行命令以下命令,再次安装mysqlclient
sudo dpkg --configure -a
apt-get install libmysqlclient-dev python3-dev
解决了mysqlclient问题以后,重新安装 flask-mysqldb即可。
1pip3 install flask-mysqldb -i https://pypi.tuna.tsinghua.edu.cn/simple
class Config(object): DEBUG = True SECRET_KEY = "*(%#4sxcz(^(#$#8423" # 数据库链接配置 = 数据库名称://登录账号:登录密码@数据库主机IP:数据库访问端口/数据库名称?charset=编码类型 SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4" # 动态追踪修改设置,如未设置只会提示警告 SQLALCHEMY_TRACK_MODIFICATIONS = True #查询时会显示原始SQL语句 SQLALCHEMY_ECHO = True 配置完成需要去 MySQL 中创建项目所使用的数据库 $ mysql -uroot -p123 mysql > create database students charset=utf8mb4;
-
-
会话用 db.session 表示。在准备把数据写入数据库前,要先将数据添加到会话中然后调用 db.commit() 方法提交会话。
-
-
在 Flask-SQLAlchemy 中,查询操作是通过 query 对象操作数据。
-

from flask import Flask,render_template,request from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) class Config(): DEBUG = True # 数据库链接配置 # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码" SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4" # 动态追踪修改设置,如未设置只会提示警告 SQLALCHEMY_TRACK_MODIFICATIONS = True # 查询时会显示原始SQL语句 SQLALCHEMY_ECHO = True app.config.from_object(Config) db = SQLAlchemy() db.init_app(app) """创建模型类""" class Student(db.Model): __tablename__ = "tb_student" id = db.Column(db.Integer, primary_key=True,comment="主键ID") name = db.Column(db.String(250), comment="姓名") age = db.Column(db.Integer, comment="年龄") sex = db.Column(db.Boolean, default=False, comment="性别") money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包") def __repr__(self): return self.name class Teacher(db.Model): __tablename__ = "tb_teacher" id = db.Column(db.Integer, primary_key=True, comment="主键ID") name = db.Column(db.String(250), comment="姓名") sex = db.Column(db.Boolean, default=False, comment="性别") option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职") def __repr__(self): return self.name class Course(db.Model): __tablename__ = "tb_course" id = db.Column(db.Integer, primary_key=True, comment="主键ID") name = db.Column(db.String(250), unique=True, comment="课程名称") price = db.Column(db.Numeric(6, 2)) def __repr__(self): return self.name @app.route("/") def index(): return "Ok" if __name__ == '__main__': # with app.app_context(): # db.create_all() # 根据模型创建所有的数据表 # # db.drop_all() # 删除模型对应的所有数据表 app.run()

from flask import Flask,render_template,request from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) class Config(): DEBUG = True # 数据库链接配置 # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码" SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4" # 动态追踪修改设置,如未设置只会提示警告 SQLALCHEMY_TRACK_MODIFICATIONS = True # 查询时会显示原始SQL语句 SQLALCHEMY_ECHO = True app.config.from_object(Config) db = SQLAlchemy() db.init_app(app) """创建模型类""" class Student(db.Model): __tablename__ = "tb_student" id = db.Column(db.Integer, primary_key=True,comment="主键ID") name = db.Column(db.String(250), comment="姓名") age = db.Column(db.Integer, comment="年龄") sex = db.Column(db.Boolean, default=False, comment="性别") money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包") def __repr__(self): return self.name class Teacher(db.Model): __tablename__ = "tb_teacher" id = db.Column(db.Integer, primary_key=True, comment="主键ID") name = db.Column(db.String(250), comment="姓名") sex = db.Column(db.Boolean, default=False, comment="性别") option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职") def __repr__(self): return self.name class Course(db.Model): __tablename__ = "tb_course" id = db.Column(db.Integer, primary_key=True, comment="主键ID") name = db.Column(db.String(250), unique=True, comment="课程名称") price = db.Column(db.Numeric(6, 2)) def __repr__(self): return self.name @app.route("/") def index(): """数据库基本操作""" """添加数据""" # 添加一条数据 # student1 = Student(name="xiaohong",age=16,money=100,sex=True) # db.session.add(student1) # db.session.commit() # 批量添加多条数据 # data_list = [ # Student(name="xiaohui1号",age=16,money=1000, sex=True), # Student(name="xiaohui2号",age=16,money=1000, sex=True), # Student(name="xiaohui3号",age=16,money=1000, sex=True), # Student(name="xiaohui4号",age=16,money=1000, sex=True), # Student(name="xiaohui5号",age=16,money=1000, sex=True), # Student(name="xiaohui6号",age=16,money=1000, sex=True), # ] # db.session.add_all(data_list) # db.session.commit() """查询数据""" # 根据主键ID查询一条数据,如果ID不存在,则返回None不会报错! # student = Student.query.get(100) # if student is None: # print("当前学生不存在!") # else: # print(student) # print(student.name,student.age) # 获取属性 # 根据查询条件获取一条数据 # 模型.query.filter(模型.字段==条件值).first() # student = Student.query.filter(Student.id==1).first() # print(student) # print(student.name,student.money) # 根据查询条件获取多条数据 # 模型.query.filter(模型.字段==条件值).all() # student_list = Student.query.filter(Student.id < 5).all() # print(student_list) # """打印效果; # [xiaoming, xiaohong, xiaohui1号, xiaohui2号] # """ # for student in student_list: # print(student.name, student.money) """更新数据""" # 先查询后修改 # student = Student.query.filter(Student.name=="xiaoming").first() # student.money+=1000 # db.session.commit() # 直接根据条件修改 # Student.query.filter(Student.name=="xiaoming",Student.money==1100).update({Student.money:2000}) # 乐观锁 # 实现类似django的F函数效果,字段值累加 # Student.query.filter(Student.name=="xiaoming").update({Student.money:Student.money+500}) # 乐观锁 # db.session.commit() """删除数据""" # 先查询后删除 # student = Student.query.filter(Student.name=="xiaohui6号").first() # db.session.delete(student) # db.session.commit() # 直接根据条件进行删除操作 Student.query.filter(Student.name=="xiaohui5号").delete() db.session.commit() return "Ok" if __name__ == '__main__': # with app.app_context(): # db.create_all() # 根据模型创建所有的数据表 # # db.drop_all() # 删除模型对应的所有数据表 app.run()
分组查询和分组查询结果过滤

from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) class Config(): # DEBUG调试模式 DEBUG = True # json多字节转unicode编码 JSON_AS_ASCII = False # 数据库链接配置 # SQLALCHEMY_DATABASE_URI = "mysql://账号:密码@IP/数据库名?编码" SQLALCHEMY_DATABASE_URI = "mysql://root:123@127.0.0.1:3306/students?charset=utf8mb4" # 动态追踪修改设置,如未设置只会提示警告 SQLALCHEMY_TRACK_MODIFICATIONS = True # 查询时会显示原始SQL语句 SQLALCHEMY_ECHO = True app.config.from_object(Config) db = SQLAlchemy() db.init_app(app) """创建模型类""" class Student(db.Model): __tablename__ = "tb_student" id = db.Column(db.Integer, primary_key=True,comment="主键ID") name = db.Column(db.String(250), comment="姓名") age = db.Column(db.Integer, comment="年龄") sex = db.Column(db.Boolean, default=False, comment="性别") money = db.Column(db.DECIMAL(8,2), nullable=True, comment="钱包") def __repr__(self): return self.name class Teacher(db.Model): __tablename__ = "tb_teacher" id = db.Column(db.Integer, primary_key=True, comment="主键ID") name = db.Column(db.String(250), comment="姓名") sex = db.Column(db.Boolean, default=False, comment="性别") option = db.Column(db.Enum("讲师","助教","班主任"), default="讲师", comment="教职") def __repr__(self): return self.name class Course(db.Model): __tablename__ = "tb_course" id = db.Column(db.Integer, primary_key=True, comment="主键ID") name = db.Column(db.String(250), unique=True, comment="课程名称") price = db.Column(db.Numeric(6, 2)) def __repr__(self): return self.name @app.route("/") def index(): from sqlalchemy import func """ group_by 分组查询""" # 查询男生和女生的最大年龄 Student.query.group()以下Student不支持聚合查询 # ret = db.session.query(Student.sex,func.max(Student.age)).group_by(Student.sex).all() # print(ret) # 查询出男生和女生年龄大于18的人数 # having是针对分组的结果进行过滤处理,所以having能调用的字段,必须是分组查询结果中的字段,否则报错!! # ret = db.session.query(Student.sex,Student.age,func.count(Student.age)).group_by(Student.sex,Student.age).having(Student.age>18).all() # print(ret) """执行原生SQL语句,返回结果不是模型对象, 是列表和元祖""" # 查询多条 # ret = db.session.execute("select id,name,age,IF(sex,'男','女') from tb_student").fetchall() # print(ret) # # 查询单条 # ret = db.session.execute("select * from tb_student where id = 3").fetchone() # print(ret) # 添加/修改/删除 # db.session.execute("UPDATE tb_student SET money=(money + %s) WHERE age = %s" % (200, 22)) # db.session.commit() # 查询出女生和男生中大于18岁的人数 ret = db.session.execute("SELECT IF(sex,'男','女'), count(id) from (SELECT id,name,age,sex FROM `tb_student` WHERE age>18) as stu group by sex").fetchall() print(ret) return "Ok" if __name__ == '__main__': app.run()