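Database name: the name stored in the control file. It identifies the database itself, that is, the full set of physical files that make up the database.
Instance name: the name of the running database management system (memory structures plus background processes) that serves operations against a database. It is set by the instance_name parameter in the initialization parameter file. If that parameter is not set, the instance name is taken from the ORACLE_SID environment variable of the user who starts the instance (note that the variable name is uppercase). One database can have several instances, for example under RAC, and the instance name (SID) identifies each instance of that database.
Service name: SERVICE_NAME is the global database name as seen by the listener; with static registration it is set by the GLOBAL_DBNAME parameter in listener.ora. SERVICE_NAME was introduced in Oracle8i. Before that, a client addressed a database through the SID of a single instance. Because one database can be served by several instances (for example under RAC), Oracle introduced SERVICE_NAME so that clients can make use of all instances while keeping the connection configuration simple. The parameter corresponds to the database, not to a particular instance.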
SELECT name FROM v$database; -- database name
SELECT instance_name FROM v$instance; -- instance name
SELECT global_name FROM global_name; -- service name (global database name)
Three ways to connect to Oracle from Java:
Format 1: jdbc:oracle:thin:@//<host>:<port>/<service_name>
Format 2: jdbc:oracle:thin:@<host>:<port>:<SID>
Format 3: jdbc:oracle:thin:@<TNSName>
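The three formats above differ only in how the target is named. The sketch below assembles each one from its parts so the differences (the `//` and `/` for a service name, the `:` for a SID) are easy to compare. The helper name `jdbc_thin_urls` and the host, port, and alias values are made up for illustration.

```python
def jdbc_thin_urls(host, port, service_name=None, sid=None, tns_name=None):
    """Return the JDBC thin URL for each identifier that was supplied."""
    urls = {}
    if service_name:
        # Format 1: "//" before the host, "/" before the service name
        urls["service_name"] = f"jdbc:oracle:thin:@//{host}:{port}/{service_name}"
    if sid:
        # Format 2: no "//", and ":" before the SID
        urls["sid"] = f"jdbc:oracle:thin:@{host}:{port}:{sid}"
    if tns_name:
        # Format 3: only the alias; host and port come from tnsnames.ora
        urls["tns"] = f"jdbc:oracle:thin:@{tns_name}"
    return urls


# Hypothetical values, for illustration only
urls = jdbc_thin_urls("db.example.com", 1521,
                      service_name="orclpdb1", sid="orcl", tns_name="ORCL_ALIAS")
for kind, url in urls.items():
    print(kind, "->", url)
```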
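Connecting to Oracle from Python:
from sqlalchemy import create_engine
url = 'oracle://{username}:{passwd}@{host}:{port}/{sid}'.format(**config) # the database part of the URL is treated as the SID
url = 'oracle://{username}:{passwd}@{host}:{port}/?service_name={service_name}'.format(**config) # connect by service name
create_engine(url, encoding='utf-8') # the encoding argument was removed in SQLAlchemy 2.0; omit it there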
create_engine("oracle+cx_oracle://scott:tiger@oracle1120/?encoding=UTF-8&nencoding=UTF-8")
create_engine("oracle+cx_oracle://scott:tiger@hostname:1521/dbname?encoding=UTF-8&nencoding=UTF-8")
create_engine("oracle+cx_oracle://user:pass@dsn?encoding=UTF-8&nencoding=UTF-8&mode=SYSDBA&events=true")
import cx_Oracle  # needed for the cx_Oracle.SYSDBA constant

create_engine(
    "oracle+cx_oracle://user:pass@dsn",
    connect_args={
        "encoding": "UTF-8",
        "nencoding": "UTF-8",
        "mode": cx_Oracle.SYSDBA,
        "events": True
    }
)
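The SQLAlchemy URLs above embed the user name and password directly, which breaks when the password contains characters such as `@` or `/`. A minimal sketch of a URL builder that percent-encodes the credentials follows. The function name `oracle_url` and all sample values are assumptions for illustration; only `urllib.parse.quote_plus` from the standard library is used.

```python
from urllib.parse import quote_plus


def oracle_url(username, password, host, port, sid=None, service_name=None):
    """Build an oracle+cx_oracle URL for either a SID or a service name."""
    if (sid is None) == (service_name is None):
        raise ValueError("pass exactly one of sid or service_name")
    # Percent-encode the credentials so "@", "/" and ":" cannot be mistaken
    # for URL delimiters
    auth = f"{quote_plus(username)}:{quote_plus(password)}"
    if sid is not None:
        return f"oracle+cx_oracle://{auth}@{host}:{port}/{sid}"
    return f"oracle+cx_oracle://{auth}@{host}:{port}/?service_name={service_name}"


# Hypothetical credentials and host
sid_url = oracle_url("scott", "p@ss/word", "db.example.com", 1521, sid="orcl")
svc_url = oracle_url("scott", "tiger", "db.example.com", 1521, service_name="orclpdb1")
print(sid_url)
print(svc_url)
```

The resulting string can be passed to create_engine() in the same way as the hand-written URLs.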
cx_Oracle.makedsn('host',port,service_name='service_name')
cx_Oracle.makedsn('host',port,sid='sid')
cx_Oracle.connect("user", "pass", "ip/orcl")
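cx_Oracle.makedsn() returns a connect descriptor string, and the only difference between the two calls above is whether the CONNECT_DATA part names a SID or a SERVICE_NAME. The stand-in below is not cx_Oracle code; it is a pure-Python sketch of roughly what such a descriptor looks like, and the exact text produced by a given cx_Oracle version may differ. The name `make_descriptor` is hypothetical.

```python
def make_descriptor(host, port, sid=None, service_name=None):
    """Approximate the connect descriptor for a SID or a service name."""
    if (sid is None) == (service_name is None):
        raise ValueError("pass exactly one of sid or service_name")
    if sid is not None:
        connect_data = f"(SID={sid})"
    else:
        connect_data = f"(SERVICE_NAME={service_name})"
    return ("(DESCRIPTION="
            f"(ADDRESS=(PROTOCOL=TCP)(HOST={host})(PORT={port}))"
            f"(CONNECT_DATA={connect_data}))")


by_sid = make_descriptor("host", 1521, sid="sid")
by_service = make_descriptor("host", 1521, service_name="service_name")
print(by_sid)
print(by_service)
```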
db_config.py
user = "pythonhol"
pw = "welcome"
dsn = "localhost/orclpdb1"

# Script that connects using the settings in db_config.py
import cx_Oracle
import db_config

con = cx_Oracle.connect(db_config.user, db_config.pw, db_config.dsn)
print("cx_Oracle version:", cx_Oracle.version)
print("Database version:", con.version)  # must be read before the connection is closed
print("Client version:", cx_Oracle.clientversion())

cur = con.cursor()
cur.execute("select * from dept order by deptno")
res = cur.fetchall()
for row in res:
    print(row)

cur.close()
con.close()
Oracle connection pool: connect_pool.py
import threading
import cx_Oracle
import db_config

# Create an Oracle session pool with a minimum of 2 and a maximum of 5 connections
pool = cx_Oracle.SessionPool(db_config.user, db_config.pw, db_config.dsn,
                             min = 2, max = 5, increment = 1, threaded = True)

def Query():
    con = pool.acquire()
    cur = con.cursor()
    for i in range(4):
        cur.execute("select myseq.nextval from dual")
        seqval, = cur.fetchone()  # the trailing comma unpacks the row; without it seqval is a tuple such as (1,)
        print("Thread", threading.current_thread().name, "fetched sequence =", seqval)

thread1 = threading.Thread(name='#1', target=Query)
thread1.start()
thread2 = threading.Thread(name='#2', target=Query)
thread2.start()
thread1.join()
thread2.join()

'''
# Equivalent version that starts the threads in a loop
numberOfThreads = 2
threadArray = []
for i in range(numberOfThreads):
    thread = threading.Thread(name = '#' + str(i), target = Query)
    threadArray.append(thread)
    thread.start()
for t in threadArray:
    t.join()
'''
print("All done!")
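Oracle connection with DRCP
DRCP (Database Resident Connection Pooling) is useful when the database host does not have enough memory to handle the required number of database server processes. If the host has plenty of memory, however, the default "dedicated" server process model is generally recommended. If DRCP is enabled, it is best used together with cx_Oracle's middle-tier connection pool.
Appending ":pooled" to the DSN tells the database to use a pooled server. With ATTR_PURITY_SELF, session state can be kept between connection calls; the session information is discarded when the pooled server is reused by an application with a different connection class name. If session information should not be shared at all, use ATTR_PURITY_NEW.
import cx_Oracle
import db_config

con = cx_Oracle.connect(db_config.user, db_config.pw, db_config.dsn + ":pooled",
                        cclass="PYTHONHOL", purity=cx_Oracle.ATTR_PURITY_SELF)
print("Database version:", con.version)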
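Oracle connection pool with DRCP
If you get the error "ORA-24459: OCISessionGet() timed out waiting for pool to create new connections" or "ORA-24418: Cannot open further sessions", it is because connection requests are being made while the pool is starting or growing. Add the argument getmode = cx_Oracle.SPOOL_ATTRVAL_WAIT to the cx_Oracle.SessionPool() call so that connection requests wait for a connection in the pool to become available.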
import threading
import cx_Oracle
import db_config

pool = cx_Oracle.SessionPool(db_config.user, db_config.pw, db_config.dsn + ":pooled",
                             min = 2, max = 5, increment = 1, threaded = True)

def Query():
    con = pool.acquire(cclass = "PYTHONHOL", purity = cx_Oracle.ATTR_PURITY_SELF)
    cur = con.cursor()  # was conn.cursor(), which raises NameError
    for i in range(4):
        cur.execute("select myseq.nextval from dual")
        seqval, = cur.fetchone()
        print("Thread", threading.current_thread().name, "fetched sequence =", seqval)

numberOfThreads = 2
threadArray = []
for i in range(numberOfThreads):
    thread = threading.Thread(name = '#' + str(i), target = Query)
    threadArray.append(thread)
    thread.start()
for t in threadArray:
    t.join()
print("All done!")
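The difference between a pool that fails a request immediately and one that waits for a free connection can be seen without a database. The toy model below is not cx_Oracle and says nothing about how SessionPool is implemented; `TinyPool` and its `wait` argument are hypothetical names, built on the standard library's `queue.Queue`, meant only to illustrate the "wait for a connection to become available" behaviour.

```python
import queue
import threading


class TinyPool:
    """Hypothetical stand-in for a session pool; not part of cx_Oracle."""

    def __init__(self, size):
        self._free = queue.Queue()
        for n in range(size):
            self._free.put(f"session-{n}")

    def acquire(self, wait=True, timeout=None):
        # wait=False fails at once when the pool is empty;
        # wait=True blocks until a session is released (or the timeout expires)
        try:
            return self._free.get(block=wait, timeout=timeout)
        except queue.Empty:
            raise RuntimeError("no free session in pool") from None

    def release(self, session):
        self._free.put(session)


pool = TinyPool(size=1)
first = pool.acquire()

# No-wait mode: the only session is busy, so the request fails immediately
try:
    pool.acquire(wait=False)
    nowait_result = "acquired"
except RuntimeError as exc:
    nowait_result = str(exc)

# Wait mode: another thread releases the session shortly afterwards,
# and the blocked request then succeeds
threading.Timer(0.1, pool.release, args=(first,)).start()
second = pool.acquire(wait=True, timeout=5)

print("no-wait request:", nowait_result)
print("waiting request got:", second)
```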
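Insert, update, delete, and query operations
cur = con.cursor()
cur.execute("select * from dept order by deptno")
for deptno, dname, loc in cur:
    print("Department number: ", deptno)
    print("Department name: ", dname)
    print("Department location:", loc)

for row in cur.execute("select * from dept"):
    print(row)

# Fetching: each call continues from where the previous one stopped
cur.execute("select * from dept order by deptno")
row = cur.fetchone()
print(row)
#cur.arraysize = 10  # default is 100
res = cur.fetchmany(numRows = 3)
print(res)
rows = cur.fetchall()
print(rows)

# Scrolling requires a scrollable cursor
cur = con.cursor(scrollable = True)
cur.execute("select * from dept order by deptno")
cur.scroll(2, mode = "absolute")  # move the cursor to the second row
print(cur.fetchone())
cur.scroll(-1)  # relative mode (the default): move back one row; use mode = "last" for the last row
print(cur.fetchone())

# Bind variables
cur = con.cursor()
sql = "select * from dept where deptno = :id order by deptno"
cur.execute(sql, id = 20)
res = cur.fetchall()
print(res)
print(con.stmtcachesize)  # size of the statement cache

# Batch insert
# create table mytab (id number, data varchar2(20), constraint my_pk primary key (id));
rows = [ (1, "First" ), (2, "Second" ), (3, "Third" ), (4, "Fourth" ),
         (5, "Fifth" ), (6, "Sixth" ), (7, "Seventh" ) ]
cur.executemany("insert into mytab(id, data) values (:1, :2)", rows)

# Alternative to the insert above: one row violates the primary key's
# uniqueness constraint, and the error is printed instead of raised
rows = [ (1, "First" ), (2, "Second" ), (3, "Third" ), (4, "Fourth" ),
         (5, "Fifth" ), (6, "Sixth" ), (6, "Duplicate" ), (7, "Seventh" ) ]
cur.executemany("insert into mytab(id, data) values (:1, :2)", rows, batcherrors = True)
for error in cur.getbatcherrors():
    print("Error", error.message.rstrip(), "at row offset", error.offset)

# At the end of the script cx_Oracle rolls back any uncommitted transaction.
# To commit the results, or to force a rollback, call one of:
con.commit()
con.rollback()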
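The bind-variable, executemany, and commit/rollback calls above follow the Python DB-API, so the same pattern can be tried without an Oracle server. The sketch below uses the standard library's sqlite3 module purely as a stand-in: the table layout mirrors mytab, but sqlite3 has no batcherrors option and its SQL types differ from Oracle's.

```python
import sqlite3

# In-memory database used only as a stand-in for an Oracle connection
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table mytab (id integer primary key, data text)")

# Named bind variables with executemany: one dict per row
rows = [
    {"id": 1, "data": "First"},
    {"id": 2, "data": "Second"},
    {"id": 3, "data": "Third"},
]
cur.executemany("insert into mytab (id, data) values (:id, :data)", rows)
con.commit()  # make the three rows permanent

# An uncommitted insert is discarded by rollback()
cur.execute("insert into mytab (id, data) values (:id, :data)",
            {"id": 4, "data": "Fourth"})
con.rollback()

# Query with a bind variable instead of string concatenation
cur.execute("select data from mytab where id = :id", {"id": 2})
second = cur.fetchone()
cur.execute("select count(*) from mytab")
count, = cur.fetchone()

print("row 2:", second)
print("rows after rollback:", count)
con.close()
```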
Changing the output type or format
1. Changing the output type: a plain for loop prints 10 for a number column; with the handler below it prints '10'.
def ReturnNumbersAsStrings(cursor, name, defaultType, size, precision, scale):
    if defaultType == cx_Oracle.NUMBER:
        return cursor.var(str, 9, cursor.arraysize)

cur.outputtypehandler = ReturnNumbersAsStrings
for row in cur.execute("select * from dept"):
    print(row)
2. Changing the output format: a plain for loop prints "Value: 0.1 * 3 = 0.30000000000000004"; with the handler below it prints "Value: 0.1 * 3 = 0.3".
import decimal

def ReturnNumbersAsDecimal(cursor, name, defaultType, size, precision, scale):
    if defaultType == cx_Oracle.NUMBER:
        return cursor.var(str, 9, cursor.arraysize, outconverter = decimal.Decimal)
        # In this case the line above can be shortened to:
        #return cursor.var(decimal.Decimal, arraysize = cursor.arraysize)

cur.outputtypehandler = ReturnNumbersAsDecimal
for value, in cur.execute("select 0.1 from dual"):
    print("Value:", value, "* 3 =", value * 3)
3. Accessing columns by name with rowfactory
import collections

cur.execute("select deptno, dname from dept")
cur.rowfactory = collections.namedtuple("MyClass", ["DeptNumber", "DeptName"])
rows = cur.fetchall()
for row in rows:
    print(row.DeptNumber, "->", row.DeptName)
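The two outputs quoted in item 2 come from the difference between binary floating point and decimal arithmetic, which can be reproduced without a database. The short sketch below uses only the standard library; the variable names are made up for illustration.

```python
import decimal

# A NUMBER fetched as a Python float cannot represent 0.1 exactly
float_result = 0.1 * 3

# The same value converted from its string form keeps exact decimal digits,
# which is what the outconverter in the handler achieves
decimal_result = decimal.Decimal("0.1") * 3

print("float:  ", float_result)    # 0.30000000000000004
print("decimal:", decimal_result)  # 0.3
```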
Named objects
# Named object types, for example Spatial Data Objects (SDO)
# sqlplus> desc MDSYS.SDO_GEOMETRY

# Create table
cur.execute("""begin
                 execute immediate 'drop table testgeometry';
               exception when others then
                 if sqlcode <> -942 then
                   raise;
                 end if;
               end;""")
cur.execute("""create table testgeometry (
               id number(9) not null,
               geometry MDSYS.SDO_GEOMETRY not null)""")

# Create and populate Oracle objects
typeObj = con.gettype("MDSYS.SDO_GEOMETRY")
elementInfoTypeObj = con.gettype("MDSYS.SDO_ELEM_INFO_ARRAY")
ordinateTypeObj = con.gettype("MDSYS.SDO_ORDINATE_ARRAY")
obj = typeObj.newobject()
obj.SDO_GTYPE = 2003
obj.SDO_ELEM_INFO = elementInfoTypeObj.newobject()
obj.SDO_ELEM_INFO.extend([1, 1003, 3])
obj.SDO_ORDINATES = ordinateTypeObj.newobject()
obj.SDO_ORDINATES.extend([1, 1, 5, 7])
print("Created object", obj)

# Add a new row
print("Adding row to table...")
cur.execute("insert into testgeometry values (1, :objbv)", objbv = obj)
print("Row added!")

# Query the row
print("Querying row just inserted...")
cur.execute("select id, geometry from testgeometry")
for row in cur:
    print(row)

# ------------------------------
# Define a function to dump the contents of an Oracle object
def dumpobject(obj, prefix = "  "):
    if obj.type.iscollection:
        print(prefix, "[")
        for value in obj.aslist():
            if isinstance(value, cx_Oracle.Object):
                dumpobject(value, prefix + "  ")
            else:
                print(prefix + "  ", repr(value))
        print(prefix, "]")
    else:
        print(prefix, "{")
        for attr in obj.type.attributes:
            value = getattr(obj, attr.name)
            if isinstance(value, cx_Oracle.Object):
                print(prefix + "  " + attr.name + " :")
                dumpobject(value, prefix + "    ")
            else:
                print(prefix + "  " + attr.name + " :", repr(value))
        print(prefix, "}")

# Query the row
print("Querying row just inserted...")
cur.execute("select id, geometry from testgeometry")
for id, obj in cur:
    print("Id: ", id)
    dumpobject(obj)
Continued from above
# Plain Python class holding the geometry data; the converter below reads
# its gtype, elemInfo and ordinates attributes
class mySDO(object):
    def __init__(self, gtype, elemInfo, ordinates):
        self.gtype = gtype
        self.elemInfo = elemInfo
        self.ordinates = ordinates

# Get Oracle type information
objType = con.gettype("MDSYS.SDO_GEOMETRY")
elementInfoTypeObj = con.gettype("MDSYS.SDO_ELEM_INFO_ARRAY")
ordinateTypeObj = con.gettype("MDSYS.SDO_ORDINATE_ARRAY")

# Convert a Python object to MDSYS.SDO_GEOMETRY
def SDOInConverter(value):
    obj = objType.newobject()
    obj.SDO_GTYPE = value.gtype
    obj.SDO_ELEM_INFO = elementInfoTypeObj.newobject()
    obj.SDO_ELEM_INFO.extend(value.elemInfo)
    obj.SDO_ORDINATES = ordinateTypeObj.newobject()
    obj.SDO_ORDINATES.extend(value.ordinates)
    return obj

def SDOInputTypeHandler(cursor, value, numElements):
    if isinstance(value, mySDO):
        return cursor.var(cx_Oracle.OBJECT, arraysize = numElements,
                          inconverter = SDOInConverter, typename = objType.name)

sdo = mySDO(2003, [1, 1003, 3], [1, 1, 5, 7])  # Python object
cur.inputtypehandler = SDOInputTypeHandler
cur.execute("insert into testgeometry values (:1, :2)", (1, sdo))
PL/SQL functions and stored procedures
'''
create table ptab (mydata varchar(20), myid number);

create or replace function myfunc(d_p in varchar2, i_p in number) return number as
begin
  insert into ptab (mydata, myid) values (d_p, i_p);
  return (i_p * 2);
end;
/
'''
# Call the function; the second argument is the Python type of the return value
res = cur.callfunc('myfunc', int, ('abc', 2))
print(res)

'''
create or replace procedure myproc(v1_p in number, v2_p out number) as
begin
  v2_p := v1_p * 2;
end;
/
'''
# Call the procedure; the OUT parameter is received in a cursor variable
myvar = cur.var(int)
cur.callproc('myproc', (123, myvar))
print(myvar.getvalue())
Working with CLOBs
# Assumes a table testclobs with a numeric id column and a CLOB column already exists
print("Inserting data...")
cur.execute("truncate table testclobs")
longString = ""
for i in range(5):
    char = chr(ord('A') + i)
    longString += char * 250
    cur.execute("insert into testclobs values (:1, :2)",
                (i + 1, "String data " + longString + ' End of string'))
con.commit()

print("Querying data...")
cur.execute("select * from testclobs where id = :id", {'id': 1})
(id, clob) = cur.fetchone()
print("CLOB length:", clob.size())
clobdata = clob.read()
#clob.read(1,10)  # read 10 characters starting at offset 1
print("CLOB data:", clobdata)

# Fetch the CLOB directly as a string
def OutputTypeHandler(cursor, name, defaultType, size, precision, scale):
    if defaultType == cx_Oracle.CLOB:
        return cursor.var(cx_Oracle.LONG_STRING, arraysize = cursor.arraysize)

print("Querying data...")
con.outputtypehandler = OutputTypeHandler  # applies to every cursor of this connection; cur.outputtypehandler would apply to one cursor only
cur.execute("select * from testclobs where id = :id", {'id': 1})
(id, clobdata) = cur.fetchone()
print("CLOB length:", len(clobdata))
print("CLOB data:", clobdata)
Custom Connection subclass
class MyConnection(cx_Oracle.Connection):

    def __init__(self):
        print("Connecting to database")
        return super(MyConnection, self).__init__(db_config.user, db_config.pw, db_config.dsn)

    def cursor(self):
        return MyCursor(self)

class MyCursor(cx_Oracle.Cursor):

    def execute(self, statement, args):
        print("Executing:", statement)
        print("Arguments:")
        for argIndex, arg in enumerate(args):
            print(" Bind", argIndex + 1, "has value", repr(arg))
        return super(MyCursor, self).execute(statement, args)

    def fetchone(self):
        print("Fetchone()")
        return super(MyCursor, self).fetchone()

con = MyConnection()
cur = con.cursor()
cur.execute("select count(*) from emp where deptno = :bv", (10,))
count, = cur.fetchone()
print("Number of rows:", count)
Oracle Advanced Queuing (AQ)
import decimal

BOOK_TYPE_NAME = "UDT_BOOK"
QUEUE_NAME = "BOOKS"
QUEUE_TABLE_NAME = "BOOK_QUEUE_TABLE"

# Cleanup
cur.execute(
    """begin
         dbms_aqadm.stop_queue('""" + QUEUE_NAME + """');
         dbms_aqadm.drop_queue('""" + QUEUE_NAME + """');
         dbms_aqadm.drop_queue_table('""" + QUEUE_TABLE_NAME + """');
         execute immediate 'drop type """ + BOOK_TYPE_NAME + """';
       exception when others then
         if sqlcode <> -24010 then
           raise;
         end if;
       end;""")

# Create a type
print("Creating books type UDT_BOOK...")
cur.execute("""
    create type %s as object (
        title varchar2(100),
        authors varchar2(100),
        price number(5,2)
    );""" % BOOK_TYPE_NAME)

# Create queue table and queue and start the queue
print("Creating queue table...")
cur.callproc("dbms_aqadm.create_queue_table", (QUEUE_TABLE_NAME, BOOK_TYPE_NAME))
cur.callproc("dbms_aqadm.create_queue", (QUEUE_NAME, QUEUE_TABLE_NAME))
cur.callproc("dbms_aqadm.start_queue", (QUEUE_NAME,))

booksType = con.gettype(BOOK_TYPE_NAME)
queue = con.queue(QUEUE_NAME, booksType)

# Enqueue a few messages
print("Enqueuing messages...")
BOOK_DATA = [
    ("The Fellowship of the Ring", "Tolkien, J.R.R.", decimal.Decimal("10.99")),
    ("Harry Potter and the Philosopher's Stone", "Rowling, J.K.", decimal.Decimal("7.99"))
]
for title, authors, price in BOOK_DATA:
    book = booksType.newobject()
    book.TITLE = title
    book.AUTHORS = authors
    book.PRICE = price
    print(title)
    queue.enqOne(con.msgproperties(payload=book))
con.commit()

# Dequeue the messages
print("\nDequeuing messages...")
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
while True:
    props = queue.deqOne()
    if not props:
        break
    print(props.payload.TITLE)
con.commit()
print("\nDone.")
JSON operations (SODA)
soda = con.getSodaDatabase()
collection = soda.createCollection("friends")

# Insert one document and read it back by its key
content = {'name': 'Jared', 'age': 35, 'address': {'city': 'Melbourne'}}
doc = collection.insertOneAndGet(content)
key = doc.key
doc = collection.find().key(key).getOne()
content = doc.getContent()
print('Retrieved SODA document dictionary is:')
print(content)

# Insert several documents at once
myDocs = [
    {'name': 'Gerald', 'age': 21, 'address': {'city': 'London'}},
    {'name': 'David', 'age': 28, 'address': {'city': 'Melbourne'}},
    {'name': 'Shawn', 'age': 20, 'address': {'city': 'San Francisco'}}
]
collection.insertMany(myDocs)

#----------
# Filter on a nested field
filterSpec = { "address.city": "Melbourne" }
myDocuments = collection.find().filter(filterSpec).getDocuments()
print('Melbourne people:')
for doc in myDocuments:
    print(doc.getContent()["name"])

#----------
# Filter with a comparison operator
filterSpec = {'age': {'$lt': 25}}
myDocuments = collection.find().filter(filterSpec).getDocuments()
print('Young people:')
for doc in myDocuments:
    print(doc.getContent()["name"])
References:
https://oracle.github.io/python-cx_Oracle/samples/tutorial/Python-and-Oracle-Database-Scripting-for-the-Future.html
cx_Oracle official documentation: https://cx-oracle.readthedocs.io/en/latest/index.html