form表單及數據庫表操作
1、form表單驗證 wtforms
需要下載包 pip3 install wtforms
簡單使用:

from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms import validators from wtforms.fields import simple from wtforms import widgets app = Flask(__name__, template_folder='templates') # app.debug = True class LoginForm(Form): name = simple.StringField( # 標簽名 label='用戶名', validators=[ # 必須要有,錯誤信息 validators.DataRequired(message='用戶名不能為空!'), # 校驗規則 validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d') ], widget=widgets.TextInput(), # 頁面上顯示的類型text或者password render_kw={'class': 'outter'} # 給標簽設置類屬性 ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能為空!'), validators.Length(min=8, message='密碼長度必須大於%(min)d'), # 可以使用正則 validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'password'} ) @app.route('/login', methods=['POST', 'GET']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用戶提交的是:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登錄</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
常規使用:

from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True # 需要用什么字段導什么字段來定義 class RegisterForm(Form): # 自定義鈎子校驗,也可以用validators.EqualTo直接校驗 def validate_pwd_confirm (self, field): """ 自定義pwd_confirm字段規則,例:與pwd字段是否一致 :param field: :return: """ # 最開始初始化時,self.data中已經有所有的值 if field.data != self.data['pwd']: raise validators.ValidationError("密碼不一致") # 繼續后續驗證 #raise validators.StopValidation("密碼不一致123123123") # 不再繼續后續驗證 name = simple.StringField( label='用戶名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='zack' ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能為空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重復密碼', validators=[ #validators.DataRequired(message='重復密碼不能為空.'), validate_pwd_confirm, # 校驗等於pwd validators.EqualTo('pwd', message="兩次密碼輸入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='郵箱', validators=[ validators.DataRequired(message='郵箱不能為空.'), validators.Email(message='郵箱格式錯誤') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性別', choices=( (1, '男'), (2, '女'), ), coerce=int # “1” “2” ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='愛好', choices=( (1, '籃球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '籃球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) # def __init__(self, *args, **kwargs): # super(RegisterForm, self).__init__(*args, **kwargs) # self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球')) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm() # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用戶提交數據通過格式驗證,提交的值為:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run()

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶注冊</h1> <form method="post" novalidate style="padding:0 50px"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
2、flask的ORM sqlalchemy
pip3 install sqlalchemy
SQLAlchemy是一個基於Python實現的ORM框架。該框架建立在 DB API之上,使用關系對象映射進行數據庫操作,將類和對象轉換成SQL,然后使用數據API執行SQL並獲取執行結果
1、ORM簡單使用
1、創建和刪除表
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String Base = declarative_base() class User(Base): __tablename__ = 'user' # 數據庫表名 id = Column(Integer, primary_key=True) # 主鍵 name = Column(String(32), index=True, nullable=False) # 索引,不能為空 age = Column(Integer) def __repr__(self): return self.name # 根據類創建數據庫表 def init_db(): engine = create_engine( # 數據庫名+鏈接數據庫://root:密碼@ip:port/數據庫名?charset=字符集 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8', max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) # 根據類刪除數據庫表 def drop_db(): engine = create_engine( # 數據庫名+鏈接數據庫://root:密碼@ip:port/數據庫名?charset=字符集 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8', max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # drop_db() # 刪除表 init_db() # 創建表
2、給表添加數據
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import User # 1 連接引擎 engine = create_engine( 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8', ) Connection = sessionmaker(bind=engine) # 2 每次執行數據庫操作時都要連接,后面一般用session=Connection() conn = Connection() # 1 單增 # 創建數據 obj = User(name='Immy', age=18) # 將數據添加到表中 conn.add(obj) # 2 群增 conn.add_all([ User(name='zack', age=18), User(name='vicky', age=20), # 或者其他表也行 # Order(name='aaa', price=100) ]) # 3 刪除(整體刪) conn.query(User).delete() # 4 改(傳字典的形式) # 方式1:群改 conn.query(User).update({'name':'tank', 'age': 17}) # 方式2:群改,類似Django的f查詢,加字符串的時候必需要synchronize_session=False conn.query(User).update({User.name:User.name+' is sb', 'age': 12},synchronize_session=False) # 方式3:加數字的時候可以直接加,也可以設置synchronize_session=False conn.query(User).update({User.age:User.age+10}) # 5 查(查不需要commit,也能拿到結果) # 打印SQL語句 res = conn.query(User) print(res) # 查所有,得到列表 res = conn.query(User).all() print(res) # 查單條記錄 res = conn.query(User).first() print(res.age) # 查詢哪些字段,.label並將字段取別名隱藏本身名字 res = conn.query(User.age, User.name.label('yhm')).first() print(res.age, res.yhm) # 6 過濾用filter(傳表達式)或者用filter_by(傳參數) res = conn.query(User).filter(User.name == 'zack').first() res = conn.query(User).filter_by(name='zack').first() print(res) # 利用filter過濾修改 conn.query(User).filter(User.name == 'vicky').update({User.name:'wxm', 'age': 1}) res = conn.query(User).filter(User.name == 'wxm').first() print(res.age, res.name) # 3 必須提交才能生效 conn.commit() # 4 關閉連接,將連接放回連接池 conn.close()
3、單表查詢
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import User engine = create_engine( 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8' ) Connection = sessionmaker(bind=engine) # 一般都用session session = Connection() # 1 表達式的and條件用 , 連接 ret = session.query(User).filter(User.name=='zack', User.age==18).first() print(ret) # 2 表達式的between條件 ret = session.query(User).filter(User.age.between(18, 20)).all() print(ret) # 3 sql查詢的in_操作,相當於Django中的__in ret = session.query(User).filter(User.id.in_([7,8,9])).all() print(ret) # 4 查詢取反 ~ ret = session.query(User).filter(~User.id.in_([7,8])).all() print(ret) # 5 or查詢and查詢,or_,and_需要導入 from sqlalchemy import or_, and_ ret = session.query(User).filter(or_(User.id == 7, User.name == 'wxm')).all() res = session.query(User).filter(and_(User.id == 7, User.name == 'Immy')).all() print(ret, res) # or_與and_聯合使用 ret =session.query(User).filter(or_(User.id == 7, and_(User.age==18, User.name=='zack'))).all() print(ret) # 6 like查詢 # 必須以I開頭 ret = session.query(User).filter(User.name.like("I%")).all() print(ret) # 第二個字母是m,_m ret = session.query(User).filter(User.name.like("_m%")).all() print(ret) # 不以I開頭的,~ ret = session.query(User).filter(~User.name.like("I%")).all() print(ret) # 7 排序,order_by # 降序desc ret = session.query(User).filter(User.id>1).order_by(User.id.desc()).all() print(ret) # 升序,asc ret = session.query(User).filter(User.id>1).order_by(User.id.asc()).all() print(ret) # 先升序再降序,用 , 隔開 # 先按年齡升序如果有相同年齡的再按id降序 ret =session.query(User).filter(User.id>=1).order_by(User.age.asc(), User.id.desc()).all() print(ret) # 8 分組查詢 # 按照年齡分組 ret = session.query(User).group_by(User.name).all() print(ret) # 分組后要聚合操作需要用func from sqlalchemy.sql import func # 選出年齡最小大於等於18的組 ret = session.query(User).group_by(User.name).having(func.min(User.age) >= 18).all() print(ret) # 選出組內最小年紀大於等於18的組,查詢組內的最小年齡與最大年齡的和與名字 ret = session.query(User.name, func.min(User.age), func.max(User.age), func.sum(User.age) ).group_by(User.name).having(func.min(User.age) >= 18).all() print(ret)
2、ORM表關系
1、一對多

from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship Base = declarative_base() class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) catption = Column(String(32), default='洗腳') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32)) # 外鍵hobby值tablename而不是Hobby類名, hobby_id = Column(Integer, ForeignKey('hobby.id')) # 更新數據庫沒有關系,不會增加新字段,只能用於快速連表查詢 # relationship的第一個參數,是類名,第二個參數backref,用於反向查詢 hobby = relationship("Hobby", backref="pres") def __repr__(self): return self.name # 根據類創建數據庫表 def init_db(): engine = create_engine( # 數據庫名+鏈接數據庫://root:密碼@ip:port/數據庫名?charset=字符集 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8', max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) # 根據類刪除數據庫表 def drop_db(): engine = create_engine( # 數據庫名+鏈接數據庫://root:密碼@ip:port/數據庫名?charset=字符集 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8', max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # drop_db() # 刪除表 init_db() # 創建表
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Hobby, Person engine = create_engine( 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8' ) Connection = sessionmaker(bind=engine) # 一般都用session session = Connection() # 1 單獨給兩張表添加數據,不使用關聯關系 session.add_all([ Hobby(catption='吃雞'), Hobby(catption='學習'), Person(name='vicky', hobby_id=1), Person(name='zack', hobby_id=2) ]) # 2 用關聯關系 添加 person = Person(name='Mr沈', hobby=Hobby(catption='學習')) session.add(person) hobby = Hobby(catption='旅游') hobby.pres = [Person(name='Immy'), Person(name='zack')] session.add(hobby) # 3 正向查詢 pr = session.query(Person).filter(Person.name=='vicky').first() print(pr) print(pr.hobby.catption) # 4 反向查詢 ver = session.query(Hobby).filter(Hobby.catption=='旅游').first() print(ver.catption) print(ver.pres) # 5 如果不用relationship連表,我們自己連表查詢,isouter=True表示是left join,不填默認為inner join person_list = session.query(Hobby).join(Person, Person.hobby_id == Hobby.id, isouter=True).all() print(person_list) session.commit() session.close()
2、多對多

from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship Base = declarative_base() # 一個男孩可以喜歡多個女孩,一個女孩也可以喜歡多個男孩 class Boy2Girl(Base): __tablename__ = "boy2girl" id = Column(Integer, primary_key=True) girl_id = Column(Integer, ForeignKey("girl.id")) boy_id = Column(Integer, ForeignKey("boy.id")) class Girl(Base): __tablename__ = "girl" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) def __repr__(self): return self.name class Boy(Base): __tablename__ = "boy" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) #secondary=boy2girl 中間表的表名 girl = relationship("Girl",secondary="boy2girl",backref = "boys") def __repr__(self): return self.name # 根據類創建數據庫表 def init_db(): engine = create_engine( # 數據庫名+鏈接數據庫://root:密碼@ip:port/數據庫名?charset=字符集 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8', max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) # 根據類刪除數據庫表 def drop_db(): engine = create_engine( # 數據庫名+鏈接數據庫://root:密碼@ip:port/數據庫名?charset=字符集 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8', max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # drop_db() # 刪除表 init_db() # 創建表
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Boy, Girl, Boy2Girl engine = create_engine( 'mysql+pymysql://root:2694@127.0.0.1:3306/py13?charset=utf8' ) Connection = sessionmaker(bind=engine) # 一般都用session session = Connection() # 利用關聯添加數據 boy = Boy(name='Zack') boy.girl=[Girl(name='vicky'), Girl(name='wxm')] session.add(boy) session.commit() girl = Girl(name='vicky') girl.boys=[Boy(name='Mr沈'), Boy(name='zack')] session.add(girl) session.commit() # 使用relationship的關系,正向查 b = session.query(Boy).filter(Boy.name == 'zack').first() print(b.name) print(b.girl) # 使用relationship的關系,反向查 g = session.query(Girl).filter(Girl.name=='vicky').first() print(g.name) print(g.boys)
3、Flask-SQLAlchemy與
flask_sqlalchemy是flask和SQLAchemy的管理者,通過他把他們做連接的ORM
要用就必須先安裝。
所有的到導入都找 下面的db
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
flask_migrate數據庫遷移命令
命令:manager.add_command('db1', MigrateCommand) # 1 當項目第一次執行遷移的時候。 python3 manage.py db1 init # 只需要初始化一次 python3 manage.py db1 migrate # 等同於django的makemigrations python3 manage.py db1 upgrade # 等同於django的migrate