python數據庫連接池
一丶持久數據庫 (persistent_db)
# 1. dbutils.persistent_db 中的類 PersistentDB使用任何 DB-API2 數據庫模塊
# 2. 實現到數據庫的穩定、線程仿射、持久連接。
# 3. “線程仿射”和“持久”意味着各個數據庫連接保持分配給各自的線程,並且在線程的生命周期內不會關閉

#
1. 每當線程第一次打開數據庫連接時,將打開一個到數據庫的新連接,該連接將從現在開始用於該特定線程
2. 當線程關閉數據庫連接時,它仍然保持打開狀態,以便下次同一個線程請求連接時,可以使用這個已經打開的連接
3. 當線程死亡時,連接將自動關閉
簡而言之:
persistent_db嘗試回收數據庫連接以提高線程應用程序的整體數據庫訪問性能,但它確保線程之間永遠不會共享連接
二丶池化數據庫 (pooled_db)

三丶持久數據庫 (persistent_db)
### persistent_db 的參數
- creator : 返回新的 DB-API 2 連接對象的任意函數或符合 DB-API 2 的數據庫模塊
- maxusage : 單個連接的最大重用次數(默認為0或None表示無限重用)
每當達到限制時,連接將被重置
- setsession : 可用於准備會話的 SQL 命令的可選列表,例如["set datestyle to German", ...]
- failures : 如果默認值(OperationalError,InterfaceError,InternalError)不適用於使用的數據庫模塊,則應應用連接故障轉移機制的可選異常類或異常類元組
- ping : 一個可選標志,用於控制何時使用ping()方法檢查連接,如果這種方法可用(0 =無= 從不,1 = 默認 = 每當請求時, 2 = 創建游標時,4 = 當執行查詢, 7 = 總是,以及這些值的所有其他位組合)
- closeable : 如果設置為 true,則允許關閉連接,但默認情況下,這將被忽略
- threadlocal : 一個可選類,用於表示將使用的線程本地數據,而不是我們的 Python 實現(threading.local 更快,但不能在所有情況下都使用)
### 與本地數據庫mydb的每個連接都被重用 1000 次
import pgdb # import used DB-API 2 module
from dbutils.persistent_db import PersistentDB
persist = PersistentDB(pgdb, 1000, database='mydb')
### 這些參數設置生成器后,您可以請求此類數據庫連接
db = persist.connection()
四丶池化數據庫 (pooled_db)
# pooled_db 的參數
- creator : 返回新的 DB-API 2 連接對象的任意函數或符合 DB-API 2 的數據庫模塊
- mincached : 池中的初始空閑連接數(默認為0表示啟動時不建立連接)
- maxcached : 池中的最大空閑連接數(默認值0或None表示無限池大小)
- maxshared : 允許的最大共享連接數(默認值0或None表示所有連接都是專用的)
當達到此最大數量時,如果連接被請求為可共享,則連接將被共享。
- maxconnections : 一般允許的最大連接數(默認值0或None表示任意數量的連接)
- blocking : 確定超過最大值時的行為
- maxusage : 單個連接的最大重用次數(默認為0或None表示無限重用), 當達到此連接的最大使用次數時,連接會自動重置(關閉並重新打開)
- setsession : 可用於准備會話的 SQL 命令的可選列表,例如["set datestyle to German", ...]
- reset : 返回池時應如何重置連接(False或None回滾以begin() 開始的事務,默認值True出於安全考慮總是發出回滾)
- failures : 如果默認值(OperationalError,InterfaceError,InternalError)不適用於使用的數據庫模塊,則應應用連接故障轉移機制的可選異常類或異常類元組
- ping : 一個可選標志,用於控制何時使用ping()方法檢查連接(如果此類方法可用)(0 =無= 從不,1 = 默認 = 每當從池中獲取時, 2 = 創建游標時,4 = 何時執行查詢, 7 = 總是,以及這些值的所有其他位組合)
### 想要一個與本地數據庫mydb的至少五個連接的數據庫連接池
import pgdb # import used DB-API 2 module
from dbutils.pooled_db import PooledDB
pool = PooledDB(pgdb, 5, database='mydb')
### 設置連接池后,可以從該池請求數據庫連接:
db = pool.connection()
### 設置非零maxshared參數, 默認情況下連接可能會與其他線程共享。如果您想擁有專用連接
db = pool.connection(shareable=False)
↓
db = pool.dedicated_connection() # 專用連接
### 如果您不再需要它,您應該立即使用 db.close() 將其返回到池中。您可以以相同的方式獲得另一個連接
### 連接不是線程安全的,這將過早釋放連接以供重用
db = pool.connection()
cur = db.cursor()
cur.execute(...)
res = cur.fetchone()
cur.close() # or del cur
db.close() # or del db
### 以將上下文管理器用於更簡單的代碼
with pool.connection() as db:
with db.cursor() as cur:
cur.execute(...)
res = cur.fetchone()
五丶 代碼+官網
# 官網
https://webwareforpython.github.io/DBUtils/main.html
### 腳本
# -*- coding: utf-8 -*-
import os
import logging
import pymysql
from dbutils.pooled_db import PooledDB
base_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(
level=30,
filename=os.path.join(base_path, 'log', 'db_process.log'),
filemode='a',
)
class DbTool(object):
"""
# 單例模式 + 數據連接池
"""
mincached = 10 # 連接池種空閑連接的初始數量
maxcached = 20 # 連接池種空閑連接的最大數量
maxshared = 10 # 共享連接的最大數量
maxconnections = 200 # 創建連接池的最大數量
blocking = True # 超過最大連接數量的時. True等待連接數下降,False直接報錯處理
maxusage = 100 # 單個連接的最大重復使用次數
setsession = None #
reset = True #
_pool = None
__instance = None
def __init__(self, db_config):
host = db_config['host']
port = db_config['port']
user = db_config['user']
password = db_config['password']
db = db_config['db']
if not self._pool:
self._pool = PooledDB(
creator=pymysql,
maxconnections=self.maxconnections,
mincached=self.mincached,
maxcached=self.maxcached,
blocking=self.blocking,
maxusage=self.maxusage,
setsession=self.setsession,
host=host,
port=port,
user=user,
password=password,
database=db,
)
logging.info('SUCCESS: create postgresql success.\n')
def __new__(cls, *args, **kwargs):
"""
:param args:
:param kwargs:
"""
if not cls.__instance:
cls.__instance = super(DbTool, cls).__new__(cls)
return cls.__instance
def pg_select_operator(self, sql):
'''
# 查詢 SQL
:param sql: sql語句
:return: sql結果
'''
conn = self._pool.connection()
cursor = conn.cursor()
result = False
try:
cursor.execute(sql)
result = cursor.fetchall()
except Exception as e:
logging.error('ERROR:', e)
finally:
cursor.close()
conn.close()
return result
def pg_update_operator(self, sql):
result = False
conn = self._pool.connection()
cursor = conn.cursor()
try:
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR:', e)
finally:
cursor.close()
conn.commit()
conn.close()
return result
def pg_insert_operator(self, sql):
result = False
conn = self._pool.connection()
cursor = conn.cursor()
try:
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR:', e)
finally:
# 關閉連接錢提交
cursor.close()
conn.commit()
conn.close()
return result
def close_pool(self):
'''
關閉連接池
:return:
'''
if self._pool != None:
self._pool.close()
if __name__ == '__main__':
db_config = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "",
"db": "testdb"
}
obj = DbTool(db_config)
sql = """
INSERT INTO `testdb`.`p_user`(`id`, `user_id`, `user_name`, `user_sex`, `user_phone`, `user_email`, `user_pwd`, `isadmin`, `create_time`, `end_time`, `user_origal`) VALUES (12, '10000001', 'admin_x', '1', '13811111112', 'admin_x@163.com', 'pbkdf2:sha256:150000$EU5aTt0N$555f3c7a0d28c092f8b5c3402f0218fffd51b6fa83bab54fed1670f969898c55', '1', '2021-08-01 11:34:07', NULL, NULL);
"""
print('id:', id(obj))
try:
obj.pg_insert_operator(sql)
except Exception as e:
print(e)