python 使用數據庫連接池


python數據庫連接池

一丶持久數據庫 (persistent_db)

# 1. dbutils.persistent_db 中的類 PersistentDB使用任何 DB-API2 數據庫模塊
# 2. 實現到數據庫的穩定、線程仿射、持久連接。
# 3. “線程仿射”和“持久”意味着各個數據庫連接保持分配給各自的線程,並且在線程的生命周期內不會關閉

持久化.png

# 
	1. 每當線程第一次打開數據庫連接時,將打開一個到數據庫的新連接,該連接將從現在開始用於該特定線程	
    2. 當線程關閉數據庫連接時,它仍然保持打開狀態,以便下次同一個線程請求連接時,可以使用這個已經打開的連接
    3. 當線程死亡時,連接將自動關閉

簡而言之: 
	persistent_db嘗試回收數據庫連接以提高線程應用程序的整體數據庫訪問性能,但它確保線程之間永遠不會共享連接

二丶池化數據庫 (pooled_db)

匯集.png

三丶持久數據庫 (persistent_db)

### persistent_db 的參數
	- creator : 返回新的 DB-API 2 連接對象的任意函數或符合 DB-API 2 的數據庫模塊

    - maxusage : 單個連接的最大重用次數(默認為0或None表示無限重用)

    每當達到限制時,連接將被重置

    - setsession : 可用於准備會話的 SQL 命令的可選列表,例如["set datestyle to German", ...]

    - failures : 如果默認值(OperationalError,InterfaceError,InternalError)不適用於使用的數據庫模塊,則應應用連接故障轉移機制的可選異常類或異常類元組
    
    - ping : 一個可選標志,用於控制何時使用ping()方法檢查連接,如果這種方法可用(0 =無= 從不,1 = 默認 = 每當請求時, 2 = 創建游標時,4 = 當執行查詢, 7 = 總是,以及這些值的所有其他位組合)
    
    - closeable : 如果設置為 true,則允許關閉連接,但默認情況下,這將被忽略
    
    - threadlocal : 一個可選類,用於表示將使用的線程本地數據,而不是我們的 Python 實現(threading.local 更快,但不能在所有情況下都使用)
### 與本地數據庫mydb的每個連接都被重用 1000 次
import pgdb # import used DB-API 2 module 
from dbutils.persistent_db import PersistentDB 
persist = PersistentDB(pgdb, 1000, database='mydb')
### 這些參數設置生成器后,您可以請求此類數據庫連接
db = persist.connection()

四丶池化數據庫 (pooled_db)

# pooled_db 的參數
	
    - creator : 返回新的 DB-API 2 連接對象的任意函數或符合 DB-API 2 的數據庫模塊
        
    - mincached : 池中的初始空閑連接數(默認為0表示啟動時不建立連接)

    - maxcached : 池中的最大空閑連接數(默認值0或None表示無限池大小)
        
    - maxshared : 允許的最大共享連接數(默認值0或None表示所有連接都是專用的)

當達到此最大數量時,如果連接被請求為可共享,則連接將被共享。
        
    - maxconnections : 一般允許的最大連接數(默認值0或None表示任意數量的連接)
        
    - blocking : 確定超過最大值時的行為
    
    - maxusage : 單個連接的最大重用次數(默認為0或None表示無限重用), 當達到此連接的最大使用次數時,連接會自動重置(關閉並重新打開)
        
    - setsession : 可用於准備會話的 SQL 命令的可選列表,例如["set datestyle to German", ...]
        
    - reset : 返回池時應如何重置連接(False或None回滾以begin() 開始的事務,默認值True出於安全考慮總是發出回滾)
        
    - failures : 如果默認值(OperationalError,InterfaceError,InternalError)不適用於使用的數據庫模塊,則應應用連接故障轉移機制的可選異常類或異常類元組
        
    - ping : 一個可選標志,用於控制何時使用ping()方法檢查連接(如果此類方法可用)(0 =無= 從不,1 = 默認 = 每當從池中獲取時, 2 = 創建游標時,4 = 何時執行查詢, 7 = 總是,以及這些值的所有其他位組合)
### 想要一個與本地數據庫mydb的至少五個連接的數據庫連接池
import pgdb # import used DB-API 2 module 
from dbutils.pooled_db import PooledDB 
pool = PooledDB(pgdb, 5, database='mydb')

