組件一:flask-session
安裝: pip install flask-session
使用方法:先導入 from flask_session import Session
創建一個flask的app,app=Flask(__name__) 將app傳入Session即可 Session(app)
或者先實例化一個session對象 sess = Session() sess.init_app(app)
實例化時,將app對象,傳入,並且覆蓋了原本flask中的session_interface
然后在這個_get_interface中又做了分類。我們只需要在配置文件中配置上連接的類型,即可使用對應的session
比如:app.config["SESSION_TYPE"] = "redis" 就會將原本的session覆蓋成redis的session。
然后,還要有redis的連接,這個覆蓋了session的類為RedisSessionInterface
這個類在實例化的時候,可以傳入四個參數,config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT']
這四個參數分別為redis對象,redis中所有的keys加一個前綴,是否使用簽名對session進行加密,默認為False,最后一個參數為是否使用持久化。
所以我們連接redis就需要設置配置文件 app.config["SESSION_REDIS"] = redis對象 即可
小結:flask中使用flask-session
from redis import Redis app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port=6379) Session(app)
一般情況下,我們會把配置添加到配置文件settings中

from redis import Redis class Config(object): DEBUG = False SECRET_KEY = "asdfasdfasdf" SESSION_TYPE = 'redis' SESSION_REDIS = Redis(host='127.0.0.1', port=6379) class ProductionConfig(Config): DEBUG = False class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True
flask-session的請求流程:請求進來后,在到達視圖函數之前,先執行flask-session中的open_session方法,這個方法中做的事情:先去用戶的cookie中獲取隨機字符串,如果沒有,表示請求是第一次進來,就生成一個隨機字符串sid(str(uuid.uuid4()))並且生成一個特殊的字典,將這個sid放到特殊字典中,然后返回這個特殊的字典,接着執行視圖函數,這是,視圖函數可以向這個特殊的字典中寫入session,在返回響應時,會先執行flask-session中的save_session方法,這個方法中,將這個特殊的字典,先轉化為字典並序列化為一個json字符串 json.dumps(dict(特殊字典)) ,接着將這個值存入redis中 redis.setex(key,val) ,又將這個特殊字典中的sid(隨機字符串) 設置為cookie返回給瀏覽器 response.set_cookie(sid),下次用戶在請求進來,先判斷有無這個隨機字符串,有,就從redis中取出對應的特殊字典,然后就可以從這個特殊字典中獲取設置的session。
組件二:DBUtils
我們使用偏移MySQL時,每次對數據庫的操作就需要連接數據庫,表的增刪改查,然后關閉數據庫,這樣每操作一次,就連接關閉,就會造成性能上的問題。或許會想到使用單例,連接一次,就可以一直使用,這樣單線程應用完全沒有問題,但如果涉及到多線程應用那么就需要加鎖,一旦加鎖那么連接勢必就會排隊等待,當請求比較多時,性能就會降低了。

import pymysql import threading from threading import RLock LOCK = RLock() CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') def task(arg): with LOCK: cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
備注:不加鎖,多線程下會報錯。
DBUtils是Python的一個用於實現數據庫連接池的模塊。
安裝: pip install DBUtils
使用:此連接池有兩種連接模式:
- 模式一:為每個線程創建一個連接,線程即使調用了close方法,也不會關閉,只是把連接重新放到連接池,供自己線程再次使用。當線程終止時,連接自動關閉。
from DBUtils import PersistentDB POOL = PersistentDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那么再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接) threadlocal=None, # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() func()
- 模式二:創建一批連接到連接池供所有線程共享使用。
PS:由於pymysql、MySQLdb等threadsafety值為1,所以該模式連接池中的線程會被所有線程共享。import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 檢測當前正在運行連接數的是否小於最大鏈接數,如果不小於則:等待或報raise TooManyConnections異常 # 否則 # 則優先去初始化時創建的鏈接中獲取鏈接 SteadyDBConnection。 # 然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。 # 如果最開始創建的鏈接沒有鏈接,則去創建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。 # 一旦關閉鏈接后,連接就返回到連接池讓后續線程繼續使用。 conn = POOL.connection() # print(th, '鏈接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
PS: 查看連接 show status like 'Threads%';

import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=20, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=0, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='zhaopanpan', password='qwe123..', database='code', charset='utf8' ) def on_open(cur=pymysql.cursors.DictCursor): conn = POOL.connection() cursor = conn.cursor(cursor=cur) return conn,cursor def on_close(conn,cursor): cursor.close() conn.close() def fetchone(sql,args,cur=pymysql.cursors.DictCursor): """ 獲取單條數據 :param sql: SQL語句 select title from book where id=%s; :param args: SQL中如果有需要傳的參數,則 args=[參數,參數2] ,否則 args=[] :param cur: 表示返回的值默認是以字典的形式返回,如果設置為[]或None表示以元組的形式返回 :return: """ conn,cursor = on_open(cur) cursor.execute(sql, args) result = cursor.fetchone() on_close(conn,cursor) return result def fetchall(sql,args,cur=pymysql.cursors.DictCursor): """ 獲取多條數據 :param sql: :param args: :param cur: :return: """ conn, cursor = on_open(cur) cursor.execute(sql, args) result = cursor.fetchall() on_close(conn, cursor) return result def exec_sql(sql,args,cur=pymysql.cursors.DictCursor): """ 添加/刪除/修改 :param sql: insert into table(%s,%s) values(....) :param args: :param cur: :return: """ conn, cursor = on_open(cur) cursor.execute(sql, args) conn.commit() on_close(conn, cursor)
組件三:wtforms
WTForms是一個支持多個web框架的form組件,主要用於對用戶請求數據進行驗證。django中也可以使用,但是django自帶
安裝:pip install wtforms
用戶登錄注冊示例
1. 用戶登錄
當用戶登錄時候,需要對用戶提交的用戶名和密碼進行多種格式校驗。如:
密碼不能為空;密碼長度必須大於12;密碼必須包含 字母、數字、特殊字符等(自定義正則);

from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): name = simple.StringField( label='用戶名', validators=[ validators.DataRequired(message='用戶名不能為空.'), validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能為空.'), validators.Length(min=8, message='用戶名長度必須大於%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用戶提交數據通過格式驗證,提交的值為:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登錄</h1> <form method="post"> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
2. 用戶注冊
注冊頁面需要讓用戶輸入:用戶名、密碼、密碼重復、性別、愛好等。

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class RegisterForm(Form):
name = simple.StringField(
label='用戶名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)
pwd = simple.PasswordField(
label='密碼',
validators=[
validators.DataRequired(message='密碼不能為空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重復密碼',
validators=[
validators.DataRequired(message='重復密碼不能為空.'),
validators.EqualTo('pwd', message="兩次密碼輸入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='郵箱',
validators=[
validators.DataRequired(message='郵箱不能為空.'),
validators.Email(message='郵箱格式錯誤')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性別',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = core.SelectMultipleField(
label='愛好',
choices=(
(1, '籃球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '籃球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球'))
def validate_pwd_confirm(self, field):
"""
自定義pwd_confirm字段規則,例:與pwd字段是否一致
:param field:
:return:
"""
# 最開始初始化時,self.data中已經有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密碼不一致") # 繼續后續驗證
raise validators.StopValidation("密碼不一致") # 不再繼續后續驗證
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 1})
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用戶提交數據通過格式驗證,提交的值為:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶注冊</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
備注:對於動態的choices,應該重寫其構造方法。

class StudentForm(Form): name = simple.StringField( label='學生姓名', widget=widgets.TextInput(), render_kw={'class': 'form-control', 'placeholder': '請輸入學生姓名'}, validators=[validators.DataRequired(message='學生姓名不能為空')] ) class_id = core.SelectField( label='請選擇班級', choices=(), coerce=int ) def __init__(self, *args, **kwargs): super(StudentForm, self).__init__(*args, **kwargs) self.class_id.choices = operate.fetchall(sql='select id,name from classes', args=[], cur=None)
可以生成csrf標簽

from flask import Flask, render_template, request, redirect, session from wtforms import Form from wtforms.csrf.core import CSRF from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets from hashlib import md5 app = Flask(__name__, template_folder='templates') app.debug = True class MyCSRF(CSRF): """ Generate a CSRF token based on the user's IP. I am probably not very secure, so don't use me. """ def setup_form(self, form): self.csrf_context = form.meta.csrf_context() self.csrf_secret = form.meta.csrf_secret return super(MyCSRF, self).setup_form(form) def generate_csrf_token(self, csrf_token): gid = self.csrf_secret + self.csrf_context token = md5(gid.encode('utf-8')).hexdigest() return token def validate_csrf_token(self, form, field): print(field.data, field.current_token) if field.data != field.current_token: raise ValueError('Invalid CSRF') class TestForm(Form): name = html5.EmailField(label='用戶名') pwd = simple.StringField(label='密碼') class Meta: # -- CSRF # 是否自動生成CSRF標簽 csrf = True # 生成CSRF標簽name csrf_field_name = 'csrf_token' # 自動生成標簽的值,加密用的csrf_secret csrf_secret = 'xxxxxx' # 自動生成標簽的值,加密用的csrf_context csrf_context = lambda x: request.url # 生成和比較csrf標簽 csrf_class = MyCSRF # -- i18n # 是否支持本地化 # locales = False locales = ('zh', 'en') # 是否對本地化進行緩存 cache_translations = True # 保存本地化緩存信息的字段 translations_cache = {} @app.route('/index/', methods=['GET', 'POST']) def index(): if request.method == 'GET': form = TestForm() else: form = TestForm(formdata=request.form) if form.validate(): print(form) return render_template('index.html', form=form) if __name__ == '__main__': app.run()
實例化流程分析

# 源碼流程 1. 執行type的 __call__ 方法,讀取字段到靜態字段 cls._unbound_fields 中; meta類讀取到cls._wtforms_meta中 2. 執行構造方法 a. 循環cls._unbound_fields中的字段,並執行字段的bind方法,然后將返回值添加到 self._fields[name] 中。 即: _fields = { name: wtforms.fields.core.StringField(), } PS:由於字段中的__new__方法,實例化時:name = simple.StringField(label='用戶名'),創建的是UnboundField(cls, *args, **kwargs),當執行完bind之后,才變成執行 wtforms.fields.core.StringField() b. 循環_fields,為對象設置屬性 for name, field in iteritems(self._fields): # Set all the fields to attributes so that they obscure the class # attributes with the same names. setattr(self, name, field) c. 執行process,為字段設置默認值:self.process(formdata, obj, data=data, **kwargs) 優先級:obj,data,formdata; 再循環執行每個字段的process方法,為每個字段設置值: for name, field, in iteritems(self._fields): if obj is not None and hasattr(obj, name): field.process(formdata, getattr(obj, name)) elif name in kwargs: field.process(formdata, kwargs[name]) else: field.process(formdata) 執行每個字段的process方法,為字段的data和字段的raw_data賦值 def process(self, formdata, data=unset_value): self.process_errors = [] if data is unset_value: try: data = self.default() except TypeError: data = self.default self.object_data = data try: self.process_data(data) except ValueError as e: self.process_errors.append(e.args[0]) if formdata: try: if self.name in formdata: self.raw_data = formdata.getlist(self.name) else: self.raw_data = [] self.process_formdata(self.raw_data) except ValueError as e: self.process_errors.append(e.args[0]) try: for filter in self.filters: self.data = filter(self.data) except ValueError as e: self.process_errors.append(e.args[0]) d. 頁面上執行print(form.name) 時,打印標簽 因為執行了: 字段的 __str__ 方法 字符的 __call__ 方法 self.meta.render_field(self, kwargs) def render_field(self, field, render_kw): other_kw = getattr(field, 'render_kw', None) if other_kw is not None: render_kw = dict(other_kw, **render_kw) return field.widget(field, **render_kw) 執行字段的插件對象的 __call__ 方法,返回標簽字符串
鈎子函數
校驗單個字段,可以使用 def validate_字段名(filed) 可以通過 field.data 獲得輸入的值。
校驗全局使用 validate

a. 執行form的validate方法,獲取鈎子方法 def validate(self): extra = {} for name in self._fields: inline = getattr(self.__class__, 'validate_%s' % name, None) if inline is not None: extra[name] = [inline] return super(Form, self).validate(extra) b. 循環每一個字段,執行字段的 validate 方法進行校驗(參數傳遞了鈎子函數) def validate(self, extra_validators=None): self._errors = None success = True for name, field in iteritems(self._fields): if extra_validators is not None and name in extra_validators: extra = extra_validators[name] else: extra = tuple() if not field.validate(self, extra): success = False return success c. 每個字段進行驗證時候 字段的pre_validate 【預留的擴展】 字段的_run_validation_chain,對正則和字段的鈎子函數進行校驗 字段的post_validate【預留的擴展】