調用數據庫存儲過程及其他感興趣的高級 Python 編程功能。
對於涉及數據庫的軟件開發來說,有兩種主流開發方法:一種是在應用程序中(對於三層體系結構,也可以是在中間件中)實現所有業務邏輯,另一種是在數據庫內部實現所有業務邏輯。本教程不討論這兩種解決方案的優缺點;不過,使用 Oracle 數據庫方法還是會為面向數據庫的應用程序帶來某些好處。
用 PL/SQL 嵌入所有業務邏輯可大大減少應用程序與數據庫之間的往返次數,從而此使處理都在服務器端進行。PL/SQL 與 SQL 緊密集成,並與 Python 類似,提供了大量的標准程序包庫:從安排數據庫作業時間 (DBMS_SCHEDULER),到自動查詢調優 (DBMS_SQLTUNE) 和閃回 (DBMS_FLASHBACK),再到線性代數 (UTL_NLA) 和 LDAP 訪問 (DBMS_LDAP)。
本教程介紹使用 cx_Oracle 模塊在 Python 中調用 Oracle 數據庫內部的 PL/SQL 存儲過程和函數的各種方法,同時還介紹一些使用 PL/SQL 無法實現或者實現起來非常復雜的編程功能。在本文的最后,我們將簡單介紹 Oracle Berkeley DB,它作為即取即用插件內置在 Python 中。
IN-OUT 方式
Oracle 過程和函數是將 SQL 功能與編程語言功能相結合一些數據庫對象。過程(從現在開始也稱其為函數)的參數可以是以下三種類型之一:
- IN:傳遞給過程,但不能寫入到過程內部
- OUT:從過程返回,在過程主體內部可寫
- IN OUT:傳遞給過程,在過程內部完全可寫
默認情況下,參數都是 IN 類型。
為了說明 Python 和 Oracle 過程之間的交互,我們考慮將以下程序包安裝在 Oracle Database XE 實例的 HR 模式中。
CREATE OR REPLACE PACKAGE pkg_hr AS PROCEDURE add_department( p_department_id OUT NUMBER, p_department_name IN VARCHAR2, p_manager_id IN NUMBER, p_location_id IN NUMBER ); FUNCTION get_employee_count( p_department_id IN NUMBER ) RETURN NUMBER; PROCEDURE find_employees( p_query IN VARCHAR2, p_results OUT SYS_REFCURSOR ); END pkg_hr; / CREATE OR REPLACE PACKAGE BODY pkg_hr AS PROCEDURE add_department( p_department_id OUT NUMBER, p_department_name IN VARCHAR2, p_manager_id IN NUMBER, p_location_id IN NUMBER ) AS BEGIN INSERT INTO departments(department_id, department_name, manager_id, location_id) VALUES (departments_seq.nextval, p_department_name, p_manager_id, p_location_id) RETURNING department_id INTO p_department_id; COMMIT; END add_department; FUNCTION get_employee_count( p_department_id IN NUMBER ) RETURN NUMBER AS l_count NUMBER; BEGIN SELECT COUNT(*) INTO l_count FROM employees WHERE department_id= p_department_id; RETURN l_count; END get_employee_count; PROCEDURE find_employees( p_query IN VARCHAR2, p_results OUT SYS_REFCURSOR ) AS BEGIN OPEN p_results FOR SELECT * FROM employees WHERE UPPER(first_name||' '||last_name||' '||email) LIKE '%'||UPPER(p_query)||'%'; END find_employees; END pkg_hr; /
上面的示例中引入了三種不同的訪問方法:一個帶有 IN 參數和 OUT 參數的過程、一個返回數字的函數以及一個帶有 OUT REF CURSOR 參數的過程。每個過程需要不同的調用方法,如下所示。
import cx_Oracle class HR: def __enter__(self): self.__db = cx_Oracle.Connection("hr/hrpwd@//localhost:1521/XE") self.__cursor = self.__db.cursor() return self def __exit__(self, type, value, traceback): self.__cursor.close() self.__db.close() def add_department(self, p_department_name, p_manager_id, p_location_id): l_department_id = self.__cursor.var(cx_Oracle.NUMBER) self.__cursor.callproc("PKG_HR.ADD_DEPARTMENT", [l_department_id, p_department_name, p_manager_id, p_location_id]) # there are no OUT parameters in Python, regular return here return l_department_id def get_employee_count(self, p_department_id): l_count = self.__cursor.callfunc("PKG_HR.GET_EMPLOYEE_COUNT", cx_Oracle.NUMBER, [p_department_id]) return l_count def find_employees(self, p_query): # as it comes to all complex types we need to tell Oracle Client # what type to expect from an OUT parameter l_cur = self.__cursor.var(cx_Oracle.CURSOR) l_query, l_emp = self.__cursor.callproc("PKG_HR.FIND_EMPLOYEES", [p_query, l_cur]) return list(l_emp)
從上面的示例可以看到,通過一些基本規則規定了從 Python 調用存儲過程的方法:
- 使用 cx_Oracle.Cursor.callproc(proc, [params]) 調用過程,使用 cx_Oracle.Cursor.callfunc(proc, returnType, [params]) 調用函數。需要預先定義函數的返回類型 — get_employee_count() 方法聲明了從 PKG_HR.GET_EMPLOYEE_COUNT 返回的類型為 cx_Oracle.NUMBER。
- 使用 cx_Oracle 變量對象作為 callproc/callfunc 調用的參數可以返回類似 REF CURSOR 這樣的復雜類型。
使用 arrayvar 傳遞數組
cx_Oracle 中 DB API 2.0 的另一個擴展允許在存儲過程調用中使用數組作為參數。當前支持使用 INDEX BY 子句的 PL/SQL 數組。作為一個使用 arrayvar 對象的示例,要確保下面的 DDL 找到自己進入數據庫的道路。
CREATE OR REPLACE PACKAGE pkg_arrayvar AS TYPE num_array IS TABLE OF NUMBER INDEX BY PLS_INTEGER; FUNCTION sum(p_list IN NUM_ARRAY) RETURN NUMBER; END pkg_arrayvar; / CREATE OR REPLACE PACKAGE BODY pkg_arrayvar AS FUNCTION sum(p_list IN NUM_ARRAY) RETURN NUMBER AS l_sum NUMBER := 0; BEGIN FOR i IN 1..p_list.COUNT LOOP l_sum := l_sum+p_list(i); END LOOP i; RETURN l_sum; END sum; END pkg_arrayvar; /
現在,對 Python 對象的聲明和對函數的實際調用如下(后跟一個斷言來驗證結果):
>>> db = cx_Oracle.connect("hr/hrpwd@//localhost:1521/XE") >>> cursor = db.cursor() >>> L = cursor.arrayvar(cx_Oracle.NUMBER, [1, 2, 3]) >>> sum_result = cursor.callfunc("pkg_arrayvar.sum", cx_Oracle.NUMBER, [L]) >>> assert sum_result==6
上面提到過,在 Python 中調用函數需要顯式聲明返回類型,這可能會造成混淆,因為 callproc() 只需要兩個參數,但是這是必經之路。
轉到 Python
PL/SQL 是一種功能強大的語言,它盡可能地結合了 Oracle 數據庫的功能,可顯著減少開發工作量,並使您可以利用數據庫的大多數特性。但某些編程功能無法通過 PL/SQL 的數據庫中固有特性表達,或者無法通過 PL/SQL 的數據庫中固有特性使用。因此,需要用其他編程語言對其進行補充時,Python 是一個不錯的選擇,它可以縮短開發時間,加快開發完成。
Multiprocessing
從 2.6 版開始,Python 中的並行處理不再受 GIL(全局解釋器鎖)的限制。隨標准庫一起提供的 threading 模塊被限定為一次只運行一個操作。通過用操作系統進程替換線程,現在可將所有 CPU 提供給應用程序,因此能夠真正執行並行計算。Multiprocessing 模塊讓應用程序生成新的進程、鎖定對象、在內存中共享對象,而且,所有這些既可本地進行,也可遠程(不同的計算機上)進行。
下面是一個簡單數據庫基准測試實用程序的示例。
import cx_Oracle import os import time from multiprocessing import Pool from optparse import OptionParser def benchmark(options): params = eval(options.bind) if options.bind else {} with cx_Oracle.connect(options.db) as db: try: cursor = db.cursor() before = time.clock() for i in xrange(options.requests): cursor.execute(options.sql, params) return (time.clock()-before)/options.requests except KeyboardInterrupt: pass finally: cursor.close() class Orabench: def __init__(self, options): self.options = options print "Requests=%d, Concurrency=%d" % (self.options.requests, self.options.concurrency) def run(self): pool = Pool(processes=self.options.concurrency) result = pool.map_async(benchmark, [self.options]*self.options.concurrency) L = result.get() avg = sum(L)/len(L) print "Average=%.4f (%.4f requests per second)" % (avg, 1/avg) if __name__ == "__main__": opt = OptionParser() opt.add_option("-d", "--database", help="EZCONNECT string", action="store", type="string", dest="db") opt.add_option("-n", "--requests", help="number of requests", action="store", type="int", dest="requests", default=10) opt.add_option("-c", "--concurrency", help="number of concurrent connections", action="store", type="int", dest="concurrency", default=1) opt.add_option("-s", "--sql", help="SQL query or PL/SQL block", action="store", type="string", dest="sql") opt.add_option("-b", "--bind", help="dictionary of bind parameters", action="store", type="string", dest="bind") (options, args) = opt.parse_args() bench = Orabench(options) bench.run()
利用 optparse 模塊(該模塊可以很好地解析命令行參數),該工具在使用“--help”開關參數時會自動生成使用說明。
pp@oel:~$ python26 orabench.py --help Usage: orabench.py [options] Options: -h, --help show this help message and exit -d DB, --database=DB EZCONNECT string -n REQUESTS, --requests=REQUESTS number of requests -c CONCURRENCY, --concurrency=CONCURRENCY number of concurrent connections -s SQL, --sql=SQL SQL query or PL/SQL block -b BIND, --bind=BIND dictionary of bind parameters
然后,使用 HR 模式在 10 個進程中執行 1000 次查詢的基准測試:
pp@oel:~$ python26 orabench.py -d hr/hrpwd@//localhost:1521/XE -n 1000 -c 10 -s "select count(*) from employees" Requests=1000, Concurrency=10 Average=0.0006 (1667.7460 requests per second)
數據庫外部的 GROUP BY(函數式編程)
對各種函數式編程功能,很少有模塊能優於 itertools。它包含許多可以生成自定義、優化迭代器的遍歷函數。簡單提示一下,迭代器是一些對象,其中 __iter__() 方法返回迭代器本身,next() 方法步進到后續元素,或者引發結束迭代的 StopIteration 異常。通過遍歷大型數據集,您可以發現使用迭代器與使用列表或字節組的差別,因為迭代器避免了在內存中提交整個集合。
import cx_Oracle import itertools from operator import itemgetter with cx_Oracle.connect("hr/hrpwd@//localhost:1521/XE") as db: cursor = db.cursor() # fetch all employee data into local variable, no aggregation here employees = cursor.execute("select * from employees").fetchall() D = {} for dept, emp in itertools.groupby(employees, itemgetter(10)): D[dept] = len(list(emp))
operator 模塊包括原生對象使用的所有核心運算符,這意味着無論您何時運行 2+2,operator.add() 方法都會處理這一計算。因為 itertools.groupby() 方法接受兩個參數:可迭代的變量和主函數,我們需要使用 itemgetter(10) 從所有行中提取 department_id,itemgetter(10) 僅返回一個集合中的第 10 個元素。對 itertools 的結果進行遍歷與您對列表、字節組和字典的遍歷非常類似。我們為每個部門生成部門 ID 及該部門的所有員工數 (SELECT department_id, COUNT(*) FROM employees GROUP BY department_id)。
序列化數據
在 Python 中,用 pickle 模塊及其 C 中對應的 cPickle(比原生 Python 的 pickle 實現最多快 1000 倍)處理數據的序列化和反序列化。“序列化”對象意味着將對象轉換為可逆的字節表示:
>>> import pickle >>> A = {'a':1, 'b':2, 'c':3} >>> B = pickle.dumps(A) >>> print B "(dp0\nS'a'\np1\nI1\nsS'c'\np2\nI3\nsS'b'\np3\nI2\ns." >>> C = pickle.loads(B) >>> assert A==C
緊鄰關系數據存儲復雜結構時,序列化顯得尤為有用,這樣,我們就可以將常規 Python 對象當作數據庫本來就支持的對象進行讀寫操作。
至於 pickle 支持的類型,只有很少的限制,因為它能處理的內容非常廣泛,從字典和字節組,到數據集和函數,再到類和實例。其中一個不能序列化的對象是 cx_Oracle.Connection 對象,原因顯而易見。
惰性化緩存 (Sleepy Cache)
Oracle Berkeley DB 是一個事務性鍵值存儲解決方案,具有細粒度鎖定、高可用性和復制功能。它可以應對需要極端效率及完整關系數據庫開銷過高情況下的所有問題。(直到 2.6 版,Python 才以 bsddb 模塊的形式包括了針對 Oracle Berkeley DB 的內置接口。Python 的新版本(從 3.0 開始)使用一個需要單獨安裝的外部模塊 PyBSDDB。 )
下面,我們將利用 Python 2.6 附帶的內置驅動程序將值從 Oracle 數據庫緩存到 Oracle Berkeley DB 中:
import bsddb import cx_Oracle import pickle class Cache: def __init__(self, tab): self.__db = cx_Oracle.connect("hr/hrpwd@//localhost:1521/XE") self.__cursor = self.__db.cursor() self.__bdb = bsddb.hashopen(None) self.__cursor.execute("select * from employees") d = self.__cursor.description for row in self.__cursor: rowdict = dict((d[i][0].lower(), row[i]) for i in xrange(len(d))) self.__bdb[str(row[0])] = pickle.dumps(rowdict) def __del__(self): self.__cursor.close() self.__db.close() def __getitem__(self, name): try: return pickle.loads(self.__bdb[str(name)]) except KeyError: raise Warning, "No such employee with ID %s" % name if __name__ == "__main__": emp = Cache("EMPLOYEES")
現在可以像使用“emp[100]”一樣方便地訪問員工了,emp[100] 可以訪問快速的內存中散列表,並對序列化的員工數據進行反序列化。您可以使用任一內置服務器(SimpleHTTPServer、SimpleXMLRPCServer、wsgiref.simple_server)輕松包裝這樣的緩存,或者使用 Twisted 框架使緩存變得更強健。
總結
這次我們介紹了 Oracle 和 Python 結合使用時的幾個核心領域,包括 PL/SQL 存儲過程調用以及如何處理 PL/SQL 函數結果。談到 Python,現在您應對 multiprocessing 模塊要點有所了解,它是該語言新添加的最重要的模塊。最后,介紹了 Oracle Berkeley DB 在概念驗證的內存緩存中的使用情況。