使用 Flask-SQLAlchemy 来操作数据库
1 配置
本文使用sqlite来作为例子演示,在config.py里面更新下数据库的配置
import os basedir = os.path.abspath(os.path.dirname(__file__)) SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'app.db') SQLALCHEMY_MIGRATE_REPO = os.path.join(basedir, 'db_repository')
Flask-SQLAlchemy会引用到 SQLALCHEMY_DATABASE_URI 来获取数据库的存储位置,SQLAlchemy-migrate 会引用SQLALCHEMY_MIGRATE_REPO,移动的数据库文件会放到这个目录下
配置对象中还有一个很有用的选项,即 SQLALCHEMY_COMMIT_ON_TEARDOWN 键,将其设为 True时,每次请求结束后都会自动提交数据库中的变动
完善数据库基本配置之后,在app/__init__.py初始化数据库引擎对象
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config.from_object('config') db = SQLAlchemy(app) from app import views, models
这样就初始化了一个db对象,并且引入models,models就是数据库的描述模版,下面会介绍
2 设计数据库model
如何用Flask-SQLAlchemy描述这张表?app/models.py
from app import db class User(db.Model): id = db.Column(db.Integer, primary_key=True) nickname = db.Column(db.String(64), index=True, unique=True) email = db.Column(db.String(120), index=True, unique=True) def __repr__(self): return '<User %r>' % (self.nickname)
3 创建数据库
配置好了,数据库表也定义好了,接下来就是创建数据库。创建一个创建数据库的脚本,放到跟config.py同目录下,命名为db_create.py。先安装SQLAlchemy-migrate
easy_install SQLAlchemy-migrate 安装完成之后,编写db_create.py
#!flask/bin/python from migrate.versioning import api from config import SQLALCHEMY_DATABASE_URI from config import SQLALCHEMY_MIGRATE_REPO from app import db import os.path db.create_all() if not os.path.exists(SQLALCHEMY_MIGRATE_REPO): api.create(SQLALCHEMY_MIGRATE_REPO, 'database repository') api.version_control(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) else: api.version_control(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, api.version(SQLALCHEMY_MIGRATE_REPO))
运行db_create.py创建数据库,python ./db_create.py,创建成功后会生成如下文件
3 数据迁移
models.py更新了,添加一张表,怎么更新数据库,可以把每次数据库结构的变动当作一次数据迁移
#!flask/bin/python import imp from migrate.versioning import api from app import db from config import SQLALCHEMY_DATABASE_URI from config import SQLALCHEMY_MIGRATE_REPO v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) migration = SQLALCHEMY_MIGRATE_REPO + ('/versions/%03d_migration.py' % (v+1)) tmp_module = imp.new_module('old_model') old_model = api.create_model(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) exec(old_model, tmp_module.__dict__) script = api.make_update_script_for_model(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, tmp_module.meta, db.metadata) open(migration, "wt").write(script) api.upgrade(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) v = api.db_version(SQLALCHEMY_DATABASE_URI, SQLALCHEMY_MIGRATE_REPO) print('New migration saved as ' + migration) print('Current database version: ' + str(v))
运行 python ./db_migrate.py
SQLAlchemy-migrate通过对比数据库的结构(从app.db文件读取)和models结构(从app/models.py文件读取)的方式来创建迁移任务,两者之间的差异将作为一个迁移脚本记录在迁移库中,迁移脚本知道如何应用或者撤销一次迁移,所以它可以方便的升级或者降级一个数据库的格式
3 操作实战
更新app/models.py
from app import db class User(db.Model): id = db.Column(db.Integer, primary_key=True) nickname = db.Column(db.String(64), index=True, unique=True) email = db.Column(db.String(120), index=True, unique=True) posts = db.relationship('Post', backref='author', lazy='dynamic') def __repr__(self): return '<User %r>' % (self.nickname) class Post(db.Model): id = db.Column(db.Integer, primary_key = True) body = db.Column(db.String(140)) timestamp = db.Column(db.DateTime) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) def __repr__(self): return '<Post %r>' % (self.body)
运行 python ./db_migrate.py,查看数据库磁盘结构
Users这张表并没有posts字段,why?SQLAlchemy运行时会动态向Usrs插入posts字段,这样访问Users.posts会包含这个用户发出的所有文章列表。这里relationship第二个参数backref='author',效果就是系统动态往Post插入author字段,这样访问post.author就能访问到该文章作者的信息了
3 操作数据库
可以在线操作,也可以离线操作,只要在python文件里面
from app import db, models
就可以操作数据库了
from app import db, models
import datetime
@app.route('/testdb_addusers')
def testdb_addusers():
users = { 'Miguel', 'sysnap', 'helxx', 'xxxx'}
for name in users:
mail = name + '@email.com'
u = models.User(nickname=name, email=mail)
db.session.add(u)
p = models.Post(body='my first post!', timestamp=datetime.datetime.utcnow(), author=u)
db.session.add(p)
db.session.commit()
return "add user ok"
@app.route('/testdb_queryall')
def testdb_queryall():
users = models.User.query.all()
for u in users:
print u.email,u.nickname, u.posts.all()
return "xxxx"