python-- 使用 psycopg2 操作 PostgreSQL 或使用連接池 DBUtils.PooledDB 操作 PostgreSQL


 

需要安裝的軟件和第三方包
python need install pip and petl DButils psycopg2 libs

command is:

yum install python-pip
pip install psycopg2
pip install petl
pip install DButils

yum install postgresql-devel*

 

1、使用 psycopg2 操作 PostgreSQL

Python:使用psycopg2模塊操作PostgreSQL
python無網安裝psycopg2(操作postgresql,附操作代碼)


2、使用連接池 DBUtils.PooledDB 操作 PostgreSQL

參考代碼:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import sys
import os
import configparser
import logging
import psycopg2

from  DBUtils.PooledDB import PooledDB

class  DatabaseOperator(object):
    '''class for database operator
    '''
    def __init__(self, database_config_path, database_config=None):
        '''Constructor
        '''
        self._database_config_path = database_config_path
        # load database configuration
        if not database_config :
            self._database_config = self.parse_postgresql_config(database_config_path)
        else:
            self._database_config = database_config
        self._pool = None

    def database_config_empty(self):
        if self._database_config:
            return False
        else:
            return True
        
    def parse_postgresql_config(self, database_config_path=None):
        '''解析數據庫配置文件參數
        database_config_path: 數據庫配置文件路徑
        --------
        返回值: 解析配置屬性, 以 dict 類型返回數據庫配置參數 config
        '''
        if database_config_path == None and self._database_config_path != None:
            database_config_path = self._database_config_path
        if not os.path.isfile(database_config_path):
            sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path))
        parser = configparser.SafeConfigParser()
        parser.read(database_config_path)
        config = {}
        config['database'] = parser.get('testdb', 'Database')
        config['db_user'] = parser.get('testdb', 'UserName')
        config['db_passwd'] = parser.get('testdb', 'Password')
        config['db_port'] = parser.getint('testdb', 'Port')
        config['db_host'] = parser.get('testdb', 'Servername')
        self._database_config = config
        return config  
    
    def get_pool_conn(self):
        if not self._pool:
            self.init_pgsql_pool()
        return self._pool.connection()
        
    def init_pgsql_pool(self):
        '''初始化連接池
        '''
        # 字典 config 是否為空
        config = self.parse_postgresql_config()
        POSTGREIP = config['db_host']
        POSTGREPORT = config['db_port']
        POSTGREDB = config['database']
        POSTGREUSER = config['db_user']
        POSTGREPASSWD = config['db_passwd']
        try:
            logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(POSTGREIP, datetime.datetime.now()))
            pool = PooledDB(
                creator = psycopg2,  # 使用連接數據庫的模塊 psycopg2
                maxconnections = 6,  # 連接池允許的最大連接數,0 和 None 表示不限制連接數
                mincached = 1,       # 初始化時,鏈接池中至少創建的空閑的鏈接,0 表示不創建
                maxcached = 4,       # 鏈接池中最多閑置的鏈接,0 和 None 不限制
                blocking = True,     # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
                maxusage = None,     # 一個鏈接最多被重復使用的次數,None 表示無限制
                setsession = [],     # 開始會話前執行的命令列表
                host = POSTGREIP,
                port = POSTGREPORT,
                user = POSTGREUSER,
                password = POSTGREPASSWD,
                database = POSTGREDB)
            self._pool = pool
            logging.info('SUCCESS: create {0} postgresql pool success on {1}.\n'.format(POSTGREIP, datetime.datetime.now()))
                    
        except Exception as e:
            logging.error('ERROR: create {0} postgresql pool failed on {1}.\n'.format(POSTGREIP, datetime.datetime.now()))
            self.close_db_cursor()
            sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))
            
    def pg_select_operator(self, sql):
        '''進行查詢操作,函數返回前關閉 cursor, conn
        '''
        # 執行查詢
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()      
            cursor.execute(sql)
            result = cursor.fetchall()
        except Exception as e:
            logging.error('ERROR: execute {0} causes error'.format(sql))
            sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.close()       
        return result

    def exec_sql(self, sql):
        result = False
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()      
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()    
        return result
    
    def pg_update_operator(self, sql): 
        result = False
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()      
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()    
        return result

    def pg_delete_operator(self, sql):
        result = False
        # 執行查詢
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()   
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()       
        return result
    
    def close_pool(self):
        '''關閉 pool
        '''
        if self._pool != None:
            self._pool.close()

    def test_pool_select(self):
        sql = 'select * from student'
        result = self.pg_select_operator(sql)
        print(result)

    def test_pool_exec(self, sqlscript):
        result = self.exec_sql(sqlscript)
        print(result)
        return result
            
if __name__ == '__main__':
    path = "sql_conf_test.conf"
    # 測試 select
    db = DatabaseOperator(database_config_path=path)
    db.test_pool_select()

    # 測試 insert
    db.test_pool_exec("insert into student(name, age) values('test', 20);")
    # 關閉 pool
    db.close_pool()

  sql_conf_test.conf

[testdb]
Database = test
UserName = xxx
Password = xxx
Port = 5432
Servername = 127.0.0.1

 

  執行 python .\DatabaseOperator.py,結果:

 

  查看數據庫

 

===========================================================

def pg_insert_operator(self, sql):
    '''進行插入操作,函數返回前關閉 cursor, conn
        返回:影響的記錄條數
    '''
    result = 0
    try:
        conn = self.get_pool_conn()
        cursor = conn.cursor()      
        cursor.execute(sql)
    except Exception as e:
        logging.error('ERROR: execute  {0} causes error'.format(sql))
        sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
    finally:
        conn.commit()
        result = cursor.rowcount
        cursor.close()
        conn.close()    
    
    return result

 

---


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM