Python實現Mysql數據庫連接池


python連接Mysql數據庫:

Python編程中可以使用MySQLdb進行數據庫的連接及諸如查詢/插入/更新等操作,但是每次連接MySQL數據庫請求時,都是獨立的去請求訪問,相當浪費資源,而且訪問數量達到一定數量時,對mysql的性能會產生較大的影響。因此,實際使用中,通常會使用數據庫的連接池技術,來訪問數據庫達到資源復用的目的。

數據庫連接池

python的數據庫連接池包 DBUtils:

 

DBUtils是一套Python數據庫連接池包,並允許對非線程安全的數據庫接口進行線程安全包裝。DBUtils來自Webware for Python。

DBUtils提供兩種外部接口:
  • * PersistentDB :提供線程專用的數據庫連接,並自動管理連接。
  • * PooledDB :提供線程間可共享的數據庫連接,並自動管理連接。

需要的python庫:
下載DBUtils:
Webware 的網站下載最新版本:http://www.webwareforpython.org/downloads/DBUtils/
或者在Python Package Index來下載:http://www.python.org/pypi/DBUtils/

下載pymssql:
http://code.google.com/p/pymssql/downloads/list
(pymssql 是Python語言用來連接微軟 SQL SERVER 數據庫的類庫)



1.寫一個創建連接池,獲取連接以及重新連接數據庫的模塊:

# libby_db_pool.py
# 代碼如下:


#-*- coding:utf-8 -*-
from DBUtils.PooledDB import PooledDB
import pymssql #sqlserver數據庫適配器

from pymssql import OperationalError, InternalError, ProgrammingError

HOST = "127.0.0.1"
PORT = "1433"
CHARSET = "utf8"
NAME = "zkeco_oracle"
USER = "sa"
PASSWORK = "sa"



conn_args = {
    'host':"%s"%HOST,
    'port':"%s"%PORT,
    'database':"%s"%NAME,
    'charset':"%s"%CHARSET,
    'user':"%s"%USER,
    'password':"%s"%PASSWORK
}

"""
    mincached : 啟動時開啟的閑置連接數量(缺省值 0 以為着開始時不創建連接)
    maxcached : 連接池中允許的閑置的最多連接數量(缺省值 0 代表不閑置連接池大小)
    maxshared : 共享連接數允許的最大數量(缺省值 0 代表所有連接都是專用的)如果達到了最大數量,被請求為共享的連接將會被共享使用
    maxconnecyions : 創建連接池的最大數量(缺省值 0 代表不限制)
    blocking : 設置在連接池達到最大數量時的行為(缺省值 0 或 False 代表返回一個錯誤<toMany......>; 其他代表阻塞直到連接數減少,連接被分配)
    maxusage : 單個連接的最大允許復用次數(缺省值 0 或 False 代表不限制的復用).當達到最大數時,連接會自動重新連接(關閉和重新打開)
    setsession : 一個可選的SQL命令列表用於准備每個會話,如["set datestyle to german", ...]

"""

args = (10,10,30,100,True,0,None)

class DbManager():
    def __init__(self):
        try:
            self._pool = PooledDB(pymssql,*args,**conn_args)
        except Exception,e:
            print "The parameters for DBUtils is:",conn_args
    def _getConn(self):
        return self._pool.connection()
    
_dbManager = DbManager()

def getConn():
    """ 獲取數據庫連接 """
    return _dbManager._getConn()

def _reConn():
    """ 重新連接數據庫 """
    global _dbManager
    re = False
    try:
        _dbManager = DbManager()
        re = True
    except:
        import traceback
        traceback.print_exc()
    finally:
        return re
import datetime

def reConn():
    print "%s: now try to reconnect Database!"%(datatime.datatime.now())
    flag = _reConn()
    if flag:
        print "%s reconnect database success!"%(datatime.datatime.now())
    else:
        print "%s reconnect database failed!"%(datatime.datatime.now())



2.寫一個支持增刪查改功能的連接池模塊:

#libby_sql_utils.py
#代碼如下:


#-*- coding:utf-8 -*-
from libby_db_pool import getConn,reConn
from libby_db_pool import OperationalError, InternalError, ProgrammingError
import traceback
def test_conn():
    """
        測試連接池連接是否正常
        return:
        res:True:正常,False:不正常
        msg:如果不正常,為異常信息
    """
    
    test_sql = """
        select 1
    
    """
    
    conn = None
    cur = None
    res = False
    msg = ""
    try:
        conn = getConn()
        cur = conn.cursor()
        cur.execute(test_sql)
        res = cur.fetchall()
        res = True
    except Exception,e:
        trackback.print_exc()
        msg = e
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
        return res,msg


def call_reConn():
    """
        重新創建連接池
    """
    reConn()
    
def p_query(sql):
    """
        dbutils 數據連接池
            只能執行數據查詢sql語句,否則會拋錯
        @parm: 要執行的sql語句
        @return:
            []:查詢結果為空
            None:sql語句執行失敗,出現異常
                    二維list:正常結果
    """
    
    conn = None
    cur = None
    res = None
    try:
        conn = getConn()
        cur = conn.cursor()
        cur.execute(sql)
        res = cur.fetchall()
    except (OperationalError, InternalError):
        call_reConn()
        trackback.print_exc()
    except:
        trackback.exc()
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
        return res

def p_query_one(sql):
    """
        dbutils 數據連接池
                只能執行數據查詢sql語句,否則會報錯
                執行sql查詢語句,獲取第一條記錄
        @parm:要執行的sql語句
        @return:
            []:查詢結果為空
            None:sql語句執行失敗,出現異常
            list:正常結果
    """
    
    conn = None
    cur = None
    res = None
    try:
        conn = getConn()
        cur = conn.cursor()
        cur.execute(sql)
        res = cur.fetchone()
    except (OperationalError,InternalError):
        call_reConn()
    except:
        traceback.print_exc()
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
        return res



def p_execute(sql):
    """
        dbutils 數據連接池
                執行數據操作語句,包括 update,insert,delete
        @parm:要執行的sql
        @return:
            None:sql語句執行失敗,出現異常
            number:影響記錄條數
            -2:數據庫連接失敗導致執行失敗
    """
    conn = None
    cur = None
    res = None
    try:
        conn = getConn()
        cur = conn.cursor()
        cur.execute(sql)
        res = cur._cursor.rowcount
        conn.commit()
    except Exception,e:
        if conn:
            conn.rollback()
        traceback.print_exc()
    finally:
        if res == -1:#可能是數據庫斷開連接
            ret,msg = test_conn()
            if not ret:
                call_reConn()
                res = -2
        if cur:
            cur.close()
        if conn:
            conn.close()
        return res
    


def p_mutiexec(sql_list):
    """
        dbutils 數據連接池
                執行多條數據操作語句,可以用於多條sql語句的事務性操作,包括 update,insert,delete
        
        @parm:要執行的sql語句[]
        @return:
            (flag,res):
                flag<Ture or False>:批次是否全部執行成功
                res<list>:每天sql語句執行影響的行數,如果執行失敗,由此可以判斷第幾條sql語句執行失敗
                            如果遇到數據庫斷開的情況,返回[-2,]
    
    """
    
    conn = None
    cur = None
    res = []
    flag = True
    
    try:
        conn = getConn()
        cur = conn.cursor()
        for sql in sql_list:
            cur.execute(sql)
            num = cur._cursor.rowcount
            res.append(num)
        conn.commit()
    except Exception,e:
        if conn:
            conn.rollback()
        traceback.print_exc()
    finally:
        if -1 in res:
            ret,msg = test_conn()
            if not ret:
                call_reConn()
                flag = False
                res = [-2,]
        if cur:
            cur.close()
        if conn:
            conn.close()
        return flag,res


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM