python 數據庫連接池


數據庫連接池:
from DBUtils.PooledDB import PooledDB
import pymysql

Release_Write_database_setting = {
    "host": '192.168.32.6',
    "port": 31306,
    "user": 'root',
    "password": 'xxx',
    "database": 'tornado',
    "charset": 'utf8',
    "cursorclass": pymysql.cursors.DictCursor
}

Release_Query_database_setting = {
    "host": '192.168.32.8',
    "port": 31306,
    "user": 'root',
    "password": 'xxx',
    "database": 'tornado',
    "charset": 'utf8',
    "cursorclass": pymysql.cursors.DictCursor
}

Development_Write_database_setting = {
    "host" : '192.168.185.4',
    "port" : 31306,
    "user" : 'root',
    "password" : 'xxx',
    "database" :  'tornado20200708quan',
    # "database" :  'tornado',
    "charset" : 'utf8',
    "cursorclass" : pymysql.cursors.DictCursor
}

Development_Query_database_setting = {
    "host" : '192.168.185.5',
    "port" : 31306,
    "user" : 'root',
    "password" : 'xxx',
    "database" :  'tornado20200708quan',
    "charset" : 'utf8',
    "cursorclass" : pymysql.cursors.DictCursor
}
# 發布時更改
Query_database_setting = Development_Query_database_setting
Write_database_setting = Development_Write_database_setting


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            return cls._instances[cls]

class DBpool():
    __metaclass__ = Singleton
    _pool = None
    def __init__(self):
        # 密碼、 用戶名相關 等
        self._pool = PooledDB(
            creator=pymysql,
            mincached=1,
            maxcached=20,
            **Query_database_setting
        )
        self._conn = None
        self._cursor = None
        self.getCoon()

    def getCoon(self):
        return self._pool.connection()


    # def getCoon(self):
    #     self._conn = self._pool.connection()
    #     self._cursor = self._conn.cursor()
    #     # return self._pool.connection()

    # def execut(self, sql, param=()):
    #     count = self._cursor.execute(sql, param)
    #     return count

參考 1:普通數據庫連接 對比 數據庫連接池
 

不用數據庫連接池的寫法:

import MySQLdb
conn= MySQLdb.connect(host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
cur=conn.cursor()
SQL="select * from table1"
r=cur.execute(SQL)
r=cur.fetchall()
cur.close()
conn.close()
 

使用 PooledDB

from DBUtils.PooledDB import PooledDB
import pymysql
import xlwt
import os
import threading
import zipfile


class CreateExcel(object):
"""
查詢數據庫,並生成excel 文檔
"""
def __init__(self, mysql_info):
self.mysql_info = mysql_info
self.pool = PooledDB(pymysql, 5, host=self.mysql_info['host'], user=self.mysql_info['user'],
password=self.mysql_info['password'], db=self.mysql_info['db'], port=self.mysql_info['port'],
charset='utf-8')
連接參數定義:

1. mincached,最少的空閑連接數,如果空閑連接數小於這個數,pool會創建一個新的連接
2. maxcached,最大的空閑連接數,如果空閑連接數大於這個數,pool會關閉空閑連接
3. maxconnections,最大的連接數,
4. blocking,當連接數達到最大的連接數時,在請求連接的時候,如果這個值是True,請求連接的程序會一直等待,直到當前連接數小於最大連接數,如果這個值是False,會報錯,
5. maxshared 當連接數達到這個數,新請求的連接會分享已經分配出去的連接
 

這里的 5  應該是默認的連接數吧

在uwsgi中,每個http請求都會分發給一個進程,連接池中配置的連接數都是一個進程為單位的(即上面的最大連接數,都是在一個進程中的連接數),而如果業務中,一個http請求中需要的sql連接數不是很多的話(其實大多數都只需要創建一個連接),配置的連接數配置都不需要太大。
連接池對性能的提升表現在:
1.在程序創建連接的時候,可以從一個空閑的連接中獲取,不需要重新初始化連接,提升獲取連接的速度
2.關閉連接的時候,把連接放回連接池,而不是真正的關閉,所以可以減少頻繁地打開和關閉連接
參考 2 : PooledDB參數解釋
 

版本環境 python 3.7 DBUtils 1.3 mysqlclient 1.4.6 連接池初始化

pool = PooledDB(creator=MySQLdb, mincached=0, maxcached=0,
maxshared=0, maxconnections=0,
blocking=False,maxusage=None,
setsession=None, reset=True,
failures=None, ping=1,
*args, **kwargs)
參數說明 creator

#creator => 任何符合DB-API 2.0規范的函數或者兼容的數據庫模塊
mincached

#mincached => 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
maxcached

#maxcached => 鏈接池中最大閑置的鏈接數(0和None不限制)
maxshared

#maxshared => maximum number of shared connections
(0 or None means all connections are dedicated)
When this maximum number is reached, connections are
shared if they have been requested as shareable
maxconnections

#maxconnections => 允許的最大鏈接數(0或None表示不限制)
blocking

#blocking => 鏈接池沒有可用鏈接后,是否阻塞等待。
True表示阻塞等待,直到獲取到鏈接;
False不等待,拋異常退出
maxusage

#maxusage => 同一個鏈接最多被重復使用的次數(0和None表示無限制)
setsession

#setsession => 可選的會話命令:開始會話前執行的命令列表。
例如["set datestyle to…","set time zone…"]
reset

#reset => 當連接放回池中時,重置連接的方式,默認為True。
False或者None表示使用begin()開啟了事務的鏈接,會執行回滾;
安全起見,建議使用True,當為True時表示所有鏈接都執行回滾操作
failures

#failures => 當默認的(OperationalError,InternalError)異常不能滿足要求時,
可以自定義拋出異常:默認為None;
自定義為傳入的為tuple或者issubclass(failures, Exception)
ping

#ping => 檢查連接是否仍然處於活動狀態的方式
0 = None = never,
1 = default = whenever fetched from the pool,
2 = when a cursor is created,
4 = when a query is executed,
7 = always, and all other bit combinations of these values
args, kwargs

#args, kwargs => 傳遞給creator的參數
使用

# -*- coding: utf-8 -*-
# @Time : 2020/1/26 0026 20:28
# @Email : lofish@foxmail.com(擼小魚)

import MySQLdb
import MySQLdb.cursors
from DBUtils.PooledDB import PooledDB
import datetime


class DbManager(object):

def __init__(self, host, port, db_name, user_name, password):
cmds = ["set names utf8mb4;"]
conn_args = {'host': host,
'port': port,
'db': db_name,
'user': user_name,
'passwd': password,
'charset': 'utf8',
'cursorclass': MySQLdb.cursors.DictCursor
}
# 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建,mincached: 5
# 鏈接池中最大閑置的鏈接數(0和None不限制): 20
self._pool = PooledDB(MySQLdb, mincached=5, maxcached=20, setsession=cmds, **conn_args)

def connection(self):
return self._pool.connection()


_db_manager = None


def create_db_manager(host, port, dbname, username, password):
global _db_manager
if _db_manager is None:
_db_manager = DbManager(host, port, dbname, username, password)
return _db_manager
 

參考 3 : 樣例代碼(采納)
 

python使用dbutils的PooledDB連接池,操作數據庫
1、使用dbutils的PooledDB連接池,操作數據庫。

這樣就不需要每次執行sql后都關閉數據庫連接,頻繁的創建連接,消耗時間

2、如果是使用一個連接一直不關閉,多線程下,插入超長字符串到數據庫,運行一段時間后很容易出現OperationalError: (2006, ‘MySQL server has gone away’)這個錯誤。

使用PooledDB解決。

 

# coding=utf-8
"""
使用DBUtils數據庫連接池中的連接,操作數據庫
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql


class MysqlClient(object):
__pool = None;

def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
maxusage=100, setsession=None, reset=True,
host='127.0.0.1', port=3306, db='test',
user='root', passwd='123456', charset='utf8mb4'):
"""

:param mincached:連接池中空閑連接的初始數量
:param maxcached:連接池中空閑連接的最大數量
:param maxshared:共享連接的最大數量
:param maxconnections:創建連接池的最大數量
:param blocking:超過最大連接數量時候的表現,為True等待連接數量下降,為false直接報錯處理
:param maxusage:單個連接的最大重復使用次數
:param setsession:optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
:param reset:how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param host:數據庫ip地址
:param port:數據庫端口
:param db:庫名
:param user:用戶名
:param passwd:密碼
:param charset:字符編碼
"""

if not self.__pool:
self.__class__.__pool = PooledDB(pymysql,
mincached, maxcached,
maxshared, maxconnections, blocking,
maxusage, setsession, reset,
host=host, port=port, db=db,
user=user, passwd=passwd,
charset=charset,
cursorclass=pymysql.cursors.DictCursor
)
self._conn = None
self._cursor = None
self.__get_conn()

def __get_conn(self):
self._conn = self.__pool.connection();
self._cursor = self._conn.cursor();

def close(self):
try:
self._cursor.close()
self._conn.close()
except Exception as e:
print e

def __execute(self, sql, param=()):
count = self._cursor.execute(sql, param)
print count
return count

@staticmethod
def __dict_datetime_obj_to_str(result_dict):
"""把字典里面的datatime對象轉成字符串,使json轉換不出錯"""
if result_dict:
result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
result_dict.update(result_replace)
return result_dict

def select_one(self, sql, param=()):
"""查詢單個結果"""
count = self.__execute(sql, param)
result = self._cursor.fetchone()
""":type result:dict"""
result = self.__dict_datetime_obj_to_str(result)
return count, result

def select_many(self, sql, param=()):
"""
查詢多個結果
:param sql: qsl語句
:param param: sql參數
:return: 結果數量和查詢結果集
"""
count = self.__execute(sql, param)
result = self._cursor.fetchall()
""":type result:list"""
[self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
return count, result

def execute(self, sql, param=()):
count = self.__execute(sql, param)
return count

def begin(self):
"""開啟事務"""
self._conn.autocommit(0)

def end(self, option='commit'):
"""結束事務"""
if option == 'commit':
self._conn.autocommit()
else:
self._conn.rollback()


if __name__ == "__main__":
mc = MysqlClient()
sql1 = 'SELECT * FROM shiji WHERE id = 1'
result1 = mc.select_one(sql1)
print json.dumps(result1[1], ensure_ascii=False)

sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
param = (2, 3, 4)
print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)
 

 

參考 4 : 樣例代碼
 

#!/usr/bin/env python
#_*_ coding:utf-8_*_

import tornado.ioloop
import tornado.web
import tornado.escape
import pymssql,pymysql
from DBUtils.PooledDB import PooledDB


class Database:
def __init__(self,*db):
if len(db) == 5:
#mysql
self.host = db[0]
self.port = db[1]
self.user = db[2]
self.pwd = db[3]
self.db = db[4]
else:
#mssql
self.host = db[0]
self.port = None
self.user = db[1]
self.pwd = db[2]
self.db = db[3]
self._CreatePool()

def _CreatePool(self):
if not self.db:
raise NameError + '沒有設置數據庫信息'
if (self.port == None):
self.Pool = PooledDB(creator=pymssql, mincached=2, maxcached=5, maxshared=3, maxconnections=6,
blocking=True, host=self.host, user=self.user, \
password=self.pwd, database=self.db, charset="utf8")
else:
self.Pool = PooledDB(creator=pymysql, mincached=2, maxcached=5, maxshared=3, maxconnections=6,
blocking=True, host=self.host, port=self.port, \
user=self.user, password=self.pwd, database=self.db, charset="utf8")

def _Getconnect(self):
self.conn = self.Pool.connection()
cur = self.conn.cursor()
if not cur:
raise "數據庫連接不上"
else:
return cur
# 查詢sql

def ExecQuery(self, sql):
cur = self._Getconnect()
cur.execute(sql)
relist = cur.fetchall()
cur.close()
self.conn.close()
return relist
# 非查詢的sql

def ExecNoQuery(self, sql):
cur = self._Getconnect()
cur.execute(sql)
self.conn.commit()
cur.close()
self.conn.close()

gdbp = Database
class MainHadle(tornado.web.RequestHandler):
def get(self,*args):
filename = self.get_argument('filename')
print(filename)
self.set_header('Content-Type', 'application/octet-stream')
self.set_header('Content-Disposition', 'attachment; filename=%s'%filename.encode('utf-8'))
with open(filename,'rb') as f:
while True:
data = f.read(1024)
if not data:
break
self.write(data)

class MainIndex(tornado.web.RequestHandler):
def get(self):
self.write('Hello')

class CheckUser(tornado.web.RequestHandler):
def get(self):
user = self.get_argument('user')
pwd = self.get_argument('passwd')
#print(user)

if user != '' and pwd != '':
lssql = "select usr_code,password from sb_user where usr_code= '%s' " % user
#print(lssql)
rds = gdbp.ExecQuery(lssql)
if rds[0][1] == pwd :
js_str = tornado.escape.json_encode('{"result":"true","msg":""}')
self.write(js_str)
else:
js_str = tornado.escape.json_encode('{"result":"false","msg":"用戶或密碼錯誤"}')
self.write(js_str)
#print(rds[0][0])
else:
js_str = tornado.escape.json_encode('{"result":"false","msg":"參數錯誤"}')
self.write(js_str)




def make_app():
return tornado.web.Application([(r"/download",MainHadle),(r"/",MainIndex),(r"/checkuser",CheckUser)])

def make_dbpool():
global gdbp
gdbp = Database('172.20.1.2','sa','xxx','MPL')


if __name__ == '__main__':
app = make_app()
app.listen(8888)
make_dbpool()
tornado.ioloop.IOLoop.current().start()

————————————————
版權聲明:本文為CSDN博主「Lucky@Dong」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/zzddada/article/details/113771780


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM