python連接Greenplum數據庫


配置greenplum客戶端認證

配置pg_hba.conf 

cd /home/gpadmin/gpdbdata/master/gpseg-1
vim pg_hba.conf 
增加
host    all         gpadmin          10.1.201.55/32    trust

[gpadmin@ gpseg-1]$ export PGDATA=/home/gpadmin/gpdbdata/master/gpseg-1
[gpadmin@ gpseg-1]$ pg_ctl reload -D $PGDATA
server signaled

使用Psycopg2訪問數據庫

Psycopg2 是 Python 語言下最常用的連接PostgreSQL數據庫連接庫,Psycopg2 的底層是由 C 語言封裝 PostgreSQL 的標准庫  libpq 實現的,

運行速度非常快,Psycopg2支持大型多線程應用的大量並發Insert和Update操作,Psycopg2完全兼容 DB API 2.0 

安裝Psycopg2 

pip install psycopg2

Psycopg2使用參考文檔

http://initd.org/psycopg/docs/index.html

Psycopg2 連接PostgreSQL數據庫接口

 Psycopg2提供的操作數據庫的兩個重要類是ConnectionCursor,和獲取數據庫連接的快捷函數connect()

psycopg2.connect(host="localhost", port="5432", dbname="testdb", user="gpadmin", password="123456")

常用關鍵詞參數說明如下:

  • host:主機名或 IP 地址
  • port:連接PostgreSQL數據庫使用的端口
  • dbname:連接的數據庫,默認為與用戶名同名的數據庫
  • user:連接數據庫的用戶
  • password:連接數據庫用戶的密碼

Connection類方法說明

Connection類用於獲取到PostgreSQL數據庫的連接,以下介紹Connection類常用的方法,詳細內容閱讀 Psycopg2 Connection 類

  • Connection()構造函數

    用於構造一個到PostgreSQL數據庫的連接,常用的是使用connect()快捷函數構造數據庫連接

  • connection.cursor()

    用於從當前的數據庫連接中獲取一個Cursor對象(游標),用於執行SQL語句。

  • cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    這里創建的是一個字典Cursor, 這樣返回的數據, 都是字典的形式, 方便使用

  • connection.close()

    關閉當前的數據庫連接

  • connection.commit()

    此方法提交當前事務。如果不調用這個方法,無論做了什么修改,數據不能保存到數據庫中。

  • connection.rollback()

    此方法會回滾任何更改數據庫數據

Cursor類方法說明

Cursor類用於執行SQL語句,並返回執行結果,以下介紹Cursor類常用的方法,詳細內容閱讀 Psycopg2 Cursor 類

  • cursor.execute(query)

    用於執行SQL語句 query

  • cursor.mogrify(query)

    會返回生成的sql腳本, 用以查看生成的sql是否正確

  • cursor.fetchall()

    獲取SQL執行結果中的所有記錄,返回值是一個元組的列表,每一條記錄是一個元組

  • cursor.fetchmany(([size=cursor.arraysize]))

    獲取SQL執行結果中指定條數的記錄,記錄數由size指定,當不指定size值時,默認為arraysize屬性的值,arraysize屬性的默認值是1;返回值是一個元組的列表,每一條記錄是一個元組

  • cursor.fetchone()

    獲取執行結果中的一條記錄

  • cursor.close()

    關閉當前連接的游標

  • curosr.callproc(procname[, parameters])

    這個程序執行的存儲數據庫程序給定的名稱。該程序預計為每一個參數,參數的順序必須包含一個條目

  • cursor.rowcount

    只讀屬性,它返回數據庫中的行的總數已修改,插入或刪除最后 execute*()

psycopg2.pool模塊說明

提供了一些純Python類直接在客戶端應用程序實現簡單的連接池。

class psycopg2.pool.AbstractConnectionPool(minconn, maxconn, *args, **kwargs)

基類實現通用的基於密鑰池代碼。

      自動創建新的minconn連接。池將支持的最大maxconn連接。* args,* * kwargs傳遞到connect()函數。

      以下預計將由子類實現方法:

getconn(key=None)

   得到一個空連接,並將其分配給key如果不是沒有。

putconn(conn, key=None, close=False)

     put away 一個連接。

  如果關閉是真的,則放棄池中的連接。

 closeall()

      關閉池處理的所有連接。

      請注意所有的連接都關閉,包括最終在應用程序中使用的連接。

 

 下面的類,子類可以使用abstractconnectionpool。

class psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwargs)

      不能在不同線程中共享的連接池。

      請注意,這個池類僅用於單線程應用程序。

 

class psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwargs)

      一個連接池與線程模塊一起工作。

      注意這個池類可以安全地應用在多線程應用程序中。

Psycopg2中可用的異常錯誤類

異常錯誤類的繼承關系如下:

StandardError
|__Warning
|__Error
   |__InterfaceError
   |__DatabaseError
      |__DataError
      |__OperationalError
         |__ psycopg2.extensions.QueryCanceledError
         |__ psycopg2.extensions.TransactionRollbackError
      |__IntegrityError
      |__InternalError
      |__ProgrammingError
      |__NotSupportedError

 

異常 描述
psycopg2.Warning 當有嚴重警告時觸發,例如插入數據是被截斷等等。
psycopg2.Error 警告以外所有其他錯誤類。
psycopg2.InterfaceError 當有數據庫接口模塊本身的錯誤(而不是數據庫的錯誤)發生時觸發。
psycopg2.DatabaseError 和數據庫有關的錯誤發生時觸發。
psycopg2.DataError 當有數據處理時的錯誤發生時觸發,例如:除零錯誤,數據超范圍等等。
psycopg2.OperationalError 指非用戶控制的,而是操作數據庫時發生的錯誤。例如:連接意外斷開、 數據庫名未找到、事務處理失敗、內存分配錯誤等等操作數據庫是發生的錯誤。
psycopg2.IntegrityError 完整性相關的錯誤,例如外鍵檢查失敗等。
psycopg2.InternalError 數據庫的內部錯誤,例如游標(cursor)失效了、事務同步失敗等等。
psycopg2.ProgrammingError 程序錯誤,例如數據表(table)沒找到或已存在、SQL語句語法錯誤、 參數數量錯誤等等。
psycopg2.NotSupportedError 不支持錯誤,指使用了數據庫不支持的函數或API等。例如在連接對象上 使用.rollback()函數,然而數據庫並不支持事務或者事務已關閉。

Psycopg2使用舉例

簡單的增加,查詢記錄

import psycopg2
import psycopg2.extras
import time

'''
    連接數據庫
    returns:db
'''
def gp_connect():
    try:
        db = psycopg2.connect(dbname="testdb",
                              user="gpadmin",
                              password="gpadmin",
                              host="10.1.208.42",
                              port="5432")
        # connect()也可以使用一個大的字符串參數,
        # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
        return db
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server",e)


if __name__ == '__main__':
    conn = gp_connect()
    print(conn)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # 這里創建的是一個字典Cursor, 這樣返回的數據, 都是字典的形式, 方便使用
    ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
    conn.commit()
    # 提交到數據庫中
    print(ret)
    ret = cur.execute("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);",(300, "abc'def"))

    conn.commit()
    # 提交到數據庫中
    print(cur.rowcount)  # 1
    # 返回數據庫中的行的總數已修改,插入或刪除最后 execute*().

    ret_sql = cur.mogrify("select * from pg_tables where tablename = %s;", ('gp_test',))
    # 返回生成的sql腳本, 用以查看生成的sql是否正確.
    # sql腳本必須以;結尾, 不可以省略.其次, 不管sql中有幾個參數, 都需要用 % s代替, 只有 % s, 不管值是字符還是數字, 一律 % s.
    # 最后, 第二個參數中, 一定要傳入元組, 哪怕只有一個元素, 像我剛才的例子一樣, ('gp_test')這樣是不行的.
    print(ret_sql.decode('utf-8'))  # select * from pg_tables where tablename = E'gp_test';

    cur.execute("select * from gp_test where num = %s;", (300,))
    pg_obj = cur.fetchone()
    print(pg_obj) # {'id': 1, 'num': 300, 'data': "abc'def"}

    conn.close() # 關閉連接

批量插入,查詢

    conn = gp_connect()
    print(conn)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # # 這里創建的是一個字典Cursor, 這樣返回的數據, 都是字典的形式, 方便使用
    # ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
    # conn.commit()
    # # 提交到數據庫中
    # print(ret)
    gp_list = []
    for i in range(200):
        gp_list.append((i,'abc%s'%i))
    # print(gp_list)
    # 批量提交數據
    ret = cur.executemany("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);", gp_list)
    conn.commit()
    # 提交到數據庫中
    print(cur.query)  # 查看上一條執行的腳本
    print(cur.rowcount)  # 200
    # 返回數據庫中的行的總數已修改,插入或刪除最后 execute*().
    cur.execute("select  count(*) num from gp_test")
    pg_obj = cur.fetchone()
    print(pg_obj)  # {'num': 200}

    conn.close()  # 關閉連接

使用連接池,執行高性能的批量插入與查詢

import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime

'''
    連接數據庫
    使用數據庫連接池
    returns:db
'''
def gp_connect():
    try:
        simple_conn_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=5,dbname="testdb",
                              user="gpadmin",
                              password="gpadmin",
                              host="10.1.208.42",
                              port="5432")
        # connect()也可以使用一個大的字符串參數,
        # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
        # 從數據庫連接池獲取連接
        conn = simple_conn_pool.getconn()
        return conn
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server",e)


if __name__ == '__main__':
    conn = gp_connect()
    print(conn)
    cur = conn.cursor()
    # 批量查詢大小
    batch_size = 1000
    gp_list = []
    for i in range(2000, 100000):
        gp_list.append((i,'abc%s'%i))
    # print(gp_list)

    # 開始時間
    start_time = datetime.now()
    # 批量提交數據execute_values性能大於executemany
    psycopg2.extras.execute_values(cur, "INSERT INTO public.gp_test (num, data) VALUES %s", gp_list)
    conn.commit()
    # 提交到數據庫中
    cur.execute("select  *  from gp_test order by id")
    count = 0

    while True:
        count = count + 1
        # 每次獲取時會從上次游標的位置開始移動size個位置,返回size條數據
        data = cur.fetchmany(batch_size)
        # 數據為空的時候中斷循環
        if not data:
            break
        else:
            print(data[-1])  # 得到最后一條(通過元祖方式返回)
        print('獲取%s到%s數據成功' % ((count - 1) * batch_size, count * batch_size))
    print('insert到fetchmany獲取全量數據所用時間:', (datetime.now() - start_time).seconds) # 16s
conn.close()  # 關閉連接

執行高性能的批量更新與查詢

import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime

'''
    連接數據庫
    使用數據庫連接池
    returns:db
'''
def gp_connect():
    ……略

if __name__ == '__main__':
    conn = gp_connect()
    print(conn)
    cur = conn.cursor()
    # 批量查詢大小
    batch_size = 1000
    gp_uplist = [] # 更新列表
    for i in range(2000, 10000):
        gp_uplist.append((i,'def%s'%i))
    print(gp_uplist)

    # 開始時間
    start_time = datetime.now()
    # 批量提交數據execute_values性能大於executemany

    sql = "UPDATE public.gp_test SET data = TEST.data  " \
          "FROM (VALUES %s) AS TEST(num, data) " \
          "WHERE public.gp_test.num = TEST.num"
    # 批量更新語句模版 UPDATE TABLE SET TABLE.COL = XX.col
    # FROM (VALUES %s) AS XX(id_col,col)
    # WHERE TABLE.id_col = XX.id_col 
    # XX為別名
    psycopg2.extras.execute_values(cur, sql, gp_uplist, page_size=100)
    print(cur.query)
    conn.commit()
    # 提交到數據庫中
    cur.execute("select  *  from gp_test order by id")
    count = 0

    while True:
        count = count + 1
        # 每次獲取時會從上次游標的位置開始移動size個位置,返回size條數據
        data = cur.fetchmany(batch_size)
        # 數據為空的時候中斷循環
        if not data:
            break
        else:
            print(data[-1])  # 得到最后一條(通過元祖方式返回)
        print('獲取%s到%s數據成功' % ((count - 1) * batch_size, count * batch_size))
    print('update到fetchmany獲取全量數據所用時間:', (datetime.now() - start_time).seconds) # 16s
conn.close()  # 關閉連接

 使用服務端游標

當執行一個數據庫查詢時,Pscopg cursor通常將查詢到的所有數據返回給客戶端,如果返回的數據過大,則將占用客戶端大量的內存。因此,psycopg提供了一種成為server side curosr機制,每次返回可控制數量的數據。

Server side cursor是使用PostgreSQL的DECLARE命令創建,並經過MOVE、FETCH和CLOSE命令處理的。

Psycopg通過命名的cursors裝飾server side cursor的,而命名cursor是通過對cursor()方法指定name參數而創建的。

server side cursor允許用戶在數據集中使用scroll()移動游標,並通過fetchone()和fetchmany()方法獲取數據。

  • scrollable:控制游標是否可以向后移動
  • itersize:控制每次可以獲取多少條數據,默認是2000
#逐條處理
with psycopg2.connect(database_connection_string) as conn:
    with conn.cursor(name='name_of_cursor') as cursor:

        cursor.itersize = 20000

        query = "SELECT * FROM ..."
        cursor.execute(query)

        for row in cursor:
         # process row

#2 一次處理多條
while True:
    rows = cursor.fetchmany(100)
    if len(rows) > 0:
        for row in rows:
            # process row
    else:
        break

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM