Python 使用DBUtils 創建連接池解決多線程中連接丟失的問題


轉自:Python 使用 PyMysql、DBUtils 創建連接池,提升性能 和 python多線程操作數據庫問題

python多線程並發操作數據庫,會存在鏈接數據庫超時、數據庫連接丟失、數據庫操作超時等問題。

解決方法:使用數據庫連接池,並且每次操作都從數據庫連接池獲取數據庫操作句柄,操作完關閉連接返回數據庫連接池。

*連接數據庫需要設置charset = 'utf8', use_unicode = True,不然會報中文亂碼問題

*網上說解決python多線程並發操作數據庫問題,連接時使用self.conn.ping(True)(檢查並保持長連接),但是我這邊親測無法解決,建議還是使用數據庫連接池

解決方案:DBUtils

Python 編程中可以使用 PyMysql 進行數據庫的連接及諸如查詢/插入/更新等操作,但是每次連接 MySQL 數據庫請求時,都是獨立的去請求訪問,相當浪費資源,而且訪問數量達到一定數量時,對 mysql 的性能會產生較大的影響。因此,實際使用中,通常會使用數據庫的連接池技術,來訪問數據庫達到資源復用的目的。
DBUtils 是一套 Python 數據庫連接池包,並允許對非線程安全的數據庫接口進行線程安全包裝。DBUtils 來自 Webware for Python 。

原理:

在程序創建連接的時候,可以從一個空閑的連接中獲取,不需要重新初始化連接,提升獲取連接的速度
關閉連接的時候,把連接放回連接池,而不是真正的關閉,所以可以減少頻繁地打開和關閉連接

python多線程代碼:

import threading
 
class MyThread(threading.Thread):
 
    def __init__(self, name, count, exec_object):
        threading.Thread.__init__(self)
        self.name = name
        self.count = count
        self.exec_object = exec_object
 
    def run(self):
        while self.count >= 0:
            count = count - 1
            self.exec_object.execFunc(count)
 
thread1 = MyThread('MyThread1', 3, ExecObject())
thread2 = MyThread('MyThread2', 5, ExecObject())
thread1.start()
thread2.start()
thread1.join() # join方法 執行完thread1的方法才繼續主線程
thread2.join() # join方法 執行完thread2的方法才繼續主線程
# 執行順序 並發執行thread1 thread2,thread1和thread2執行完成才繼續執行主線程
 
# ExecObject類是自定義數據庫操作的業務邏輯類
# 
 
########join方法詳解########
thread1 = MyThread('MyThread1', 3, ExecObject())
thread2 = MyThread('MyThread2', 5, ExecObject())
thread1.start()
thread1.join() # join方法 執行完thread1的方法才繼續主線程
thread2.start()
thread2.join() # join方法 執行完thread2的方法才繼續主線程
# 執行順序 先執行thread1,執行完thread1再執行thread2,執行完thread2才繼續執行主線程

mysql數據庫連接池代碼:

import MySQLdb
from DBUtils.PooledDB import PooledDB
 
class MySQL:
 
    host = 'localhost'
    user = 'root'
    port = 3306
    pasword = ''
    db = 'testDB'
    charset = 'utf8'
 
    pool = None
    limit_count = 3 # 最低預啟動數據庫連接數量
 
    def __init__(self):
        self.pool = PooledDB(MySQLdb, self.limit_count, host = self.host, user = self.user, passwd = self.pasword, db = self.db,
            port = self.port, charset = self.charset, use_unicode = True)
 
    def select(self, sql):
        conn = self.pool.connection()
        cursor = conn.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        cursor.close()
        conn.close()
        return result
 
    def insert(self, table, sql):
        conn = self.pool.connection()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            conn.commit()
            return {'result':True, 'id':int(cursor.lastrowid)}
        except Exception as err:
            conn.rollback()
            return {'result':False, 'err':err}
        finally:
            cursor.close()
            conn.close()
 

精簡版的連接池例子

import pymysql
from DBUtils.PooledDB import PooledDB

pool = PooledDB(pymysql,5,host='ip',user='user',passwd='passwd',db='db',port=3306,setsession=['SET AUTOCOMMIT = 1']) # 5為連接池里的最少連接數,setsession=['SET AUTOCOMMIT = 1']是用來設置線程池是否打開自動更新的配置,0為False,1為True
conn = pool.connection() #以后每次需要數據庫連接就是用connection()函數獲取連接就好了
cur=conn.cursor()
SQL="select * from table"
count=cur.execute(SQL)
results=cur.fetchall()
cur.close()
conn.close()

PooledDB 的參數:

POOL = PooledDB(
creator=pymysql, # 使用鏈接數據庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制
maxshared=1, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制
setsession=[], # 開始會話前執行的命令列表。如:[“set datestyle to …”, “set time zone …”]
ping=0,
# ping MySQL服務端,檢查是否服務可用。
# 如:0 = None = never,
# 1 = default = whenever it is requested,
# 2 = when a cursor is created,
# 4 = when a query is executed,
# 7 = always
host=‘127.0.0.1’,
port=3306,
user=‘root’,
password=’’,
database=‘ziji’,
charset=‘utf8’
)

在 uwsgi 中,每個 http 請求都會分發給一個進程,連接池中配置的連接數都是一個進程為單位的(即上面的最大連接數,都是在一個進程中的連接數),而如果業務中,一個 http 請求中需要的 sql 連接數不是很多的話(其實大多數都只需要創建一個連接),配置的連接數配置都不需要太大。

連接池對性能的提升表現在:

在程序創建連接的時候,可以從一個空閑的連接中獲取,不需要重新初始化連接,提升獲取連接的速度
關閉連接的時候,把連接放回連接池,而不是真正的關閉,所以可以減少頻繁地打開和關閉連接

可能遇到的問題

python3 安裝 第三方庫DBUtils安裝成功 項目里卻import不了的解決方案

默認使用pip 下載的DBUtils是2.0版本的,但是python 3.0版本不能適配DBUtils 2.0 。所以解決方案是先卸載2.0的BUtils,然后pip 低版本的BUtils 。例如我pip install DBUtils==1.3 安裝之后就好了。

 python3 mysql錯誤 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')

這個問題如果是在單線程中出現的,那可能是MySQL持久化鏈接保持時間為8小時(28800秒),過期后斷開連。如果數據庫沒有新建連接,則會報此錯。解決思路是調用前判斷連接是否有效,如果有效繼續,無效創建連接。

參考:python3 mysql錯誤 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')

AttributeError: 'NoneType' object has no attribute 'read' 

推測問題是,多線程操作數據庫連接的時候,相互交叉釋放了其他線程的連接。

解決方案是在獲取數據庫連接時加鎖,查詢完畢后釋放鎖
import threading
 
lock = threading.Lock()

lock.acquire()
conn = pool.getConn()
cur = conn.cursor()
cur.execute(sql)
rows = cur.fetchall()
lock.release()

轉自:Python 使用 PyMysql、DBUtils 創建連接池,提升性能 和 python多線程操作數據庫問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM