Python 21 Flask(三)第三方組件


一、flask-session

Flask本身的session機制並不安全,將session數據加密后存儲在瀏覽器,但是如果secret_key泄露的話會造成session數據的泄露。

flask-session組件提供了新的session保存機制,把session數據序列化后存儲在數據庫中,鍵是一個uuid,然后將uuid傳給前端,這樣就不用把數據存放在客戶端了。

使用流程:

import redis
from flask_session import Session

app.config["SESSION_TYPE"] = "redis"
app.config["SESSION_REDIS"] = redis.Redis(host="127.0.0.1",port=6379,password="xxx",db=7)
Session(app)

 

二、DBUtils

DBUtils是python用來實現數據庫連接的模塊。有兩種實現方式,一是為每一個線程創建一個連接,二是創建一個連接池。

模式一:

POOL = PersistentDB(
    creator=pymysql,  # 使用鏈接數據庫的模塊
    maxusage=None,  # 一個鏈接最多被重復使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那么再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接)
    threadlocal=None,  # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()
View Code

模式二:

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用鏈接數據庫的模塊
    maxconnections=6,  # 連接池允許的最大連接數,0和None表示不限制連接數
    mincached=2,  # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
    maxcached=5,  # 鏈接池中最多閑置的鏈接,0和None不限制
    maxshared=3,  # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
    blocking=True,  # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
    maxusage=None,  # 一個鏈接最多被重復使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)


def func():
    # 檢測當前正在運行連接數的是否小於最大鏈接數,如果不小於則:等待或報raise TooManyConnections異常
    # 否則
    # 則優先去初始化時創建的鏈接中獲取鏈接 SteadyDBConnection。
    # 然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。
    # 如果最開始創建的鏈接沒有鏈接,則去創建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。
    # 一旦關閉鏈接后,連接就返回到連接池讓后續線程繼續使用。
    conn = POOL.connection()

    # print(th, '鏈接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()


func()
View Code

 Flask中的使用:

class Conf(object):
    SALT = b"asd"
    SECRET_KEY = "asdf"

    POOL = PooledDB(
        creator=pymysql,  # 使用鏈接數據庫的模塊
        maxconnections=6,  # 連接池允許的最大連接數,0和None表示不限制連接數
        mincached=2,  # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
        maxcached=5,  # 鏈接池中最多閑置的鏈接,0和None不限制
        maxshared=3,
        # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
        blocking=True,  # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
        maxusage=None,  # 一個鏈接最多被重復使用的次數,None表示無限制
        setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='ywj971020',
        database='code_count',
        charset='utf8'
    )

conn = Pool.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  #如果不傳參數,默認返回的數據格式是元祖
cursor.execute(sql)
data = cursor.fetchone()
data_list = cursor.fetchall()
cursor.close()
conn.close()
Flask

 

三、wtform

https://www.cnblogs.com/wupeiqi/articles/8202357.html

1、用戶登錄實例

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True


class LoginForm(Form):
    name = simple.StringField(
        label='用戶名',
        validators=[
            validators.DataRequired(message='用戶名不能為空.'),
            validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d')
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'}

    )
    pwd = simple.PasswordField(
        label='密碼',
        validators=[
            validators.DataRequired(message='密碼不能為空.'),
            validators.Length(min=8, message='用戶名長度必須大於%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )



@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用戶提交數據通過格式驗證,提交的值為:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)

if __name__ == '__main__':
    app.run()

app.py
app.py
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>登錄</h1>
<form method="post">
    <!--<input type="text" name="name">-->
    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

    <!--<input type="password" name="pwd">-->
    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
    <input type="submit" value="提交">
</form>
</body>
</html>

login.html
login.html

2、用戶注冊實例

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
    name = simple.StringField(
        label='用戶名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='alex'
    )

    pwd = simple.PasswordField(
        label='密碼',
        validators=[
            validators.DataRequired(message='密碼不能為空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重復密碼',
        validators=[
            validators.DataRequired(message='重復密碼不能為空.'),
            validators.EqualTo('pwd', message="兩次密碼輸入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='郵箱',
        validators=[
            validators.DataRequired(message='郵箱不能為空.'),
            validators.Email(message='郵箱格式錯誤')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性別',
        choices=(
            (1, ''),
            (2, ''),
        ),
        coerce=int
    )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='愛好',
        choices=(
            (1, '籃球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '籃球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '籃球'), (2, '足球'), (3, '羽毛球'))

    def validate_pwd_confirm(self, field):
        """
        自定義pwd_confirm字段規則,例:與pwd字段是否一致
        :param field: 
        :return: 
        """
        # 最開始初始化時,self.data中已經有所有的值

        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密碼不一致") # 繼續后續驗證
            raise validators.StopValidation("密碼不一致")  # 不再繼續后續驗證


@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 1})
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用戶提交數據通過格式驗證,提交的值為:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)



if __name__ == '__main__':
    app.run()

app.py
app.py
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>用戶注冊</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for item in form %}
    <p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>
</html>

register.html
register.html

3、其他

數據庫數據實時更新:自定義一個__init__方法,因為靜態字段只在程序開始時加載一次,構造方法會實時更新。

設置input框的默認值:在實例化Form對象時傳參:data={'字段名' : '默認值'}

 

四、SQLAlchemy

https://www.cnblogs.com/wupeiqi/articles/8259356.html

SQLAlchemy是一個ORM框架,ORM是關系對象映射(類對應數據庫的表,類里的字段對應表中的列,對象對應數據庫的行)

1、創建表

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext..declarative import declarative_base
from sqlalchemy import Column,Datetime

Base = declarative_base()

class User(Base):

    __tablename__ = 'users'

    id = Column(Integer,primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    ctime = Column(Datetime,default=datetime.datetime.now)

def create_tables():
        engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超過連接池大小外最多創建的連接
        pool_size=5,  # 連接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,否則報錯
        pool_recycle=-1  # 多久之后對線程池中的線程進行一次連接的回收(重置)
    )

    Base.metadata.create_all(engine)

def drop_tables():
        engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超過連接池大小外最多創建的連接
        pool_size=5,  # 連接池大小
        pool_timeout=30,  # 池中沒有線程最多等待的時間,否則報錯
        pool_recycle=-1  # 多久之后對線程池中的線程進行一次連接的回收(重置)
    )

    Base.metadata.drop_all(engine)

if __name__ == "__main__":
    create_tables()
    #drop_tables()
創建表

2、基本增刪改查

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
  
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
  
# 每次執行數據庫操作時,都需要創建一個session
session = Session()

#1.添加
obj = Users(name='xx')
session.add(obj)
#session.add_all(Users(name='xx'),User(name='xx'))
session.commit()

#2.查詢
result = session.query(Users).all()
result = session.query(Users).filter(Users.id==1).first()

#3.刪除
session.query(Users).filter(User.id=1).delete()
session.commit()

#4.修改
session.query(Users).filter(User.id=1).update({'name':'xx'})
session.query(Users).filter(User.id=1).update({Users.name:'xx'})

 3.常用查詢操作

#查詢時只取某些字段
result = session.query(Users.id,Users.name).all()

#為查詢的字段起別名
result = session.query(Users.name.label('cname')).all()
for item in result:
    print(item.cname)


# 條件
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()    #默認的逗號就是and
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()    #between
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()    #in
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()    #not in
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()    #子查詢
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()    #默認的and可以不加and_
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()

#模糊匹配like
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 切片/分頁
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分組
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 組合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
View Code

給查詢字段起別名:session.query(Users,name.label("n")).all()

4.一對多和多對多

#創建FK字段
class Depart(Base):
    __tablename__ = 'depart'
    id = Column(Int, primary_key=True)
    name = Column(String(32), nullable=False)

class Users(Base):
    __tablename__ = 'users'
    id = Column(Int, primary_key=True)
    name = Column(String(32), nullable=False)
    depart_id = Column(Int, ForeignKey("depart.id"))

    dp = relationship('Depart', backref='us')

#1.查詢用戶名+所在部門
ret = session.query(Users.name, Depart.name).join(Depart).all()
#ret = session.query(Users.name, Depart.name).join(Depart, isouter=True).all()  #表示使用left join

#2.relation字段 查詢用戶名+所在部門
ret = session.query(Users).all()
for row in ret:
    print(ret.name, ret.dp.name)

#3.relation字段 查找某個部門所有員工
ret = session.query(Depart).filter(Depart.name='銷售').first()
for row in ret.us:
    print(row.name)

#4.添加一個IT部門,同時增加一名員工
user = Users(name='xxx', dp.name='IT')    #自動創建IT部門
session.add(user)
session.commit()

#5.添加一個IT部門,同時增加多名員工
dep = Depart(name='IT')
dep.us = [Users(name=xxx), Users(name=xxx)]
session.add(dep)
session.commit()

一對多示例
一對多示例
#創建表
class Server2Group(Base):
    __tablename__ = 'server2group'
    id = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))

    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),     #創建聯合唯一索引
        # Index('ix_id_name', 'name', 'extra'),
    )


class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    # 與生成表結構無關,僅用於查詢方便
    servers = relationship('Server', secondary='server2group', backref='groups')


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
多對多示例

5.兩種連接方式

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

def task():
    session = Session()    #從連接池中取一個連接
    xxxxx
    session.close()    #把連接返回給連接池

from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
方式一
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = scoped_session(Session)    #這里相當於是基於threading.Local為每個線程創建獨立的內存空間,去連接池中取連接,這個連接只有當前的線程可以使用。

def task():
    xxxxx
    session.remove()

from threading import Thread

for i in range(20):
    t = Thread(target=task)
    t.start()
方式二

6.原生sql

#查詢
cursor = session.execute('select * from xx')
result = cursor.fetchall()

#添加
cursor = session.execute('insert into xxxxx')
session.commit()
方式一
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(xxxx)
result = cursor.fetchall()
cursor.close()
conn.close()
方式二

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM