---恢复内容开始---
Python数据库连接池DBUtils 点我

为什么:因为如果每到一个请求过来就创建一个链接这样效率会很低(建立连接---》通信--》这一环节会很耗时间 网络延时等) 所以建立连接池
DBUtils是Python的一个用于实现数据库连接池的模块。
此连接池有两种连接模式:
- 模式一:为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。
-
from DBUtils.PersistentDB import PersistentDB
import pymysqlPOOL = PersistentDB( creator=pymysql, # 使用链接数据库的模块 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接) threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置 host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8' ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() #这里的关闭没有真正的关闭了,线程在使用的话还是使用的哪一个链接 conn.close() func()
模式二:创建一批连接到连接池,供所有线程共享使用。
PS:由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。 -
import pymysql
from DBUtils.PooledDB import PooledDBPYMYSQL_POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, #这个参数没多大用, 最大可以被大家共享的链接 # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db',#链接的数据库的名字 charset='utf8' )
def func(): # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常 # 否则 # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。 # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。 # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。 # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。 conn = POOL.connection() # print(th, '链接被拿走了', conn1._con) # print(th, '池子里目前有', pool._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() conn.close() #这里的关闭也没有彻底关闭,而是将线程放回数据库连接池中去了 func()
目录结构
在实际工作中的使用链接池方法和配置
在sql中的配置 sql.py
import pymysql from settings import Config from flask import current_app #第一一个数据库连接池的方法的类,用于处理链接,查找, 断开链接等功能 #当使用某种方法的时候直接调用即可 class SQLHelper(object): @staticmethod #处理链接功能, def open(cursor): #从当前的app中的配置文件中去获取连接池 POOL = current_app.config["PYMYSQL_POOL"] #链接 conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod #处理关闭连接的功能 def close(conn,cursor): conn.commit() cursor.close() conn.close() @classmethod #处理查找一个的功能,定义成类方法, def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor): conn,cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod #处理查找多个的功能 def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor): conn, cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
在settings.py中的配置
from datetime import timedelta from redis import Redis import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection class Config(object): DEBUG = True SECRET_KEY = "umsuldfsdflskjdf" PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST= True SESSION_TYPE = "redis"
#将连接池的全部信息写入到这里,然后当线程启动的时候可以从线程的配置文件中获取到这个连接池的信息 并将连接池写入到sql中去
PYMYSQL_POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db',#链接的数据库的名字 charset='utf8' ) class ProductionConfig(Config): SESSION_REDIS = Redis(host='192.168.0.94', port='6379') class DevelopmentConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class TestingConfig(Config): pass
manage.py
from s8pro_flask import create_app app = create_app() if __name__ == '__main__': app.__call__ app.run()
init.py
from flask import Flask from flask_session import Session from .views import account from .views import home def create_app(): app = Flask(__name__) app.config.from_object('settings.DevelopmentConfig') app.register_blueprint(account.account) app.register_blueprint(home.home) # 将session替换成redis session Session(app) return app
account.py 使用连接池进行查询等登录 查看页面的应用
from flask import Blueprint,render_template,request,session,redirect,g,current_app from ..utils.sql import SQLHelper account = Blueprint('account',__name__) from wtforms import Form from wtforms.fields import simple,core,html5 from wtforms import validators from wtforms import widgets class LoginForm(Form): user = simple.StringField( validators=[ validators.DataRequired(message='用户名不能为空.'), # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), render_kw={'class': 'form-control'} ) pwd = simple.PasswordField( validators=[ validators.DataRequired(message='密码不能为空.'), # validators.Length(min=8, message='用户名长度必须大于%(min)d'), # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", # message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @account.route('/login',methods=['GET',"POST"]) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html',form=form) form = LoginForm(formdata=request.form) if not form.validate(): return render_template('login.html', form=form) obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data) if obj: session.permanent = True session['user_info'] = {'id':obj['id'], 'name':obj['name']} return redirect('/index') else: return render_template('login.html',msg='用户名或密码错误',form=form) class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int ) city = core.SelectField( label='城市', choices=SQLHelper.fetch_all('select id,name from city',{},None), # choices=( # (1, '篮球'), # (2, '足球'), # ), coerce=int ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.city.choices = SQLHelper.fetch_all('select id,name from city',{},None) def validate_name(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 # print(field.data) # 当前name传过来的值 # print(self.data) # 当前传过来的所有的值:name,gender..... obj = SQLHelper.fetch_one('select id from users where name=%s',[field.data,]) if obj: raise validators.ValidationError("用户名已经存在") # 继续后续验证 # raise validators.StopValidation("用户名已经存在") # 不再继续后续验证 @account.route('/register',methods=['GET','POST']) def register(): if request.method == 'GET': form = RegisterForm() return render_template('register.html',form=form) form = RegisterForm(formdata=request.form) if form.validate(): print(form.data) else: print(form.errors) return "sdfasdfasdf"
如果没有连接池,使用pymysql来连接数据库时,单线程应用完全没有问题,但如果涉及到多线程应用那么就需要加锁,一旦加锁那么连接势必就会排队等待,当请求比较多时,性能就会降低了。
加锁

#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql import threading from threading import RLock LOCK = RLock() CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') def task(arg): with LOCK: cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
无锁(报错)

#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql import threading CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', database='pooldb', charset='utf8') def task(arg): cursor = CONN.cursor() cursor.execute('select * from tb1') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
---恢复内容结束---