python - pymysql封裝


前言

在搭建測試框架過程中,會遇到需要頻繁操作數據庫的情況,會用到pymysql進行數據庫的操作,當操作的連接數過多時,會出現斷連的情況。以下代碼是借鑒其他大佬的代碼,忘記是哪位大佬的代碼,后續看到再補上鏈接。在此致謝!

代碼部分

1、封裝鏈接池部分

from timeit import default_timer

import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor


class DB_MySQL_Pool:
    """db連接池"""
    __pool = None
    __MAX_CONNECTIONS = 100  # 創建連接池的最大數量
    __MIN_CACHED = 10  # 連接池中空閑連接的初始數量
    __MAX_CACHED = 20  # 連接池中空閑連接的最大數量
    __MAX_SHARED = 10  # 共享連接的最大數量
    __BLOCK = True  # 超過最大連接數量時候的表現,為True等待連接數量下降,為false直接報錯處理
    __MAX_USAGE = 100  # 單個連接的最大重復使用次數
    __CHARSET = 'UTF8'
    '''
        set_session:可選的SQL命令列表,可用於准備
                    會話,例如[“將日期樣式設置為...”,“設置時區...”]
                    重置:當連接返回池中時,應該如何重置連接
                    (False或None表示回滾以begin()開始的事務,
                    為安全起見,始終發出回滾命令)
    '''
    __RESET = True
    __SET_SESSION = ['SET AUTOCOMMIT = 1']  # 設置自動提交

    def __init__(self, host, port, user, password, database):
        if not self.__pool:
            self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password,
                                             database=database,
                                             maxconnections=self.__MAX_CONNECTIONS,
                                             mincached=self.__MIN_CACHED,
                                             maxcached=self.__MAX_CACHED,
                                             maxshared=self.__MAX_SHARED,
                                             blocking=self.__BLOCK,
                                             maxusage=self.__MAX_USAGE,
                                             setsession=self.__SET_SESSION,
                                             reset=self.__RESET,
                                             charset=self.__CHARSET)

    def get_connect(self):
        return self.__pool.connection()

2、封裝pymysql操作

class DB_MySQL:
    def __init__(self) -> None:
        self.__host = "自己的host"
        self.__port = 3306
        self.__user = "自己的user"
        self.__password = "自己的密碼"
        self.__database = "自己的數據庫"
        self._log_time = True
        self._log_label = "總用時"
        self.connects_pool = DB_MySQL_Pool(
            host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)

    def __enter__(self):
        # 如果需要記錄時間
        if self._log_time is True:
            self._start = default_timer()

        connect = self.connects_pool.get_connect()
        cursor = connect.cursor(pymysql.cursors.DictCursor)
        # https://blog.51cto.com/abyss/1736844
        # connect.autocommit = False # 如果使用連接池 則不能在取出后設置 而應該在創建線程池時設置
        self._connect = connect
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        self._connect.commit()
        self._cursor.close()
        self._connect.close()

        if self._log_time is True:
            diff = default_timer() - self._start
            print('-- %s: %.6f 秒' % (self._log_label, diff))

    def select_all(self, sql):
        """查詢返回全部結果"""
        self._cursor.execute(sql)
        return self._cursor.fetchall()

    def select_one(self, sql):
        """查詢返回單個結果"""
        self._cursor.execute(sql)
        return self._cursor.fetchone()

    def insert(self, sql):
        """插入數據"""
        res = self._cursor.execute(sql)
        return res

3、完整代碼

from timeit import default_timer

import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor


class DB_MySQL_Pool:
    """db連接池"""
    __pool = None
    __MAX_CONNECTIONS = 100  # 創建連接池的最大數量
    __MIN_CACHED = 10  # 連接池中空閑連接的初始數量
    __MAX_CACHED = 20  # 連接池中空閑連接的最大數量
    __MAX_SHARED = 10  # 共享連接的最大數量
    __BLOCK = True  # 超過最大連接數量時候的表現,為True等待連接數量下降,為false直接報錯處理
    __MAX_USAGE = 100  # 單個連接的最大重復使用次數
    __CHARSET = 'UTF8'
    '''
        set_session:可選的SQL命令列表,可用於准備
                    會話,例如[“將日期樣式設置為...”,“設置時區...”]
                    重置:當連接返回池中時,應該如何重置連接
                    (False或None表示回滾以begin()開始的事務,
                    為安全起見,始終發出回滾命令)
    '''
    __RESET = True
    __SET_SESSION = ['SET AUTOCOMMIT = 1']  # 設置自動提交

    def __init__(self, host, port, user, password, database):
        if not self.__pool:
            self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password,
                                             database=database,
                                             maxconnections=self.__MAX_CONNECTIONS,
                                             mincached=self.__MIN_CACHED,
                                             maxcached=self.__MAX_CACHED,
                                             maxshared=self.__MAX_SHARED,
                                             blocking=self.__BLOCK,
                                             maxusage=self.__MAX_USAGE,
                                             setsession=self.__SET_SESSION,
                                             reset=self.__RESET,
                                             charset=self.__CHARSET)

    def get_connect(self):
        return self.__pool.connection()


class DB_MySQL:
    def __init__(self) -> None:
        self.__host = "自己的host"
        self.__port = 3306
        self.__user = "自己的user"
        self.__password = "自己的密碼"
        self.__database = "自己的數據庫"
        self._log_time = True
        self._log_label = "總用時"
        self.connects_pool = DB_MySQL_Pool(
            host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)

    def __enter__(self):
        # 如果需要記錄時間
        if self._log_time is True:
            self._start = default_timer()

        connect = self.connects_pool.get_connect()
        cursor = connect.cursor(pymysql.cursors.DictCursor)
        # https://blog.51cto.com/abyss/1736844
        # connect.autocommit = False # 如果使用連接池 則不能在取出后設置 而應該在創建線程池時設置
        self._connect = connect
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        self._connect.commit()
        self._cursor.close()
        self._connect.close()

        if self._log_time is True:
            diff = default_timer() - self._start
            print('-- %s: %.6f 秒' % (self._log_label, diff))

    def select_all(self, sql):
        """查詢返回全部結果"""
        self._cursor.execute(sql)
        return self._cursor.fetchall()

    def select_one(self, sql):
        """查詢返回單個結果"""
        self._cursor.execute(sql)
        return self._cursor.fetchone()

    def insert(self, sql):
        """插入數據"""
        res = self._cursor.execute(sql)
        return res


if __name__ == '__main__':
    with DB_MySQL() as db:
        print(db.select_one("""select * from banner"""))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM