配置greenplum客戶端認證
配置pg_hba.conf
cd /home/gpadmin/gpdbdata/master/gpseg-1 vim pg_hba.conf 增加 host all gpadmin 10.1.201.55/32 trust [gpadmin@ gpseg-1]$ export PGDATA=/home/gpadmin/gpdbdata/master/gpseg-1 [gpadmin@ gpseg-1]$ pg_ctl reload -D $PGDATA server signaled
使用Psycopg2訪問數據庫
Psycopg2 是 Python 語言下最常用的連接PostgreSQL數據庫連接庫,Psycopg2 的底層是由 C 語言封裝 PostgreSQL 的標准庫 libpq 實現的,
運行速度非常快,Psycopg2支持大型多線程應用的大量並發Insert和Update操作,Psycopg2完全兼容 DB API 2.0
安裝Psycopg2
pip install psycopg2
Psycopg2使用參考文檔
http://initd.org/psycopg/docs/index.html
Psycopg2 連接PostgreSQL數據庫接口
Psycopg2提供的操作數據庫的兩個重要類是Connection
,Cursor
,和獲取數據庫連接的快捷函數connect()
psycopg2.connect(host="localhost", port="5432", dbname="testdb", user="gpadmin", password="123456")
常用關鍵詞參數說明如下:
- host:主機名或 IP 地址
- port:連接PostgreSQL數據庫使用的端口
- dbname:連接的數據庫,默認為與用戶名同名的數據庫
- user:連接數據庫的用戶
- password:連接數據庫用戶的密碼
Connection類方法說明
Connection類用於獲取到PostgreSQL數據庫的連接,以下介紹Connection類常用的方法,詳細內容閱讀 Psycopg2 Connection 類。
- Connection()構造函數
用於構造一個到PostgreSQL數據庫的連接,常用的是使用
connect()
快捷函數構造數據庫連接 - connection.cursor()
用於從當前的數據庫連接中獲取一個Cursor對象(游標),用於執行SQL語句。
-
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
這里創建的是一個字典Cursor, 這樣返回的數據, 都是字典的形式, 方便使用
- connection.close()
關閉當前的數據庫連接
- connection.commit()
此方法提交當前事務。如果不調用這個方法,無論做了什么修改,數據不能保存到數據庫中。
- connection.rollback()
此方法會回滾任何更改數據庫數據
Cursor類方法說明
Cursor類用於執行SQL語句,並返回執行結果,以下介紹Cursor類常用的方法,詳細內容閱讀 Psycopg2 Cursor 類。
- cursor.execute(query)
用於執行SQL語句 query
-
cursor.mogrify(query)
會返回生成的sql腳本, 用以查看生成的sql是否正確
-
cursor.fetchall()
獲取SQL執行結果中的所有記錄,返回值是一個元組的列表,每一條記錄是一個元組
-
cursor.fetchmany(([size=cursor.arraysize]))
獲取SQL執行結果中指定條數的記錄,記錄數由
size
指定,當不指定size
值時,默認為arraysize
屬性的值,arraysize
屬性的默認值是1
;返回值是一個元組的列表,每一條記錄是一個元組 -
cursor.fetchone()
獲取執行結果中的一條記錄
- cursor.close()
關閉當前連接的游標
- curosr.callproc(procname[, parameters])
這個程序執行的存儲數據庫程序給定的名稱。該程序預計為每一個參數,參數的順序必須包含一個條目
- cursor.rowcount
只讀屬性,它返回數據庫中的行的總數已修改,插入或刪除最后 execute*()
psycopg2.pool模塊說明
提供了一些純Python類直接在客戶端應用程序實現簡單的連接池。
class psycopg2.pool.AbstractConnectionPool(minconn, maxconn, *args, **kwargs)
基類實現通用的基於密鑰池代碼。
自動創建新的minconn連接。池將支持的最大maxconn連接。* args,* * kwargs傳遞到connect()函數。
以下預計將由子類實現方法:
getconn(key=None)
得到一個空連接,並將其分配給key如果不是沒有。
putconn(conn, key=None, close=False)
put away 一個連接。
如果關閉是真的,則放棄池中的連接。
closeall()
關閉池處理的所有連接。
請注意所有的連接都關閉,包括最終在應用程序中使用的連接。
下面的類,子類可以使用abstractconnectionpool。
class psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwargs)
不能在不同線程中共享的連接池。
請注意,這個池類僅用於單線程應用程序。
class psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwargs)
一個連接池與線程模塊一起工作。
注意這個池類可以安全地應用在多線程應用程序中。
Psycopg2中可用的異常錯誤類
異常錯誤類的繼承關系如下:
StandardError |__Warning |__Error |__InterfaceError |__DatabaseError |__DataError |__OperationalError |__ psycopg2.extensions.QueryCanceledError |__ psycopg2.extensions.TransactionRollbackError |__IntegrityError |__InternalError |__ProgrammingError |__NotSupportedError
異常 | 描述 |
---|---|
psycopg2.Warning | 當有嚴重警告時觸發,例如插入數據是被截斷等等。 |
psycopg2.Error | 警告以外所有其他錯誤類。 |
psycopg2.InterfaceError | 當有數據庫接口模塊本身的錯誤(而不是數據庫的錯誤)發生時觸發。 |
psycopg2.DatabaseError | 和數據庫有關的錯誤發生時觸發。 |
psycopg2.DataError | 當有數據處理時的錯誤發生時觸發,例如:除零錯誤,數據超范圍等等。 |
psycopg2.OperationalError | 指非用戶控制的,而是操作數據庫時發生的錯誤。例如:連接意外斷開、 數據庫名未找到、事務處理失敗、內存分配錯誤等等操作數據庫是發生的錯誤。 |
psycopg2.IntegrityError | 完整性相關的錯誤,例如外鍵檢查失敗等。 |
psycopg2.InternalError | 數據庫的內部錯誤,例如游標(cursor)失效了、事務同步失敗等等。 |
psycopg2.ProgrammingError | 程序錯誤,例如數據表(table)沒找到或已存在、SQL語句語法錯誤、 參數數量錯誤等等。 |
psycopg2.NotSupportedError | 不支持錯誤,指使用了數據庫不支持的函數或API等。例如在連接對象上 使用.rollback()函數,然而數據庫並不支持事務或者事務已關閉。 |
Psycopg2使用舉例
簡單的增加,查詢記錄
import psycopg2 import psycopg2.extras import time ''' 連接數據庫 returns:db ''' def gp_connect(): try: db = psycopg2.connect(dbname="testdb", user="gpadmin", password="gpadmin", host="10.1.208.42", port="5432") # connect()也可以使用一個大的字符串參數, # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test” return db except psycopg2.DatabaseError as e: print("could not connect to Greenplum server",e) if __name__ == '__main__': conn = gp_connect() print(conn) cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # 這里創建的是一個字典Cursor, 這樣返回的數據, 都是字典的形式, 方便使用 ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);") conn.commit() # 提交到數據庫中 print(ret) ret = cur.execute("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);",(300, "abc'def")) conn.commit() # 提交到數據庫中 print(cur.rowcount) # 1 # 返回數據庫中的行的總數已修改,插入或刪除最后 execute*(). ret_sql = cur.mogrify("select * from pg_tables where tablename = %s;", ('gp_test',)) # 返回生成的sql腳本, 用以查看生成的sql是否正確. # sql腳本必須以;結尾, 不可以省略.其次, 不管sql中有幾個參數, 都需要用 % s代替, 只有 % s, 不管值是字符還是數字, 一律 % s. # 最后, 第二個參數中, 一定要傳入元組, 哪怕只有一個元素, 像我剛才的例子一樣, ('gp_test')這樣是不行的. print(ret_sql.decode('utf-8')) # select * from pg_tables where tablename = E'gp_test'; cur.execute("select * from gp_test where num = %s;", (300,)) pg_obj = cur.fetchone() print(pg_obj) # {'id': 1, 'num': 300, 'data': "abc'def"} conn.close() # 關閉連接
批量插入,查詢
conn = gp_connect() print(conn) cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # # 這里創建的是一個字典Cursor, 這樣返回的數據, 都是字典的形式, 方便使用 # ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);") # conn.commit() # # 提交到數據庫中 # print(ret) gp_list = [] for i in range(200): gp_list.append((i,'abc%s'%i)) # print(gp_list) # 批量提交數據 ret = cur.executemany("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);", gp_list) conn.commit() # 提交到數據庫中 print(cur.query) # 查看上一條執行的腳本 print(cur.rowcount) # 200 # 返回數據庫中的行的總數已修改,插入或刪除最后 execute*(). cur.execute("select count(*) num from gp_test") pg_obj = cur.fetchone() print(pg_obj) # {'num': 200} conn.close() # 關閉連接
使用連接池,執行高性能的批量插入與查詢
import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime
'''
連接數據庫
使用數據庫連接池
returns:db
'''
def gp_connect():
try:
simple_conn_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=5,dbname="testdb",
user="gpadmin",
password="gpadmin",
host="10.1.208.42",
port="5432")
# connect()也可以使用一個大的字符串參數,
# 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
# 從數據庫連接池獲取連接
conn = simple_conn_pool.getconn()
return conn
except psycopg2.DatabaseError as e:
print("could not connect to Greenplum server",e)
if __name__ == '__main__':
conn = gp_connect()
print(conn)
cur = conn.cursor()
# 批量查詢大小
batch_size = 1000
gp_list = []
for i in range(2000, 100000):
gp_list.append((i,'abc%s'%i))
# print(gp_list)
# 開始時間
start_time = datetime.now()
# 批量提交數據execute_values性能大於executemany
psycopg2.extras.execute_values(cur, "INSERT INTO public.gp_test (num, data) VALUES %s", gp_list)
conn.commit()
# 提交到數據庫中
cur.execute("select * from gp_test order by id")
count = 0
while True:
count = count + 1
# 每次獲取時會從上次游標的位置開始移動size個位置,返回size條數據
data = cur.fetchmany(batch_size)
# 數據為空的時候中斷循環
if not data:
break
else:
print(data[-1]) # 得到最后一條(通過元祖方式返回)
print('獲取%s到%s數據成功' % ((count - 1) * batch_size, count * batch_size))
print('insert到fetchmany獲取全量數據所用時間:', (datetime.now() - start_time).seconds) # 16s
conn.close() # 關閉連接
執行高性能的批量更新與查詢
import psycopg2 import psycopg2.extras import psycopg2.pool from datetime import datetime ''' 連接數據庫 使用數據庫連接池 returns:db ''' def gp_connect(): ……略 if __name__ == '__main__': conn = gp_connect() print(conn) cur = conn.cursor() # 批量查詢大小 batch_size = 1000 gp_uplist = [] # 更新列表 for i in range(2000, 10000): gp_uplist.append((i,'def%s'%i)) print(gp_uplist) # 開始時間 start_time = datetime.now() # 批量提交數據execute_values性能大於executemany sql = "UPDATE public.gp_test SET data = TEST.data " \ "FROM (VALUES %s) AS TEST(num, data) " \ "WHERE public.gp_test.num = TEST.num" # 批量更新語句模版 UPDATE TABLE SET TABLE.COL = XX.col # FROM (VALUES %s) AS XX(id_col,col) # WHERE TABLE.id_col = XX.id_col # XX為別名 psycopg2.extras.execute_values(cur, sql, gp_uplist, page_size=100) print(cur.query) conn.commit() # 提交到數據庫中 cur.execute("select * from gp_test order by id") count = 0 while True: count = count + 1 # 每次獲取時會從上次游標的位置開始移動size個位置,返回size條數據 data = cur.fetchmany(batch_size) # 數據為空的時候中斷循環 if not data: break else: print(data[-1]) # 得到最后一條(通過元祖方式返回) print('獲取%s到%s數據成功' % ((count - 1) * batch_size, count * batch_size)) print('update到fetchmany獲取全量數據所用時間:', (datetime.now() - start_time).seconds) # 16s conn.close() # 關閉連接
使用服務端游標
當執行一個數據庫查詢時,Pscopg cursor通常將查詢到的所有數據返回給客戶端,如果返回的數據過大,則將占用客戶端大量的內存。因此,psycopg提供了一種成為server side curosr機制,每次返回可控制數量的數據。
Server side cursor是使用PostgreSQL的DECLARE命令創建,並經過MOVE、FETCH和CLOSE命令處理的。
Psycopg通過命名的cursors裝飾server side cursor的,而命名cursor是通過對cursor()方法指定name參數而創建的。
server side cursor允許用戶在數據集中使用scroll()移動游標,並通過fetchone()和fetchmany()方法獲取數據。
- scrollable:控制游標是否可以向后移動
- itersize:控制每次可以獲取多少條數據,默認是2000
#逐條處理 with psycopg2.connect(database_connection_string) as conn: with conn.cursor(name='name_of_cursor') as cursor: cursor.itersize = 20000 query = "SELECT * FROM ..." cursor.execute(query) for row in cursor: # process row #2 一次處理多條 while True: rows = cursor.fetchmany(100) if len(rows) > 0: for row in rows: # process row else: break