### 設置連接池后,可以從該池請求數據庫連接:
db = pool.connection()

### 設置非零maxshared參數, 默認情況下連接可能會與其他線程共享。如果您想擁有專用連接
db = pool.connection(shareable=False)
		↓
db = pool.dedicated_connection() # 專用連接


### 如果您不再需要它,您應該立即使用 db.close() 將其返回到池中。您可以以相同的方式獲得另一個連接
### 連接不是線程安全的,這將過早釋放連接以供重用
db = pool.connection()
cur = db.cursor()
cur.execute(...)
res = cur.fetchone()
cur.close()  # or del cur
db.close()  # or del db
### 以將上下文管理器用於更簡單的代碼

with pool.connection() as db:
    with db.cursor() as cur:
        cur.execute(...)
        res = cur.fetchone()

五丶 代碼+官網

# 官網
https://webwareforpython.github.io/DBUtils/main.html
### 腳本
# -*- coding: utf-8 -*-
import os
import logging
import pymysql
from dbutils.pooled_db import PooledDB

base_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(
    level=30,
    filename=os.path.join(base_path, 'log', 'db_process.log'),
    filemode='a',
)


class DbTool(object):
    """
      # 單例模式  +  數據連接池
    """
    mincached = 10  # 連接池種空閑連接的初始數量
    maxcached = 20  # 連接池種空閑連接的最大數量
    maxshared = 10  # 共享連接的最大數量
    maxconnections = 200  # 創建連接池的最大數量
    blocking = True  # 超過最大連接數量的時. True等待連接數下降,False直接報錯處理
    maxusage = 100  # 單個連接的最大重復使用次數
    setsession = None  #
    reset = True  #
    _pool = None
    __instance = None

    def __init__(self, db_config):

        host = db_config['host']
        port = db_config['port']
        user = db_config['user']
        password = db_config['password']
        db = db_config['db']

        if not self._pool:
            self._pool = PooledDB(
                creator=pymysql,
                maxconnections=self.maxconnections,
                mincached=self.mincached,
                maxcached=self.maxcached,
                blocking=self.blocking,
                maxusage=self.maxusage,
                setsession=self.setsession,
                host=host,
                port=port,
                user=user,
                password=password,
                database=db,
            )
        logging.info('SUCCESS: create postgresql success.\n')

    def __new__(cls, *args, **kwargs):
        """
        :param args:
        :param kwargs:
        """
        if not cls.__instance:
            cls.__instance = super(DbTool, cls).__new__(cls)
        return cls.__instance

    def pg_select_operator(self, sql):
        '''
            # 查詢 SQL
        :param sql:  sql語句
        :return: sql結果
        '''
        conn = self._pool.connection()
        cursor = conn.cursor()
        result = False
        try:
            cursor.execute(sql)
            result = cursor.fetchall()
        except Exception as e:
            logging.error('ERROR:', e)
        finally:
            cursor.close()
            conn.close()

        return result

    def pg_update_operator(self, sql):
        result = False
        conn = self._pool.connection()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            result = True
        except Exception as e:
            logging.error('ERROR:', e)
        finally:
            cursor.close()
            conn.commit()
            conn.close()
        return result

    def pg_insert_operator(self, sql):
        result = False
        conn = self._pool.connection()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            result = True
        except Exception as e:
            logging.error('ERROR:', e)
        finally:
            # 關閉連接錢提交
            cursor.close()
            conn.commit()
            conn.close()
        return result

    def close_pool(self):
        '''
            關閉連接池
        :return:
        '''
        if self._pool != None:
            self._pool.close()


if __name__ == '__main__':
    db_config = {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "",
        "db": "testdb"
    }

    obj = DbTool(db_config)
    sql = """
    INSERT INTO `testdb`.`p_user`(`id`, `user_id`, `user_name`, `user_sex`, `user_phone`, `user_email`, `user_pwd`, `isadmin`, `create_time`, `end_time`, `user_origal`) VALUES (12, '10000001', 'admin_x', '1', '13811111112', 'admin_x@163.com', 'pbkdf2:sha256:150000$EU5aTt0N$555f3c7a0d28c092f8b5c3402f0218fffd51b6fa83bab54fed1670f969898c55', '1', '2021-08-01 11:34:07', NULL, NULL);

    """
    print('id:', id(obj))
    try:
        obj.pg_insert_operator(sql)
    except Exception as e:
        print(e)

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM