轉自:Python 使用 PyMysql、DBUtils 創建連接池,提升性能 和 python多線程操作數據庫問題
python多線程並發操作數據庫,會存在鏈接數據庫超時、數據庫連接丟失、數據庫操作超時等問題。
解決方法:使用數據庫連接池,並且每次操作都從數據庫連接池獲取數據庫操作句柄,操作完關閉連接返回數據庫連接池。
*連接數據庫需要設置charset = 'utf8', use_unicode = True,不然會報中文亂碼問題
*網上說解決python多線程並發操作數據庫問題,連接時使用self.conn.ping(True)(檢查並保持長連接),但是我這邊親測無法解決,建議還是使用數據庫連接池
解決方案:DBUtils
Python 編程中可以使用 PyMysql 進行數據庫的連接及諸如查詢/插入/更新等操作,但是每次連接 MySQL 數據庫請求時,都是獨立的去請求訪問,相當浪費資源,而且訪問數量達到一定數量時,對 mysql 的性能會產生較大的影響。因此,實際使用中,通常會使用數據庫的連接池技術,來訪問數據庫達到資源復用的目的。
DBUtils 是一套 Python 數據庫連接池包,並允許對非線程安全的數據庫接口進行線程安全包裝。DBUtils 來自 Webware for Python 。
原理:
在程序創建連接的時候,可以從一個空閑的連接中獲取,不需要重新初始化連接,提升獲取連接的速度
關閉連接的時候,把連接放回連接池,而不是真正的關閉,所以可以減少頻繁地打開和關閉連接
python多線程代碼:
import threading class MyThread(threading.Thread): def __init__(self, name, count, exec_object): threading.Thread.__init__(self) self.name = name self.count = count self.exec_object = exec_object def run(self): while self.count >= 0: count = count - 1 self.exec_object.execFunc(count) thread1 = MyThread('MyThread1', 3, ExecObject()) thread2 = MyThread('MyThread2', 5, ExecObject()) thread1.start() thread2.start() thread1.join() # join方法 執行完thread1的方法才繼續主線程 thread2.join() # join方法 執行完thread2的方法才繼續主線程 # 執行順序 並發執行thread1 thread2,thread1和thread2執行完成才繼續執行主線程 # ExecObject類是自定義數據庫操作的業務邏輯類 # ########join方法詳解######## thread1 = MyThread('MyThread1', 3, ExecObject()) thread2 = MyThread('MyThread2', 5, ExecObject()) thread1.start() thread1.join() # join方法 執行完thread1的方法才繼續主線程 thread2.start() thread2.join() # join方法 執行完thread2的方法才繼續主線程 # 執行順序 先執行thread1,執行完thread1再執行thread2,執行完thread2才繼續執行主線程
mysql數據庫連接池代碼:
import MySQLdb from DBUtils.PooledDB import PooledDB class MySQL: host = 'localhost' user = 'root' port = 3306 pasword = '' db = 'testDB' charset = 'utf8' pool = None limit_count = 3 # 最低預啟動數據庫連接數量 def __init__(self): self.pool = PooledDB(MySQLdb, self.limit_count, host = self.host, user = self.user, passwd = self.pasword, db = self.db, port = self.port, charset = self.charset, use_unicode = True) def select(self, sql): conn = self.pool.connection() cursor = conn.cursor() cursor.execute(sql) result = cursor.fetchall() cursor.close() conn.close() return result def insert(self, table, sql): conn = self.pool.connection() cursor = conn.cursor() try: cursor.execute(sql) conn.commit() return {'result':True, 'id':int(cursor.lastrowid)} except Exception as err: conn.rollback() return {'result':False, 'err':err} finally: cursor.close() conn.close()
精簡版的連接池例子
import pymysql from DBUtils.PooledDB import PooledDB pool = PooledDB(pymysql,5,host='ip',user='user',passwd='passwd',db='db',port=3306,setsession=['SET AUTOCOMMIT = 1']) # 5為連接池里的最少連接數,setsession=['SET AUTOCOMMIT = 1']是用來設置線程池是否打開自動更新的配置,0為False,1為True conn = pool.connection() #以后每次需要數據庫連接就是用connection()函數獲取連接就好了 cur=conn.cursor() SQL="select * from table" count=cur.execute(SQL) results=cur.fetchall() cur.close() conn.close()
PooledDB 的參數:
POOL = PooledDB(
creator=pymysql, # 使用鏈接數據庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制
maxshared=1, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制
setsession=[], # 開始會話前執行的命令列表。如:[“set datestyle to …”, “set time zone …”]
ping=0,
# ping MySQL服務端,檢查是否服務可用。
# 如:0 = None = never,
# 1 = default = whenever it is requested,
# 2 = when a cursor is created,
# 4 = when a query is executed,
# 7 = always
host=‘127.0.0.1’,
port=3306,
user=‘root’,
password=’’,
database=‘ziji’,
charset=‘utf8’
)
在 uwsgi 中,每個 http 請求都會分發給一個進程,連接池中配置的連接數都是一個進程為單位的(即上面的最大連接數,都是在一個進程中的連接數),而如果業務中,一個 http 請求中需要的 sql 連接數不是很多的話(其實大多數都只需要創建一個連接),配置的連接數配置都不需要太大。
連接池對性能的提升表現在:
在程序創建連接的時候,可以從一個空閑的連接中獲取,不需要重新初始化連接,提升獲取連接的速度
關閉連接的時候,把連接放回連接池,而不是真正的關閉,所以可以減少頻繁地打開和關閉連接
可能遇到的問題
python3 安裝 第三方庫DBUtils安裝成功 項目里卻import不了的解決方案
python3 mysql錯誤 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
參考:python3 mysql錯誤 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
AttributeError: 'NoneType' object has no attribute 'read'
推測問題是,多線程操作數據庫連接的時候,相互交叉釋放了其他線程的連接。
import threading lock = threading.Lock() lock.acquire() conn = pool.getConn() cur = conn.cursor() cur.execute(sql) rows = cur.fetchall() lock.release()