前言
在搭建測試框架過程中,會遇到需要頻繁操作數據庫的情況,會用到pymysql進行數據庫的操作,當操作的連接數過多時,會出現斷連的情況。以下代碼是借鑒其他大佬的代碼,忘記是哪位大佬的代碼,后續看到再補上鏈接。在此致謝!
代碼部分
1、封裝鏈接池部分
from timeit import default_timer
import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor
class DB_MySQL_Pool:
"""db連接池"""
__pool = None
__MAX_CONNECTIONS = 100 # 創建連接池的最大數量
__MIN_CACHED = 10 # 連接池中空閑連接的初始數量
__MAX_CACHED = 20 # 連接池中空閑連接的最大數量
__MAX_SHARED = 10 # 共享連接的最大數量
__BLOCK = True # 超過最大連接數量時候的表現,為True等待連接數量下降,為false直接報錯處理
__MAX_USAGE = 100 # 單個連接的最大重復使用次數
__CHARSET = 'UTF8'
'''
set_session:可選的SQL命令列表,可用於准備
會話,例如[“將日期樣式設置為...”,“設置時區...”]
重置:當連接返回池中時,應該如何重置連接
(False或None表示回滾以begin()開始的事務,
為安全起見,始終發出回滾命令)
'''
__RESET = True
__SET_SESSION = ['SET AUTOCOMMIT = 1'] # 設置自動提交
def __init__(self, host, port, user, password, database):
if not self.__pool:
self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password,
database=database,
maxconnections=self.__MAX_CONNECTIONS,
mincached=self.__MIN_CACHED,
maxcached=self.__MAX_CACHED,
maxshared=self.__MAX_SHARED,
blocking=self.__BLOCK,
maxusage=self.__MAX_USAGE,
setsession=self.__SET_SESSION,
reset=self.__RESET,
charset=self.__CHARSET)
def get_connect(self):
return self.__pool.connection()
2、封裝pymysql操作
class DB_MySQL:
def __init__(self) -> None:
self.__host = "自己的host"
self.__port = 3306
self.__user = "自己的user"
self.__password = "自己的密碼"
self.__database = "自己的數據庫"
self._log_time = True
self._log_label = "總用時"
self.connects_pool = DB_MySQL_Pool(
host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)
def __enter__(self):
# 如果需要記錄時間
if self._log_time is True:
self._start = default_timer()
connect = self.connects_pool.get_connect()
cursor = connect.cursor(pymysql.cursors.DictCursor)
# https://blog.51cto.com/abyss/1736844
# connect.autocommit = False # 如果使用連接池 則不能在取出后設置 而應該在創建線程池時設置
self._connect = connect
self._cursor = cursor
return self
def __exit__(self, *exc_info):
self._connect.commit()
self._cursor.close()
self._connect.close()
if self._log_time is True:
diff = default_timer() - self._start
print('-- %s: %.6f 秒' % (self._log_label, diff))
def select_all(self, sql):
"""查詢返回全部結果"""
self._cursor.execute(sql)
return self._cursor.fetchall()
def select_one(self, sql):
"""查詢返回單個結果"""
self._cursor.execute(sql)
return self._cursor.fetchone()
def insert(self, sql):
"""插入數據"""
res = self._cursor.execute(sql)
return res
3、完整代碼
from timeit import default_timer
import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor
class DB_MySQL_Pool:
"""db連接池"""
__pool = None
__MAX_CONNECTIONS = 100 # 創建連接池的最大數量
__MIN_CACHED = 10 # 連接池中空閑連接的初始數量
__MAX_CACHED = 20 # 連接池中空閑連接的最大數量
__MAX_SHARED = 10 # 共享連接的最大數量
__BLOCK = True # 超過最大連接數量時候的表現,為True等待連接數量下降,為false直接報錯處理
__MAX_USAGE = 100 # 單個連接的最大重復使用次數
__CHARSET = 'UTF8'
'''
set_session:可選的SQL命令列表,可用於准備
會話,例如[“將日期樣式設置為...”,“設置時區...”]
重置:當連接返回池中時,應該如何重置連接
(False或None表示回滾以begin()開始的事務,
為安全起見,始終發出回滾命令)
'''
__RESET = True
__SET_SESSION = ['SET AUTOCOMMIT = 1'] # 設置自動提交
def __init__(self, host, port, user, password, database):
if not self.__pool:
self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password,
database=database,
maxconnections=self.__MAX_CONNECTIONS,
mincached=self.__MIN_CACHED,
maxcached=self.__MAX_CACHED,
maxshared=self.__MAX_SHARED,
blocking=self.__BLOCK,
maxusage=self.__MAX_USAGE,
setsession=self.__SET_SESSION,
reset=self.__RESET,
charset=self.__CHARSET)
def get_connect(self):
return self.__pool.connection()
class DB_MySQL:
def __init__(self) -> None:
self.__host = "自己的host"
self.__port = 3306
self.__user = "自己的user"
self.__password = "自己的密碼"
self.__database = "自己的數據庫"
self._log_time = True
self._log_label = "總用時"
self.connects_pool = DB_MySQL_Pool(
host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)
def __enter__(self):
# 如果需要記錄時間
if self._log_time is True:
self._start = default_timer()
connect = self.connects_pool.get_connect()
cursor = connect.cursor(pymysql.cursors.DictCursor)
# https://blog.51cto.com/abyss/1736844
# connect.autocommit = False # 如果使用連接池 則不能在取出后設置 而應該在創建線程池時設置
self._connect = connect
self._cursor = cursor
return self
def __exit__(self, *exc_info):
self._connect.commit()
self._cursor.close()
self._connect.close()
if self._log_time is True:
diff = default_timer() - self._start
print('-- %s: %.6f 秒' % (self._log_label, diff))
def select_all(self, sql):
"""查詢返回全部結果"""
self._cursor.execute(sql)
return self._cursor.fetchall()
def select_one(self, sql):
"""查詢返回單個結果"""
self._cursor.execute(sql)
return self._cursor.fetchone()
def insert(self, sql):
"""插入數據"""
res = self._cursor.execute(sql)
return res
if __name__ == '__main__':
with DB_MySQL() as db:
print(db.select_one("""select * from banner"""))
