python使用dbutils的PooledDB連接池,操作數據庫


1、使用dbutils的PooledDB連接池,操作數據庫。

這樣就不需要每次執行sql后都關閉數據庫連接,頻繁的創建連接,消耗時間

2、如果是使用一個連接一直不關閉,多線程下,插入超長字符串到數據庫,運行一段時間后很容易出現OperationalError: (2006, ‘MySQL server has gone away’)這個錯誤。

使用PooledDB解決。

 

# coding=utf-8
"""
使用DBUtils數據庫連接池中的連接,操作數據庫
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql


class MysqlClient(object):
    __pool = None;

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True,
                 host='127.0.0.1', port=3306, db='test',
                 user='root', passwd='123456', charset='utf8mb4'):
        """

        :param mincached:連接池中空閑連接的初始數量
        :param maxcached:連接池中空閑連接的最大數量
        :param maxshared:共享連接的最大數量
        :param maxconnections:創建連接池的最大數量
        :param blocking:超過最大連接數量時候的表現,為True等待連接數量下降,為false直接報錯處理
        :param maxusage:單個連接的最大重復使用次數
        :param setsession:optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset:how connections should be reset when returned to the pool
            (False or None to rollback transcations started with begin(),
            True to always issue a rollback for safety's sake)
        :param host:數據庫ip地址
        :param port:數據庫端口
        :param db:庫名
        :param user:用戶名
        :param passwd:密碼
        :param charset:字符編碼
        """

        if not self.__pool:
            self.__class__.__pool = PooledDB(pymysql,
                                             mincached, maxcached,
                                             maxshared, maxconnections, blocking,
                                             maxusage, setsession, reset,
                                             host=host, port=port, db=db,
                                             user=user, passwd=passwd,
                                             charset=charset,
                                             cursorclass=pymysql.cursors.DictCursor
                                             )
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        self._conn = self.__pool.connection();
        self._cursor = self._conn.cursor();

    def close(self):
        try:
            self._cursor.close()
            self._conn.close()
        except Exception as e:
            print e

    def __execute(self, sql, param=()):
        count = self._cursor.execute(sql, param)
        print count
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """把字典里面的datatime對象轉成字符串,使json轉換不出錯"""
        if result_dict:
            result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
            result_dict.update(result_replace)
        return result_dict

    def select_one(self, sql, param=()):
        """查詢單個結果"""
        count = self.__execute(sql, param)
        result = self._cursor.fetchone()
        """:type result:dict"""
        result = self.__dict_datetime_obj_to_str(result)
        return count, result

    def select_many(self, sql, param=()):
        """
        查詢多個結果
        :param sql: qsl語句
        :param param: sql參數
        :return: 結果數量和查詢結果集
        """
        count = self.__execute(sql, param)
        result = self._cursor.fetchall()
        """:type result:list"""
        [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
        return count, result

    def execute(self, sql, param=()):
        count = self.__execute(sql, param)
        return count

    def begin(self):
        """開啟事務"""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """結束事務"""
        if option == 'commit':
            self._conn.autocommit()
        else:
            self._conn.rollback()


if __name__ == "__main__":
    mc = MysqlClient()
    sql1 = 'SELECT * FROM shiji  WHERE  id = 1'
    result1 = mc.select_one(sql1)
    print json.dumps(result1[1], ensure_ascii=False)

    sql2 = 'SELECT * FROM shiji  WHERE  id IN (%s,%s,%s)'
    param = (2, 3, 4)
    print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)

 

如果獨立使用pymysql數據庫,最好是配合DButils庫。

上面的用法中是一直使用了mc這個對象,真實意圖不是這樣的,你可以隨意實例化MysqlClient(),用不同的MysqlClient實例操縱數據庫。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM