Flask請求擴展和數據庫連接池


1.1.Flask之請求擴展

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, Request, render_template

app = Flask(__name__, template_folder='templates')
app.debug = True


@app.before_first_request
def before_first_request1():
    print('before_first_request1')


@app.before_first_request
def before_first_request2():
    print('before_first_request2')


@app.before_request
def before_request1():
    Request.nnn = 123
    print('before_request1')


@app.before_request
def before_request2():
    print('before_request2')


@app.after_request
def after_request1(response):
    print('before_request1', response)
    return response


@app.after_request
def after_request2(response):
    print('before_request2', response)
    return response


@app.errorhandler(404)
def page_not_found(error):
    return 'This page does not exist', 404


@app.template_global()
def sb(a1, a2):
    return a1 + a2


@app.template_filter()
def db(a1, a2, a3):
    return a1 + a2 + a3


@app.route('/')
def hello_world():
    return render_template('hello.html')


if __name__ == '__main__':
    app.run()
所有請求擴展

(1)基於before_request 做用戶登錄認證

@app.before_request
def process_request(*args,**kwargs):
    #登錄頁面不需要驗證
    if request.path == '/login':
        return None
    user = session.get('user_info')
    if user:
        return None
    return redirect('/login')

 

(2)before_request和after_request執行順序

from flask import Flask,render_template,request,redirect,session,url_for

app = Flask(__name__)
app.debug = True
app.secret_key = 'abcdef'



@app.before_request
def process_request1(*args,**kwargs):
    print('request1進來')

@app.before_request
def process_request2(*args,**kwargs):
    print('request2進來')

@app.after_request
def process_response1(response):
    print('response1走了')
    return response

@app.after_request
def process_response2(response):
    print('response2走了')
    return response

@app.route('/index',methods=['GET'])
def index():
    print('index函數')
    return 'hello'

if __name__ == '__main__':
    app.run()

運行結果:

 

 可以看出:

  • before_request是從上往下的執行順序(先1后2)
  • after_response是從下往上的執行順序(先2后1)

 

(3)請求攔截后,response所有都執行

在process_request 添加一個return

@app.before_request
def process_request1(*args,**kwargs):
    print('request1進來')
    return "攔截"

再運行結果如下:

 

 (4)定制錯誤信息

當訪問不存在的url,可以自己定制錯誤信息頁面

@app.errorhandler(404)
def error_404(arg):
    return '404錯誤'

 

 

1.2.數據庫連接池

DBUtils是Python的一個用於實現數據庫連接池的模塊。

安裝

進官網下載https://pypi.org/project/DBUtils/1.2/,然后安裝:

連接池有兩種方式

(1)模式一

為每個線程創建連接,線程即使調用了close方法,也不會關閉,只是把連接重新放到連接池,供自己線程再次使用。當線程終止時,連接自動關閉。

POOL = PersistentDB(
    creator=pymysql,  # 使用鏈接數據庫的模塊
    maxusage=None,  # 一個鏈接最多被重復使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那么再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接)
    threadlocal=None,  # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()
View Code

(2)模式二,推薦使用的方式

創建多個連接,多線程來時,去獲取,

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用鏈接數據庫的模塊
    maxconnections=6,  # 連接池允許的最大連接數,0和None表示不限制連接數
    mincached=2,  # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
    maxcached=5,  # 鏈接池中最多閑置的鏈接,0和None不限制
    maxshared=3,  # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
    blocking=True,  # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
    maxusage=None,  # 一個鏈接最多被重復使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)


def func():
    # 檢測當前正在運行連接數的是否小於最大鏈接數,如果不小於則:等待或報raise TooManyConnections異常
    # 否則
    # 則優先去初始化時創建的鏈接中獲取鏈接 SteadyDBConnection。
    # 然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。
    # 如果最開始創建的鏈接沒有鏈接,則去創建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。
    # 一旦關閉鏈接后,連接就返回到連接池讓后續線程繼續使用。
    conn = POOL.connection()

    # print(th, '鏈接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()


func()

 

如果沒有連接池,使用pymysql來連接數據庫時,單線程應用完全沒有問題,但如果涉及到多線程應用那么就需要加鎖,一旦加鎖那么連接勢必就會排隊等待,當請求比較多時,性能就會降低了。

import pymysql
import threading
from threading import RLock

LOCK = RLock()
CONN = pymysql.connect(host='127.0.0.1',
                       port=3306,
                       user='root',
                       password='123',
                       database='pooldb',
                       charset='utf8')


def task(arg):
    with LOCK:
        cursor = CONN.cursor()
        cursor.execute('select * from tb1')
        result = cursor.fetchall()
        cursor.close()

        print(result)


for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM