一、flask-session
Flask本身的session機制並不安全,將session數據加密后存儲在瀏覽器,但是如果secret_key泄露的話會造成session數據的泄露。
flask-session組件提供了新的session保存機制,把session數據序列化后存儲在數據庫中,鍵是一個uuid,然后將uuid傳給前端,這樣就不用把數據存放在客戶端了。
使用流程:
import redis from flask_session import Session app.config["SESSION_TYPE"] = "redis" app.config["SESSION_REDIS"] = redis.Redis(host="127.0.0.1",port=6379,password="xxx",db=7) Session(app)
二、DBUtils
DBUtils是python用來實現數據庫連接的模塊。有兩種實現方式,一是為每一個線程創建一個連接,二是創建一個連接池。
模式一:

POOL = PersistentDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那么再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接) threadlocal=None, # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() conn.close() func()
模式二:

import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): # 檢測當前正在運行連接數的是否小於最大鏈接數,如果不小於則:等待或報raise TooManyConnections異常 # 否則 # 則優先去初始化時創建的鏈接中獲取鏈接 SteadyDBConnection。 # 然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。 # 如果最開始創建的鏈接沒有鏈接,則去創建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。 # 一旦關閉鏈接后,連接就返回到連接池讓后續線程繼續使用。 conn = POOL.connection() # print(th, '鏈接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() func()
Flask中的使用:

class Conf(object): SALT = b"asd" SECRET_KEY = "asdf" POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='ywj971020', database='code_count', charset='utf8' ) conn = Pool.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) #如果不傳參數,默認返回的數據格式是元祖 cursor.execute(sql) data = cursor.fetchone() data_list = cursor.fetchall() cursor.close() conn.close()
三、wtform
https://www.cnblogs.com/wupeiqi/articles/8202357.html
1、用戶登錄實例

#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): name = simple.StringField( label='用戶名', validators=[ validators.DataRequired(message='用戶名不能為空.'), validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能為空.'), validators.Length(min=8, message='用戶名長度必須大於%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用戶提交數據通過格式驗證,提交的值為:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run() app.py

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登錄</h1> <form method="post"> <!--<input type="text" name="name">--> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <!--<input type="password" name="pwd">--> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html> login.html
2、用戶注冊實例

from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): name = simple.StringField( label='用戶名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能為空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重復密碼', validators=[ validators.DataRequired(message='重復密碼不能為空.'), validators.EqualTo('pwd', message="兩次密碼輸入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='郵箱', validators=[ validators.DataRequired(message='郵箱不能為空.'), validators.Email(message='郵箱格式錯誤') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性別', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='愛好', choices=( (1, '籃球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '籃球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定義pwd_confirm字段規則,例:與pwd字段是否一致 :param field: :return: """ # 最開始初始化時,self.data中已經有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密碼不一致") # 繼續后續驗證 raise validators.StopValidation("密碼不一致") # 不再繼續后續驗證 @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 1}) return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用戶提交數據通過格式驗證,提交的值為:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run() app.py

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用戶注冊</h1> <form method="post" novalidate style="padding:0 50px"> {% for item in form %} <p>{{item.label}}: {{item}} {{item.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html> register.html
3、其他
數據庫數據實時更新:自定義一個__init__方法,因為靜態字段只在程序開始時加載一次,構造方法會實時更新。
設置input框的默認值:在實例化Form對象時傳參:data={'字段名' : '默認值'}
四、SQLAlchemy
https://www.cnblogs.com/wupeiqi/articles/8259356.html
SQLAlchemy是一個ORM框架,ORM是關系對象映射(類對應數據庫的表,類里的字段對應表中的列,對象對應數據庫的行)
1、創建表

import datetime from sqlalchemy import create_engine from sqlalchemy.ext..declarative import declarative_base from sqlalchemy import Column,Datetime Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer,primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) ctime = Column(Datetime,default=datetime.datetime.now) def create_tables(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.create_all(engine) def drop_tables(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, # 超過連接池大小外最多創建的連接 pool_size=5, # 連接池大小 pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯 pool_recycle=-1 # 多久之后對線程池中的線程進行一次連接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == "__main__": create_tables() #drop_tables()
2、基本增刪改查
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) # 每次執行數據庫操作時,都需要創建一個session session = Session() #1.添加 obj = Users(name='xx') session.add(obj) #session.add_all(Users(name='xx'),User(name='xx')) session.commit() #2.查詢 result = session.query(Users).all() result = session.query(Users).filter(Users.id==1).first() #3.刪除 session.query(Users).filter(User.id=1).delete() session.commit() #4.修改 session.query(Users).filter(User.id=1).update({'name':'xx'}) session.query(Users).filter(User.id=1).update({Users.name:'xx'})
3.常用查詢操作

#查詢時只取某些字段 result = session.query(Users.id,Users.name).all() #為查詢的字段起別名 result = session.query(Users.name.label('cname')).all() for item in result: print(item.cname) # 條件 ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() #默認的逗號就是and ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() #between ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() #in ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() #not in ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() #子查詢 from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() #默認的and可以不加and_ ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() #模糊匹配like ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 切片/分頁 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分組 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
給查詢字段起別名:session.query(Users,name.label("n")).all()
4.一對多和多對多

#創建FK字段 class Depart(Base): __tablename__ = 'depart' id = Column(Int, primary_key=True) name = Column(String(32), nullable=False) class Users(Base): __tablename__ = 'users' id = Column(Int, primary_key=True) name = Column(String(32), nullable=False) depart_id = Column(Int, ForeignKey("depart.id")) dp = relationship('Depart', backref='us') #1.查詢用戶名+所在部門 ret = session.query(Users.name, Depart.name).join(Depart).all() #ret = session.query(Users.name, Depart.name).join(Depart, isouter=True).all() #表示使用left join #2.relation字段 查詢用戶名+所在部門 ret = session.query(Users).all() for row in ret: print(ret.name, ret.dp.name) #3.relation字段 查找某個部門所有員工 ret = session.query(Depart).filter(Depart.name='銷售').first() for row in ret.us: print(row.name) #4.添加一個IT部門,同時增加一名員工 user = Users(name='xxx', dp.name='IT') #自動創建IT部門 session.add(user) session.commit() #5.添加一個IT部門,同時增加多名員工 dep = Depart(name='IT') dep.us = [Users(name=xxx), Users(name=xxx)] session.add(dep) session.commit() 一對多示例

#創建表 class Server2Group(Base): __tablename__ = 'server2group' id = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), #創建聯合唯一索引 # Index('ix_id_name', 'name', 'extra'), ) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) # 與生成表結構無關,僅用於查詢方便 servers = relationship('Server', secondary='server2group', backref='groups') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False)
5.兩種連接方式

from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) def task(): session = Session() #從連接池中取一個連接 xxxxx session.close() #把連接返回給連接池 from threading import Thread for i in range(20): t = Thread(target=task) t.start()

from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = scoped_session(Session) #這里相當於是基於threading.Local為每個線程創建獨立的內存空間,去連接池中取連接,這個連接只有當前的線程可以使用。 def task(): xxxxx session.remove() from threading import Thread for i in range(20): t = Thread(target=task) t.start()
6.原生sql

#查詢 cursor = session.execute('select * from xx') result = cursor.fetchall() #添加 cursor = session.execute('insert into xxxxx') session.commit()

conn = engine.raw_connection() cursor = conn.cursor() cursor.execute(xxxx) result = cursor.fetchall() cursor.close() conn.close()