采用連接池的方式來操作DB
#-*- coding:utf-8 -*-
#!/usr/bin/python3
import pymysql
import configUtil
from DBUtils.PooledDB import PooledDB
class MysqlUtil(object):
# 連接池對象
__pool = None
def __init__(self, config):
# 數據庫構造函數,從連接池中取出連接,並生成操作游標
self.pool = MysqlUtil.__get_conn(config)
@staticmethod
def __get_conn(config):
"""
@summary: 靜態方法,從連接池中取出連接
@return MySQLdb.connection
"""
host = configUtil.read_config(config, "datasource_url", "mysqlConfig")
username = configUtil.read_config(config, "datasource_username", "mysqlConfig")
db_pwd = configUtil.read_config(config, "datasource_password", "mysqlConfig")
db_database = configUtil.read_config(config, "datasource_database", "mysqlConfig")
if MysqlUtil.__pool is None:
__pool = PooledDB(pymysql, mincached=1, maxcached=10, maxconnections=10,
host=host, port=3306, user=username, passwd=db_pwd,
db=db_database, use_unicode=False, blocking=False, charset="utf8")
return __pool
def get_all(self, sql):
"""
@summary: 執行查詢,並取出所有結果集
@param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來
@param param: 可選參數,條件列表值(元組/列表)
@return: result list(字典對象)/boolean 查詢到的結果集
"""
try:
con = self.pool.connection()
cur = con.cursor()
count = cur.execute(sql)
if count > 0:
result = cur.fetchall()
else:
result = False
return result
except Exception as e:
print('SQL執行有誤,原因:', e)
finally:
cur.close()
con.close()
def update(self, sql):
try:
con = self.pool.connection()
cur = con.cursor()
cur.execute(sql)
con.commit()
except Exception as e:
con.rollback() # 事務回滾
print('SQL執行有誤,原因:', e)
finally:
cur.close()
con.close()