python使用dbutils的PooledDB連接池,操作數據庫
1、使用 DBUtils 的 PooledDB 連接池操作數據庫。這樣就不需要在每次執行 SQL 後都關閉數據庫連接,避免了頻繁創建連接所消耗的時間。
2、如果一直使用同一個連接而不關閉,在多線程下向數據庫插入超長字符串,運行一段時間後很容易出現 OperationalError: (2006, 'MySQL server has gone away') 這個錯誤,使用 PooledDB 可以解決。
3、記錄工作中連接 MySQL 和 SQL Server 的兩種示例。
連接mysql數據庫

# coding=utf-8
"""
Operate a MySQL database through connections borrowed from a DBUtils pool.

The accompanying notes recommend pooling as the remedy for
``OperationalError: (2006, 'MySQL server has gone away')`` seen when a single
long-lived connection is shared by many threads.
"""
import datetime
import json

import pymysql

try:
    # Module layout used by DBUtils releases before 2.0.
    from DBUtils.PooledDB import PooledDB
except ImportError:
    # DBUtils 2.0 renamed the package and module to lower case.
    from dbutils.pooled_db import PooledDB


class MysqlClient(object):
    """Thin MySQL helper that runs statements on a pooled connection.

    The pool is stored on the class, so it is created once by the first
    instance; the pool/connection arguments of later instances are ignored
    because the existing pool is reused as-is.
    """

    # Shared PooledDB instance; None until the first client is constructed.
    __pool = None

    def __init__(self, mincached=10, maxcached=20, maxshared=10,
                 maxconnections=200, blocking=True, maxusage=100,
                 setsession=None, reset=True,
                 host='127.0.0.1', port=3306, db='test',
                 user='root', passwd='123456', charset='utf8mb4'):
        """Create the shared pool if needed and borrow one connection.

        :param mincached: initial number of idle connections in the pool
        :param maxcached: maximum number of idle connections in the pool
        :param maxshared: maximum number of shared connections
        :param maxconnections: maximum number of connections the pool allows
        :param blocking: behaviour when the maximum is exceeded; True waits
            for a connection to be freed, False raises an error
        :param maxusage: maximum number of reuses of a single connection
        :param setsession: optional list of SQL commands that may serve to
            prepare the session, e.g. ["set datestyle to ...",
            "set time zone ..."]
        :param reset: how connections should be reset when returned to the
            pool (False or None to rollback transactions started with
            begin(), True to always issue a rollback for safety's sake)
        :param host: database server address
        :param port: database server port
        :param db: database (schema) name
        :param user: user name
        :param passwd: password
        :param charset: connection character set
        """
        if self.__class__.__pool is None:
            self.__class__.__pool = PooledDB(
                pymysql,
                mincached=mincached,
                maxcached=maxcached,
                maxshared=maxshared,
                maxconnections=maxconnections,
                blocking=blocking,
                maxusage=maxusage,
                setsession=setsession,
                reset=reset,
                host=host,
                port=port,
                db=db,
                user=user,
                passwd=passwd,
                charset=charset,
                # Rows come back as dicts keyed by column name.
                cursorclass=pymysql.cursors.DictCursor,
            )
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __get_conn(self):
        """Borrow a connection from the pool and open a cursor on it."""
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def close(self):
        """Close the cursor and hand the connection back to the pool.

        Best effort: errors are printed, not raised. The connection is closed
        even when closing the cursor fails, so it is not leaked from the pool.
        """
        for resource in (self._cursor, self._conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                print(e)

    def __execute(self, sql, param=()):
        """Run ``sql`` with ``param`` and return the cursor's row count."""
        count = self._cursor.execute(sql, param)
        print(count)
        return count

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """Replace datetime values in a row dict with their string form.

        This keeps ``json.dumps`` from failing on ``datetime.datetime``
        values. The dict is updated in place and also returned; a falsy
        argument (e.g. ``None`` for "no row") is returned unchanged.
        """
        if result_dict:
            converted = {
                key: str(value)
                for key, value in result_dict.items()
                if isinstance(value, datetime.datetime)
            }
            result_dict.update(converted)
        return result_dict

    def select_one(self, sql, param=()):
        """Run a query and return ``(row_count, first_row)``.

        :param sql: SQL statement
        :param param: statement parameters
        :return: row count and the first row as a dict (``None`` if no row)
        """
        count = self.__execute(sql, param)
        row = self._cursor.fetchone()
        row = self.__dict_datetime_obj_to_str(row)
        return count, row

    def select_many(self, sql, param=()):
        """Run a query and return ``(row_count, all_rows)``.

        :param sql: SQL statement
        :param param: statement parameters
        :return: row count and the full result set
        """
        count = self.__execute(sql, param)
        rows = self._cursor.fetchall()
        # Rows are converted in place, so a plain loop is enough.
        for row in rows:
            self.__dict_datetime_obj_to_str(row)
        return count, rows

    def execute(self, sql, param=()):
        """Run a statement and return the affected row count."""
        return self.__execute(sql, param)

    def begin(self):
        """Start a transaction by switching autocommit off."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """Finish the transaction.

        :param option: ``'commit'`` commits; any other value rolls back
        """
        if option == 'commit':
            # The original called autocommit() with no argument here, which
            # never committed the pending work.
            self._conn.commit()
        else:
            self._conn.rollback()


if __name__ == "__main__":
    mc = MysqlClient()
    try:
        sql1 = 'SELECT * FROM shiji WHERE id = 1'
        result1 = mc.select_one(sql1)
        print(json.dumps(result1[1], ensure_ascii=False))

        sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
        param = (2, 3, 4)
        print(json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False))
    finally:
        # Always give the connection back to the pool.
        mc.close()
如果單獨使用 pymysql 驅動來操作數據庫,最好配合 DBUtils 庫的連接池一起使用。
連接sqlserver數據庫

import pymssql

try:
    # Module layout used by DBUtils releases before 2.0.
    from DBUtils.PooledDB import PooledDB
except ImportError:
    # DBUtils 2.0 renamed the package and module to lower case.
    from dbutils.pooled_db import PooledDB


class SqlClient(object):
    """Client that talks to SQL Server (the default target) via a pool."""

    # One pool per (host, user, password, database), shared by all instances,
    # so creating a client does not open a brand-new pool every time.
    _pools = {}

    def __init__(self, host=None, user=None, password=None, sql=None,
                 database=None):
        """Borrow a pooled connection and remember the statement to run.

        :param host: database server address
        :param user: user name
        :param password: password
        :param sql: statement executed by :meth:`all`
        :param database: database name; required
        :raises ValueError: if ``database`` is empty or missing
        """
        if not database:
            # The original fell through with no pool defined and then failed
            # on an unbound local variable; fail with a clear message instead.
            raise ValueError("database is required to create a connection pool")

        pool_key = (host, user, password, database)
        pool = self._pools.get(pool_key)
        if pool is None:
            pool = PooledDB(
                pymssql,
                database=database,
                mincached=5,
                maxcached=10,
                maxshared=5,
                maxconnections=10,
                blocking=True,
                maxusage=100,
                setsession=None,
                reset=True,
                host=host,
                user=user,
                password=password,
            )
            self._pools[pool_key] = pool

        self.sql = sql
        self.conn = pool.connection()
        self.cursor = self.conn.cursor()
        # Result of the most recent query.
        self.data = None

    def all(self):
        """Execute ``self.sql`` and return every row.

        :raises ValueError: if no statement was supplied to the constructor
        """
        if self.sql is None:
            raise ValueError("no sql statement was given to SqlClient")
        self.cursor.execute(self.sql)
        self.data = self.cursor.fetchall()
        return self.data

    def first(self):
        """Return the first row of the ``v_ASRS_STORE_MESVIEW`` view."""
        self.cursor.execute("select top 1 * from v_ASRS_STORE_MESVIEW")
        self.data = self.cursor.fetchone()
        return self.data

    def count(self, sql="select count(*) as count from v_ASRS_STORE_MESVIEW"):
        """Execute a counting statement and return its first column.

        :param sql: statement whose first selected column is the count
        """
        self.cursor.execute(sql)
        self.data = self.cursor.fetchone()
        return self.data[0]

    def close(self):
        """Close the cursor, then return the connection to the pool."""
        try:
            # The cursor goes first; the original closed the connection
            # before the cursor that depends on it.
            self.cursor.close()
        finally:
            self.conn.close()

from client import SqlClient

# Connection settings keyed by profile name.
CONFIG = {
    "default": {"HOST": "10.10.10.10", "USER": "aaa", "NAME": "AAA", "PASSWORD": "123"},
}


def main(sql="select top 1 * from v_ASRS_STORE_MESVIEW"):
    """Run ``sql`` against the "default" profile and print the rows.

    :param sql: statement to execute. The original snippet used an undefined
        ``sql`` name; the default here reuses the query that
        ``SqlClient.first`` runs.
    """
    default = CONFIG['default']
    conf = dict(
        host=default['HOST'],
        user=default['USER'],
        database=default['NAME'],
        password=default['PASSWORD'])

    # sql must be passed by keyword: the first positional parameter of
    # SqlClient is host, which conf already supplies.
    sc = SqlClient(sql=sql, **conf)
    try:
        res = sc.all()
    finally:
        sc.close()
    print(res)


if __name__ == "__main__":
    main()
OK, thanks.