---恢復內容開始---
Python數據庫連接池DBUtils 點我
為什么:因為如果每到一個請求過來就創建一個鏈接這樣效率會很低(建立連接---》通信--》這一環節會很耗時間 網絡延時等) 所以建立連接池
DBUtils是Python的一個用於實現數據庫連接池的模塊。
此連接池有兩種連接模式:
- 模式一:為每個線程創建一個連接,線程即使調用了close方法,也不會關閉,只是把連接重新放到連接池,供自己線程再次使用。當線程終止時,連接自動關閉。
-
from DBUtils.PersistentDB import PersistentDB
import pymysqlPOOL = PersistentDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那么再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接) threadlocal=None, # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() #這里的關閉沒有真正的關閉了,線程在使用的話還是使用的哪一個鏈接 conn.close() func()
模式二:創建一批連接到連接池,供所有線程共享使用。
PS:由於pymysql、MySQLdb等threadsafety值為1,所以該模式連接池中的線程會被所有線程共享。 -
import pymysql
from DBUtils.PooledDB import PooledDBPYMYSQL_POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=3, #這個參數沒多大用, 最大可以被大家共享的鏈接 # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db',#鏈接的數據庫的名字 charset='utf8' )
def func(): # 檢測當前正在運行連接數的是否小於最大鏈接數,如果不小於則:等待或報raise TooManyConnections異常 # 否則 # 則優先去初始化時創建的鏈接中獲取鏈接 SteadyDBConnection。 # 然后將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。 # 如果最開始創建的鏈接沒有鏈接,則去創建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。 # 一旦關閉鏈接后,連接就返回到連接池讓后續線程繼續使用。 conn = POOL.connection() # print(th, '鏈接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() #這里的關閉也沒有徹底關閉,而是將線程放回數據庫連接池中去了 func()
目錄結構


在實際工作中的使用鏈接池方法和配置
在sql中的配置 sql.py
import pymysql from settings import Config from flask import current_app #第一一個數據庫連接池的方法的類,用於處理鏈接,查找, 斷開鏈接等功能 #當使用某種方法的時候直接調用即可 class SQLHelper(object): @staticmethod #處理鏈接功能, def open(cursor): #從當前的app中的配置文件中去獲取連接池 POOL = current_app.config["PYMYSQL_POOL"] #鏈接 conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod #處理關閉連接的功能 def close(conn,cursor): conn.commit() cursor.close() conn.close() @classmethod #處理查找一個的功能,定義成類方法, def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor): conn,cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod #處理查找多個的功能 def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor): conn, cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
在settings.py中的配置
from datetime import timedelta from redis import Redis import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection class Config(object): DEBUG = True SECRET_KEY = "umsuldfsdflskjdf" PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST= True SESSION_TYPE = "redis"
#將連接池的全部信息寫入到這里,然后當線程啟動的時候可以從線程的配置文件中獲取到這個連接池的信息 並將連接池寫入到sql中去
PYMYSQL_POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db',#鏈接的數據庫的名字 charset='utf8' ) class ProductionConfig(Config): SESSION_REDIS = Redis(host='192.168.0.94', port='6379') class DevelopmentConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class TestingConfig(Config): pass
manage.py
from s8pro_flask import create_app app = create_app() if __name__ == '__main__': app.__call__ app.run()
init.py
from flask import Flask from flask_session import Session from .views import account from .views import home def create_app(): app = Flask(__name__) app.config.from_object('settings.DevelopmentConfig') app.register_blueprint(account.account) app.register_blueprint(home.home) # 將session替換成redis session Session(app) return app
account.py 使用連接池進行查詢等登錄 查看頁面的應用
from flask import Blueprint,render_template,request,session,redirect,g,current_app from ..utils.sql import SQLHelper account = Blueprint('account',__name__) from wtforms import Form from wtforms.fields import simple,core,html5 from wtforms import validators from wtforms import widgets class LoginForm(Form): user = simple.StringField( validators=[ validators.DataRequired(message='用戶名不能為空.'), # validators.Length(min=6, max=18, message='用戶名長度必須大於%(min)d且小於%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( validators=[ validators.DataRequired(message='密碼不能為空.'), # validators.Length(min=8, message='用戶名長度必須大於%(min)d'), # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", # message='密碼至少8個字符,至少1個大寫字母,1個小寫字母,1個數字和1個特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @account.route('/login',methods=['GET',"POST"]) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html',form=form) form = LoginForm(formdata=request.form) if not form.validate(): return render_template('login.html', form=form) obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data) if obj: session.permanent = True session['user_info'] = {'id':obj['id'], 'name':obj['name']} return redirect('/index') else: return render_template('login.html',msg='用戶名或密碼錯誤',form=form) class RegisterForm(Form): name = simple.StringField( label='用戶名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密碼', validators=[ validators.DataRequired(message='密碼不能為空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重復密碼', validators=[ validators.DataRequired(message='重復密碼不能為空.'), validators.EqualTo('pwd', message="兩次密碼輸入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='郵箱', validators=[ validators.DataRequired(message='郵箱不能為空.'), validators.Email(message='郵箱格式錯誤') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性別', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=SQLHelper.fetch_all('select id,name from city',{},None), # choices=( # (1, '籃球'), # (2, '足球'), # ), coerce=int ) hobby = core.SelectMultipleField( label='愛好', choices=( (1, '籃球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '籃球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.city.choices = SQLHelper.fetch_all('select id,name from city',{},None) def validate_name(self, field): """ 自定義pwd_confirm字段規則,例:與pwd字段是否一致 :param field: :return: """ # 最開始初始化時,self.data中已經有所有的值 # print(field.data) # 當前name傳過來的值 # print(self.data) # 當前傳過來的所有的值:name,gender..... obj = SQLHelper.fetch_one('select id from users where name=%s',[field.data,]) if obj: raise validators.ValidationError("用戶名已經存在") # 繼續后續驗證 # raise validators.StopValidation("用戶名已經存在") # 不再繼續后續驗證 @account.route('/register',methods=['GET','POST']) def register(): if request.method == 'GET': form = RegisterForm() return render_template('register.html',form=form) form = RegisterForm(formdata=request.form) if form.validate(): print(form.data) else: print(form.errors) return "sdfasdfasdf"
如果沒有連接池,使用pymysql來連接數據庫時,單線程應用完全沒有問題,但如果涉及到多線程應用那么就需要加鎖,一旦加鎖那么連接勢必就會排隊等待,當請求比較多時,性能就會降低了。
加鎖
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql import threading from threading import RLock LOCK = RLock() CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') def task(arg): with LOCK: cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
無鎖(報錯)
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql import threading CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') def task(arg): cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
---恢復內容結束---
