flask-models 操作


flask-models 操作:

常用的SQLAlchemy關系選項

  • 選項名 說明
    backref 在關系的另一模型中添加反向引用
    primary join 明確指定兩個模型之間使用的聯結條件
    uselist 如果為False,不使用列表,而使用標量值
    order_by 指定關系中記錄的排序方式
    secondary 指定多對多中記錄的排序方式
    secondary join 在SQLAlchemy中無法自行決定時,指定多對多關系中的二級聯結條件

示例:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)


class Config(object):
    """配置參數"""
    # sqlalchemy的配置參數
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask"
    # 設置成 True,SQLAlchemy 將會追蹤對象的修改並且發送信號。這需要額外的內存, 如果不必要的可以禁用它。
    SQLALCHEMY_TRACK_MODIFICATIONS = True

# 實例化SQLAlchemy對象
app.config.from_object(Config)

# 創建數據庫sqlalchemy工具對象
db = SQLAlchemy(app)
# 創建數據庫模型類

class Role(db.Model):
    """角色表"""
    __tablename__ = "tbl_roles"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship("User", backref="role")


class User(db.Model):
    """用戶表"""
    __tablename__ = "tbl_users"     # 指明數據庫的表名

    id = db.Column(db.Integer, primary_key=True)    # 整型的主鍵, 會默認設置為自增主鍵
    name = db.Column(db.String(64), unique=True)
    email = db.Column(db.String(128))
    password = db.Column(db.String(128))
    role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))

數據庫基本操作:

    1.Flask-SQLAlchemy中,插入、修改、刪除操作,均由數據庫會話管理。會話用db.session表示。在准備把數據寫入數據庫前,要先將數據添加到會話中然后調用commit()方法提交會話
    
    2.數據庫會話是為了保證數據的一致性,避免因部分更新導致數據不一致。提交操作把會話對象全部寫入數據庫,如果寫入過程發生錯誤,整個會話都會失效。
    
    3.數據庫會話也可以回滾,通過db.session.rollback()方法,實現會話提交數據前的狀態
    
    4.在Flask-SQLAlchemy中,查詢操作是通過query對象操作數據。最基本的查詢是返回表中所有數據,可以通過過濾器進行更精確的數據庫查詢。

增刪查改:

創建表:
	db.create_all()
    
刪除表:
	db.drop_all()
    
    
插入數據:
	 # 創建對象
        ro1 = Role(name='admin')
        # session記錄對象任務
        db.session.add(ro1)
        # 提交任務到數據庫
        db.session.commit()
        
插入多條數據:
	us1 = User(name='wang', email='wang@163.com', password='123456', role_id=ro1.id)
    us2 = User(name='zhang', email='zhang@163.com', password='201512', role_id=ro2.id)
    us3 = User(name='chen', email='chen@126.com', password='987654', role_id=ro2.id)
    us4 = User(name='zhou', email='zhou@163.com', password='456789', role_id=ro1.id)

    # 一次保存多條數據
    db.session.add_all([us1, us2, us3, us4])
    db.session.commit()

查詢:

filter_by精確查詢:
    User.query.filter_by(name="wang").all()
    
    --> User 列表
    
    取值:
    	user = User.query.filter_by(name="wang").all()
        
        user[0].name

查詢過濾器:

  • 過濾器 說明
    filter() 把過濾器添加到原查詢上,返回一個新查詢
    filter_by() 把等值過濾器添加到原查詢上,返回一個新查詢
    limit 使用指定的值限定原查詢返回的結果
    offset() 偏移原查詢返回的結果,返回一個新查詢
    order_by() 根據指定條件對原查詢結果進行排序,返回一個新查詢
    group_by() 根據指定條件對原查詢結果進行分組,返回一個新查詢

SQLAlchemy查詢執行器

  • 方法 說明
    all() 以列表形式返回查詢的所有結果
    first() 返回查詢的第一個結果,如果未查到,返回None
    first_or_404() 返回查詢的第一個結果,如果未查到,返回404
    get() 返回指定主鍵對應的行,如不存在,返回None
    get_or_404() 返回指定主鍵對應的行,如不存在,返回404
    count() 返回查詢結果的數量
    paginate() 返回一個Paginate對象,它包含指定范圍內的結果
first() : 返回查詢到的第一個對象
    
   user = User.query.first()

all()  :  返回查詢到的所有對象
    
    User.query.all()
    
 
filter 模糊查詢,返回名字結尾字符為g的所有數據。
	
     users = User.query.filter(User.name.endswith("g")).all()
get(),參數為主鍵,如果主鍵不存在沒有返回內容
	User.query.get(1)
    
邏輯非,返回名字不等於wang的所有數據
	user.query.filter(User.name!="wang").all()
    
邏輯與,需要導入and,返回and()條件滿足的所有數據
	
    from sqlalchemy import and_
     User.query.filter(and_(User.name!="wang",User.email.endswith("163.com"))).all()
        
邏輯或,需要導入or_ :
    from sqlalchemy import or_
>>> User.query.filter(or_(User.name!="wang", User.email.endswith("163.com"))).all()

not_ 相當於取反:
    from sqlalchemy import not_
>>> User.query.filter(not_(User.name=="chen")).all()

db.sessiom 查詢:

>>> db.session.query(Role).all()
[<Role 1>, <Role 2>]
>>> db.session.query(Role).get(2)
<Role 2>
>>> db.session.query(Role).first()
取不到數據返回None

    >>> user= User.query.get(5)
    >>> user
    >>> type(user)
    <class 'NoneType'>

offset偏移:
    >>> users = User.query.offset(2).all()
    >>> users[0].name
    'chen'

limit:

 users = User.query.offset(1).limit(2).all()
    >>> users[0].name
    'zhang'
    >>> users[1].name
    'chen'

order_by:

>>> users = User.query.order_by(User.id.desc()).all()
>>> users
[<User 4>, <User 3>, <User 2>, <User 1>]
>>> users[0].name
'zhou'

group_by:

>>> from sqlalchemy import func
>>> db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id)
<flask_sqlalchemy.BaseQuery object at 0x0000025DBA5B0CC0>
>>> db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).all()
[(1, 2), (2, 2)]

表內添加__repr__ :

class User(db.Model):
    """用戶表"""
    __tablename__ = "tbl_users"     # 指明數據庫的表名

    id = db.Column(db.Integer, primary_key=True)    # 整型的主鍵, 會默認設置為自增主鍵
    name = db.Column(db.String(64), unique=True)
    email = db.Column(db.String(128))
    password = db.Column(db.String(128))
    role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))

    def __repr__(self):
        return "User object:%s" % self.name

關聯查詢:

>>> user = User.query.get(1)
>>> user
User object:wang
>>> user.role_id
1

更新數據庫:

first:
    >>> user = User.query.get(4)
    >>> user
    User object:zhou
    >>> user.name = "test"
    >>> db.session.add(user)
    >>> db.session.commit()
    >>> User.query.get(4)
    User object:test
second:
    >>> user = User.query.filter_by(id=1).update({"name":"test1"})
    >>> db.session.commit()
    >>> User.query.get(1)
    User object:test1

刪除:

>>> user =User.query.get(1)
>>> db.session.delete(user)
>>> db.session.commit()

案列:

from flask import Flask, render_template, request, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired

app = Flask(__name__)

class Config(object):
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask"
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    SECRET_KEY = "sahq28y1qhihsd0-121ewq"

# 實例化SQLAlchemy對象
app.config.from_object(Config)

# 創建數據庫sqlalchemy工具對象
db = SQLAlchemy(app)

# 定義數據庫的模型
class Author(db.Model):
    """作者"""
    __tablename__ = "tbl_authors"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(32), unique=True)
    books = db.relationship("Book", backref="author")


class Book(db.Model):
    """書籍"""
    __tablename__ = "tbl_books"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    author_id = db.Column(db.Integer, db.ForeignKey("tbl_authors.id"))

# 創建表單模型類
class AuthorBookForm(FlaskForm):
    """作者數據表單模型類"""
    author_name = StringField(label="作者", validators=[DataRequired("作者必填")])
    book_name = StringField(label="書籍", validators=[DataRequired("書籍必填")])
    submit = SubmitField(label="保存")

@app.route("/index", methods=["POST", "GET"])
def index():
    form = AuthorBookForm()
    if form.validate_on_submit():
        author_name = form.author_name.data
        book_name = form.book_name.data

        author = Author(name=author_name)
        db.session.add(author)
        db.session.commit()

        book = Book(name=book_name, author_id=author.id)
        # book = Book(name=book_name, author=author)
        db.session.add(book)
        db.session.commit()

    authors = Author.query.all()
    return render_template("author_book.html", form=form, authors=authors)

@app.route("/delete_book")
def delete_book():
    book_id = request.args.get("book_id")
    book = Book.query.get(book_id)
    author_id = book.id
    author = Author.query.get(author_id)
    db.session.delete(book)
    db.session.commit()

    db.session.delete(author)
    db.session.commit()
    return redirect(url_for("index"))

if __name__ == '__main__':
    # db.drop_all()
    # db.create_all()
    # au_xi = Author(name='我吃西紅柿')
    # au_qian = Author(name='蕭潛')
    # au_san = Author(name='唐家三少')
    # db.session.add_all([au_xi, au_qian, au_san])
    # db.session.commit()
    #
    # bk_xi = Book(name='吞噬星空', author_id=au_xi.id)
    # bk_xi2 = Book(name='寸芒', author_id=au_qian.id)
    # bk_qian = Book(name='飄渺之旅', author_id=au_qian.id)
    # bk_san = Book(name='冰火魔廚', author_id=au_san.id)
    # db.session.add_all([bk_xi, bk_xi2, bk_qian, bk_san])
    db.session.commit()

    app.run(debug=True)
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <meta http-equiv='Content-type' content='text/htm'>
</head>
<body>
<form method="post">
<p>添加作者和書籍</p>
{{ form.csrf_token }}
{{ form.author_name.label }}
{{ form.author_name }}
{{ form.author_name.errors.0 }}
<br/>
{{ form.book_name.label }}
{{ form.book_name }}
{{ form.book_name.errors.0 }}
<br/>
{{ form.submit }}
<br/>
</form>
<hr/>

{% for author in authors %}
{{ author.name }}
    <ul>
    {% for book in author.books %} <a href="/delete_book?book_id={{ book.id }}">刪除</a>
    <li>{{ book.name }}</li>
    {% endfor %}
    </ul>
{% endfor %}


</body>
</html>

數據庫遷移:

    Flask中可以使用Flask-Migrate擴展,來實現數據遷移。並且集成到Flask-Script中,所有操作通過命令就能完成。

    為了導出數據庫遷移命令,Flask-Migrate提供了一個MigrateCommand類,可以附加到flask-script的manager對象上
pip3 install flask-migrate

_migration.py:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

app = Flask(__name__)


class Config(object):
    SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask"
    SQLALCHEMY_TRACK_MODIFICATIONS = True

app.config.from_object(Config)

# 創建sqlalchemy的數據庫連接對象
db = SQLAlchemy(app)

# 創建flask腳本管理工具對象
manager = Manager(app)

# 創建數據庫遷移工具對象
Migrate(app, db)

# 向manager對象中添加數據庫的操作命令
manager.add_command("db", MigrateCommand)


# 定義模型Role
class Role(db.Model):
    # 定義表名
    __tablename__ = 'roles'
    # 定義列對象
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)

    def __repr__(self):
        return 'Role:'.format(self.name)


# 定義用戶
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)

    def __repr__(self):
        return 'User:'.format(self.username)


if __name__ == '__main__':
    manager.run()
#這個命令會創建migrations文件夾,所有遷移文件都放在里面。
python  _migrate.py db init

創建遷移腳本:

自動創建遷移腳本有兩個函數:
    upgrade()函數把遷移中的改動應用到數據庫中。
    downgrade()函數則將改動刪除
    
    自動創建的遷移腳本會根據模型定義和數據庫當前狀態的差異,生成upgrade()和downgrade()函數的內容。對比不一定完全正確,有可能會遺漏一些細節,需要進行檢查
創建自動遷移腳本
python  _migrate.py db migrate -m 'initial migration'     # -m 表示備注
更新數據庫

python  _migrate.py db upgrade
數據庫里已經存在數據表了,如果需要回到之前的遷移版本,使用回退命令

    回退數據庫時,需要指定回退版本號,由於版本號是隨機字符串,為避免出錯,建議先使用python _migrate.py db history命令查看歷史版本的具體版本號,然后復制具體版本號執行回退
    
    python _migrate.py db downgrade 版本號

發送郵件:

    開發過程中,很多應用程序都需要通過郵件提醒用戶,Flask的擴展包Flask-Mail通過包裝了Python內置的smtplib包,可以用在Flask程序中發送郵件
    
    Flask-Mail連接到簡單郵件協議(Simple Mail Transfer Protocol,SMTP)服務器,並把郵件交給服務器發送
開啟QQ郵箱SMTP服務設置,發送郵件。

from flask import Flask
from flask_mail import Mail, Message

app = Flask(__name__)
#配置郵件:服務器/端口/傳輸層安全協議/郵箱名/密碼
app.config.update(
    DEBUG = True,
    MAIL_SERVER='smtp.qq.com',
    MAIL_PROT=465,
    MAIL_USE_TLS = True,
    MAIL_USERNAME = 'xxxxxxxx@qq.com',
    MAIL_PASSWORD = 'xxxxxx',
)

mail = Mail(app)
@app.route('/')
def index():
 # sender 發送方,recipients 接收方列表
    msg = Message("This is a test ",sender='11111@qq.com', recipients=['aaaaaa@163.com','11111@qq.com'])
    #郵件內容
    msg.body = "Flask test mail"
    #發送郵件
    mail.send(msg)
    print "Mail sent"
    return "Sent Succeed"

if __name__ == "__main__":
    app.run()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM