查看oracle的sid和service_name


ORACLE中SID和SERVICE_NAME的區別

數據庫名:數據庫名是存儲在控制文件中的名稱,它代表的是數據庫,也就是數據庫包含的所有的物理文件的總稱。

實例名:用於響應某個數據庫操作的數據庫管理系統的名稱。實例名是由初始化參數文件的參數instance_name決定的。如果這個參數不被指定(即instance_name沒有被指定為任何值),那么實例的名字由該用戶的環境變量ORACLE_SID(注意這里是大寫)決定。一個數據庫可以有多個實例,例如RAC,實例名(SID)用於標識數據庫內的每一個實例的名稱

服務名:SERVICE_NAME指的是listener中的全局數據庫名:這個名字是由listener.ora中GLOBAL_DBNAME參數決定的。SERVICE_NAME是Oracle8i新引進的,8i之前一個數據庫只能有一個實例。8i之后一個數據庫可以對應多個實例,例如RAC。為了充分利用所有實例,並且令客戶端連接配置簡單,ORACLE提出了SERVICE_NAME的概念。該參數直接對應數據庫,而不是某個實例。

SELECT NAME FROM V$DATABASE;        --數據庫名
SELECT instance_name FROM V$INSTANCE;     --實例名
select global_name from global_name;        --服務名

java連接oracle的三種方式

格式一:jdbc:oracle:thin:@//<host>:<port>/<service_name>
格式二:jdbc:oracle:thin:@<host>:<port>:<SID> 
格式三:jdbc:oracle:thin:@<TNSName> 

python連接oracle方法

# SQLAlchemy connection URL examples; `config` is expected to supply the
# placeholders used in each format string.
# Variant 1: the URL path carries the SID.
url = 'oracle://{username}:{passwd}@{host}:{port}/{sid}'.format(**config)  # path component = SID (dbname)
# Variant 2: the service name is passed as a URL query parameter instead.
url = 'oracle://{username}:{passwd}@{host}:{port}/?service_name={service_name}'.format(**config)
create_engine(url, encoding='utf-8')
# Driver named explicitly (oracle+cx_oracle); encoding options given as URL query parameters.
create_engine("oracle+cx_oracle://scott:tiger@oracle1120/?encoding=UTF-8&nencoding=UTF-8")
create_engine("oracle+cx_oracle://scott:tiger@hostname:1521/dbname?encoding=UTF-8&nencoding=UTF-8")
create_engine("oracle+cx_oracle://user:pass@dsn?encoding=UTF-8&nencoding=UTF-8&mode=SYSDBA&events=true")
# The same four options supplied through connect_args instead of the URL query string.
create_engine(
  "oracle+cx_oracle://user:pass@dsn",
  connect_args={
    "encoding": "UTF-8",
    "nencoding": "UTF-8",
    "mode": cx_Oracle.SYSDBA,
    "events": True
  }
)

# Build a DSN that identifies the database by service name ...
cx_Oracle.makedsn('host',port,service_name='service_name')
# ... or by SID.
cx_Oracle.makedsn('host',port,sid='sid')

# Connect with user, password and a connect string passed positionally.
cx_Oracle.connect("user", "pass", "ip/orcl")

 

  db_config.py

user = "pythonhol"
pw = "welcome"
dsn = "localhost/orclpdb1"

  連接oracle

# The module is imported under its own name because every call below refers
# to `cx_Oracle` (the former `as orcl` alias left that name undefined).
import cx_Oracle
import db_config

# Connect with the credentials and DSN kept in db_config.py.
con = cx_Oracle.connect(db_config.user, db_config.pw, db_config.dsn)

# Query the dept table and print every row.
cur = con.cursor()
cur.execute("select * from dept order by deptno")
res = cur.fetchall()
for row in res:
    print(row)
cur.close()

# Version details are read while the connection is still open;
# con.version cannot be queried after con.close().
print(cx_Oracle.version)
print("Database version:", con.version)
print("Client version:", cx_Oracle.clientversion())

con.close()

  oracle連接池 connect_pool.py

import cx_Oracle
import threading
import db_config

# Create the Oracle connection pool with a minimum of 2 and a maximum of 5 connections.
pool = cx_Oracle.SessionPool(db_config.user, db_config.pw, db_config.dsn,
                             min = 2, max = 5, increment = 1, threaded = True)

def Query():
    """Fetch four values of the ``myseq`` sequence on a pooled connection.

    Acquires a connection from the module-level ``pool``, prints each
    sequence value together with the name of the calling thread, and always
    returns the connection to the pool, even when a query raises.
    """
    con = pool.acquire()
    try:
        cur = con.cursor()
        try:
            for i in range(4):
                cur.execute("select myseq.nextval from dual")
                # The trailing comma unpacks the single-column row; without
                # it seqval would be a tuple such as (1,).
                seqval, = cur.fetchone()
                print("Thread", threading.current_thread().name, "fetched sequence =", seqval)
        finally:
            cur.close()
    finally:
        # Hand the connection back explicitly instead of relying on
        # garbage collection to do it.
        pool.release(con)

# Run Query concurrently in two named threads, then wait for both to finish.
thread1 = threading.Thread(name='#1', target=Query)
thread2 = threading.Thread(name='#2', target=Query)

for worker in (thread1, thread2):
    worker.start()

for worker in (thread1, thread2):
    worker.join()

'''
numberOfThreads = 2
threadArray = []

for i in range(numberOfThreads):
    thread = threading.Thread(name = '#' + str(i), target = Query)
    threadArray.append(thread)
    thread.start()

for t in threadArray:
    t.join()
'''
print("All done!")

  含DRCP的oracle連接

當數據庫主機沒有足夠的內存來處理所需數量的數據庫服務器進程時,DRCP很有用。但是,如果數據庫主機內存足夠大,則通常建議使用默認的“專用”服務器進程模型。如果啟用了DRCP,則最好將其與cx_Oracle的中間層連接池結合使用。

:pooled 告訴數據庫使用池化服務器(DRCP)。ATTR_PURITY_SELF:允許在多次連接調用之間保留並重用會話狀態;但當會話被使用不同連接類(cclass)名稱的應用程序重用時,會話信息將被丟棄。如果應用程序不需要共享會話信息,則使用 ATTR_PURITY_NEW。

# DRCP connection: ":pooled" is appended to the DSN, cclass names the
# connection class and purity selects how session state is treated.
con = cx_Oracle.connect(db_config.user, db_config.pw, db_config.dsn + ":pooled",
                        cclass="PYTHONHOL", purity=cx_Oracle.ATTR_PURITY_SELF)
print("Database version:", con.version)

  含DRCP的oracle連接池

如果收到錯誤消息“ORA-24459:OCISessionGet()等待池創建新連接超時”或“ORA-24418:無法打開其他會話”,那是因為在連接池啟動或擴容時發出了連接請求。請將參數 getmode = cx_Oracle.SPOOL_ATTRVAL_WAIT 添加到 cx_Oracle.SessionPool() 調用中,使連接請求等待池中有可用的連接。

import cx_Oracle
import threading
import db_config  # supplies user, pw and dsn for the pool below

# Session pool for DRCP: the DSN carries the ":pooled" suffix; the pool holds
# between 2 and 5 connections and grows one connection at a time.
pool = cx_Oracle.SessionPool(db_config.user, db_config.pw, db_config.dsn + ":pooled",
                             min = 2, max = 5, increment = 1, threaded = True)

def Query():
    """Fetch four values of the ``myseq`` sequence on a DRCP pooled connection.

    Acquires a connection from the module-level ``pool`` with connection
    class "PYTHONHOL" and purity ATTR_PURITY_SELF, prints each sequence value
    together with the name of the calling thread, and always returns the
    connection to the pool, even when a query raises.
    """
    con = pool.acquire(cclass = "PYTHONHOL", purity = cx_Oracle.ATTR_PURITY_SELF)
    try:
        # The cursor must come from `con`, the connection acquired above.
        cur = con.cursor()
        try:
            for i in range(4):
                cur.execute("select myseq.nextval from dual")
                # The trailing comma unpacks the single-column row.
                seqval, = cur.fetchone()
                print("Thread", threading.current_thread().name, "fetched sequence =", seqval)
        finally:
            cur.close()
    finally:
        # Hand the connection back explicitly instead of relying on
        # garbage collection to do it.
        pool.release(con)

numberOfThreads = 2

# One worker per index, named '#0', '#1', ...; each runs Query.
threadArray = [threading.Thread(name = '#' + str(i), target = Query)
               for i in range(numberOfThreads)]

for worker in threadArray:
    worker.start()

# Block until every worker has finished.
for worker in threadArray:
    worker.join()

print("All done!")

  增刪改查操作

# scroll() to an absolute row or backwards needs a scrollable cursor.
cur = con.cursor(scrollable = True)

# Iterate over the cursor, unpacking each row into its three columns.
cur.execute("select * from dept order by deptno")
for deptno, dname, loc in cur:
    print("Department number: ", deptno)
    print("Department name: ", dname)
    print("Department location:", loc)

# The result of execute() can be iterated directly.
for row in cur.execute("select * from dept"):
    print(row)

# The loops above consumed every row, so each fetch demo below re-runs the
# query first; otherwise there would be nothing left to fetch.
cur.execute("select * from dept order by deptno")
row = cur.fetchone()
print(row)

#cur.arraysize = 10  # default is 100
cur.execute("select * from dept order by deptno")
row = cur.fetchall()
print(row)

cur.execute("select * from dept order by deptno")
res = cur.fetchmany(numRows = 3)
print(res)

cur.execute("select * from dept order by deptno")
cur.scroll(2, mode = "absolute")  # go to the second row
print(cur.fetchone())

cur.scroll(-1)                    # go back one row (relative to the current position)
print(cur.fetchone())

# :id is a named bind variable; its value is supplied as a keyword argument
# to execute() rather than being concatenated into the SQL text.
sql = "select * from dept where deptno = :id order by deptno"
cur.execute(sql, id = 20)
res = cur.fetchall()
print(res)
# Print the connection's statement cache size setting.
print(con.stmtcachesize)

# Table used below:
#create table mytab (id number, data varchar2(20), constraint my_pk primary key (id));
rows = [ (1, "First" ), (2, "Second" ), (3, "Third" ), (4, "Fourth" ),
         (5, "Fifth" ), (6, "Sixth" ), (7, "Seventh" ) ]
# Insert all rows in one call; :1 and :2 are positional binds for each tuple.
cur.executemany("insert into mytab(id, data) values (:1, :2)", rows)

# Violate the primary-key uniqueness constraint and print the resulting errors.
rows = [ (1, "First" ), (2, "Second" ), (3, "Third" ), (4, "Fourth" ),
         (5, "Fifth" ), (6, "Sixth" ),(6, "Duplicate" ),(7, "Seventh" ) ]
cur.executemany("insert into mytab(id, data) values (:1, :2)", rows, batcherrors = True)
for error in cur.getbatcherrors():
    print("Error", error.message.rstrip(), "at row offset", error.offset)

# At the end of the script cx_Oracle rolls back uncommitted transactions.
# To commit the results or to force a rollback, use one of the following:
con.commit()
con.rollback()

  更改輸出類型/格式

1. 更改輸出類型:如果直接執行for循環輸出10,則下述代碼輸出‘10’
def ReturnNumbersAsStrings(cursor, name, defaultType, size, precision, scale):
    """Output type handler that fetches NUMBER columns as strings.

    Returns a string variable (size 9, sized to the cursor's arraysize) for
    columns whose default type is cx_Oracle.NUMBER, and None for every other
    column so that the default handling applies.
    """
    if defaultType != cx_Oracle.NUMBER:
        return None
    return cursor.var(str, 9, cursor.arraysize)

cur.outputtypehandler = ReturnNumbersAsStrings
for row in cur.execute("select * from dept"):
    print(row)

2. 更改輸出格式:如果直接執行for循環輸出:Value: 0.1 * 3 = 0.30000000000000004, 則下述代碼輸出:Value: 0.1 * 3 = 0.3
import decimal

def ReturnNumbersAsDecimal(cursor, name, defaultType, size, precision, scale):
    """Output type handler that fetches NUMBER columns as decimal.Decimal.

    NUMBER columns are fetched as strings and converted with decimal.Decimal
    through the variable's outconverter; for any other column type the
    handler returns None so that the default handling applies.
    """
    if defaultType != cx_Oracle.NUMBER:
        return None
    # In this case the call can be shortened to:
    #   cursor.var(decimal.Decimal, arraysize = cursor.arraysize)
    return cursor.var(str, 9, cursor.arraysize, outconverter = decimal.Decimal)

cur.outputtypehandler = ReturnNumbersAsDecimal
for value, in cur.execute("select 0.1 from dual"):
    print("Value:", value, "* 3 =", value * 3)

3. 輸出列名 rowfactory 
# namedtuple lives in the standard-library collections module, which must be
# imported before it is used below.
import collections

cur.execute("select deptno, dname from dept")
# rowfactory is called with the column values of each fetched row; using a
# namedtuple class lets the columns be read by attribute name.
cur.rowfactory = collections.namedtuple("MyClass", ["DeptNumber", "DeptName"])

rows = cur.fetchall()
for row in rows:
    print(row.DeptNumber, "->", row.DeptName)

  命名對象

# Named object types, e.g. Spatial Data Objects (SDO).
# In SQL*Plus the type can be inspected with: desc MDSYS.SDO_GEOMETRY
# Create table
# The anonymous block drops any existing testgeometry table and re-raises
# every error except sqlcode -942.
# NOTE(review): -942 is presumably "table or view does not exist" — confirm.
cur.execute("""begin
                 execute immediate 'drop table testgeometry';
                 exception when others then
                   if sqlcode <> -942 then
                     raise;
                   end if;
               end;""")
cur.execute("""create table testgeometry (
               id number(9) not null,
               geometry MDSYS.SDO_GEOMETRY not null)""")

# Create and populate Oracle objects
# Look up the three database types, then build one geometry object whose
# collection attributes are created empty and filled with extend().
typeObj = con.gettype("MDSYS.SDO_GEOMETRY")
elementInfoTypeObj = con.gettype("MDSYS.SDO_ELEM_INFO_ARRAY")
ordinateTypeObj = con.gettype("MDSYS.SDO_ORDINATE_ARRAY")
obj = typeObj.newobject()
obj.SDO_GTYPE = 2003
obj.SDO_ELEM_INFO = elementInfoTypeObj.newobject()
obj.SDO_ELEM_INFO.extend([1, 1003, 3])
obj.SDO_ORDINATES = ordinateTypeObj.newobject()
obj.SDO_ORDINATES.extend([1, 1, 5, 7])
print("Created object", obj)

# Add a new row
# The object is passed as the named bind variable :objbv.
print("Adding row to table...")
cur.execute("insert into testgeometry values (1, :objbv)", objbv = obj)
print("Row added!")

# Query the row
print("Querying row just inserted...")
cur.execute("select id, geometry from testgeometry");
for row in cur:
    print(row)

# ------------------------------
# Define a function to dump the contents of an Oracle object
def dumpobject(obj, prefix = "  "):
    """Recursively print the contents of an Oracle object.

    Collections are printed between "[" and "]" with one element per line;
    other objects are printed between "{" and "}" with one "name : value"
    line per attribute. Nested Oracle objects are dumped recursively with a
    deeper prefix; plain values are printed with repr().
    """
    inner = prefix + "  "

    if not obj.type.iscollection:
        print(prefix, "{")
        for attr in obj.type.attributes:
            value = getattr(obj, attr.name)
            if isinstance(value, cx_Oracle.Object):
                # Nested object: print the attribute name, then its contents
                # two levels deeper.
                print(inner + attr.name + " :")
                dumpobject(value, prefix + "    ")
            else:
                print(inner + attr.name + " :", repr(value))
        print(prefix, "}")
        return

    print(prefix, "[")
    for item in obj.aslist():
        if isinstance(item, cx_Oracle.Object):
            dumpobject(item, inner)
        else:
            print(inner, repr(item))
    print(prefix, "]")

# Query the row
print("Querying row just inserted...")
cur.execute("select id, geometry from testgeometry")
for id, obj in cur:
    print("Id: ", id)
    dumpobject(obj)

  接上

# Get Oracle type information
objType = con.gettype("MDSYS.SDO_GEOMETRY")
elementInfoTypeObj = con.gettype("MDSYS.SDO_ELEM_INFO_ARRAY")
ordinateTypeObj = con.gettype("MDSYS.SDO_ORDINATE_ARRAY")

# Convert a Python object to MDSYS.SDO_GEOMETRY
def SDOInConverter(value):
    """Convert a Python value into an MDSYS.SDO_GEOMETRY database object.

    ``value`` must expose ``gtype``, ``elemInfo`` and ``ordinates``; they are
    copied into the SDO_GTYPE, SDO_ELEM_INFO and SDO_ORDINATES attributes of a
    new object created from the module-level ``objType``.
    """
    geometry = objType.newobject()
    geometry.SDO_GTYPE = value.gtype

    # Each collection attribute is attached empty and then filled in place.
    geometry.SDO_ELEM_INFO = elementInfoTypeObj.newobject()
    geometry.SDO_ELEM_INFO.extend(value.elemInfo)

    geometry.SDO_ORDINATES = ordinateTypeObj.newobject()
    geometry.SDO_ORDINATES.extend(value.ordinates)

    return geometry

def SDOInputTypeHandler(cursor, value, numElements):
    """Input type handler that binds mySDO values as database objects.

    For a mySDO instance it returns an object variable that uses
    SDOInConverter to convert the value; for any other value it returns None
    so that the default binding applies.
    """
    if not isinstance(value, mySDO):
        return None
    return cursor.var(cx_Oracle.OBJECT, arraysize = numElements,
            inconverter = SDOInConverter, typename = objType.name)

class mySDO(object):
    """Plain Python holder for the values of one SDO geometry.

    It carries exactly the three attributes that SDOInConverter reads:
    ``gtype``, ``elemInfo`` and ``ordinates``.
    """

    def __init__(self, gtype, elemInfo, ordinates):
        self.gtype = gtype
        self.elemInfo = elemInfo
        self.ordinates = ordinates

sdo = mySDO(2003, [1, 1003, 3], [1, 1, 5, 7])  # Python object
# Register the handler so that mySDO values bound on this cursor are converted.
cur.inputtypehandler = SDOInputTypeHandler
cur.execute("insert into testgeometry values (:1, :2)", (1, sdo))

  plsql函數/存儲過程

'''
create table ptab (mydata varchar(20), myid number);

create or replace function myfunc(d_p in varchar2, i_p in number) return number as
  begin
    insert into ptab (mydata, myid) values (d_p, i_p);
    return (i_p * 2);
  end;
/
'''

res = cur.callfunc('myfunc', int, ('abc', 2))
print(res)


'''
create or replace procedure myproc(v1_p in number, v2_p out number) as
begin
  v2_p := v1_p * 2;
end;
/
'''

myvar = cur.var(int)
cur.callproc('myproc', (123, myvar))
print(myvar.getvalue())

  CLOB類型操作

# Insert five rows whose text grows by 250 characters per row (250 x 'A',
# then additionally 250 x 'B', and so on), then commit.
print("Inserting data...")
cur.execute("truncate table testclobs")
longString = ""
for i in range(5):
    char = chr(ord('A') + i)
    longString += char * 250
    cur.execute("insert into testclobs values (:1, :2)",
                   (i + 1, "String data " + longString + ' End of string'))
con.commit()

# Fetch the row with id 1; the second column comes back as a LOB object that
# is read explicitly.
print("Querying data...")
cur.execute("select * from testclobs where id = :id", {'id': 1})
(id, clob) = cur.fetchone()
print("CLOB length:", clob.size())
clobdata = clob.read()   # e.g. clob.read(1,10): start at 1, length 10
print("CLOB data:", clobdata)


#直接轉換clob對象為字符串格式
def OutputTypeHandler(cursor, name, defaultType, size, precision, scale):
    """Output type handler that fetches CLOB columns directly as strings.

    Returns a LONG_STRING variable sized to the cursor's arraysize for
    columns whose default type is cx_Oracle.CLOB, and None for every other
    column so that the default handling applies.
    """
    if defaultType != cx_Oracle.CLOB:
        return None
    return cursor.var(cx_Oracle.LONG_STRING, arraysize = cursor.arraysize)

print("Querying data...")
con.outputtypehandler = OutputTypeHandler   #cur.outputtypehandler?
cur.execute("select * from testclobs where id = :id", {'id': 1})
(id, clobdata) = cur.fetchone()
print("CLOB length:", len(clobdata))
print("CLOB data:", clobdata)

  自定義connection子類

class MyConnection(cx_Oracle.Connection):
    """Connection subclass that announces itself and hands out MyCursor objects."""

    def __init__(self):
        """Connect with the user, password and DSN stored in db_config."""
        print("Connecting to database")
        super().__init__(db_config.user, db_config.pw, db_config.dsn)

    def cursor(self):
        """Return a MyCursor bound to this connection."""
        return MyCursor(self)

class MyCursor(cx_Oracle.Cursor):
    """Cursor subclass that logs statements, bind values and fetches."""

    def execute(self, statement, args):
        """Print the statement and all of its bind values, then execute it.

        Args:
            statement: SQL text to execute.
            args: sequence of bind values passed on to the base class.

        Returns:
            Whatever the base class ``execute`` returns.
        """
        print("Executing:", statement)
        print("Arguments:")
        for argIndex, arg in enumerate(args):
            print("  Bind", argIndex + 1, "has value", repr(arg))
        # Delegate after the loop so that every bind value is logged and the
        # statement still runs when args is empty.
        return super(MyCursor, self).execute(statement, args)

    def fetchone(self):
        """Print a trace line and return the next row from the base class."""
        print("Fetchone()")
        return super(MyCursor, self).fetchone()

con = MyConnection()
cur = con.cursor()

cur.execute("select count(*) from emp where deptno = :bv", (10,))
count, = cur.fetchone()
print("Number of rows:", count)

  Oracle Advanced Queuing (AQ) 

# Names of the object type, the queue and the queue table used below.
BOOK_TYPE_NAME = "UDT_BOOK"
QUEUE_NAME = "BOOKS"
QUEUE_TABLE_NAME = "BOOK_QUEUE_TABLE"

# Cleanup
# Stop and drop the queue, its table and the type left over from an earlier
# run; every error except sqlcode -24010 is re-raised.
# NOTE(review): -24010 is presumably "queue does not exist" — confirm.
cur.execute(
    """begin
         dbms_aqadm.stop_queue('""" + QUEUE_NAME + """');
         dbms_aqadm.drop_queue('""" + QUEUE_NAME + """');
         dbms_aqadm.drop_queue_table('""" + QUEUE_TABLE_NAME + """');
         execute immediate 'drop type """ + BOOK_TYPE_NAME + """';
         exception when others then
           if sqlcode <> -24010 then
             raise;
           end if;
       end;""")

# Create a type
print("Creating books type UDT_BOOK...")
cur.execute("""
        create type %s as object (
            title varchar2(100),
            authors varchar2(100),
            price number(5,2)
        );""" % BOOK_TYPE_NAME)

# Create queue table and queue and start the queue
print("Creating queue table...")
cur.callproc("dbms_aqadm.create_queue_table",
        (QUEUE_TABLE_NAME, BOOK_TYPE_NAME))
cur.callproc("dbms_aqadm.create_queue", (QUEUE_NAME, QUEUE_TABLE_NAME))
cur.callproc("dbms_aqadm.start_queue", (QUEUE_NAME,))

# Look up the payload type and open the queue with that type.
booksType = con.gettype(BOOK_TYPE_NAME)
queue = con.queue(QUEUE_NAME, booksType)

# Enqueue a few messages
print("Enqueuing messages...")

# (title, authors, price) tuples; prices are Decimal values.
BOOK_DATA = [
    ("The Fellowship of the Ring", "Tolkien, J.R.R.", decimal.Decimal("10.99")),
    ("Harry Potter and the Philosopher's Stone", "Rowling, J.K.",
            decimal.Decimal("7.99"))
]

# Build one payload object per book, enqueue it and commit after each message.
for title, authors, price in BOOK_DATA:
    book = booksType.newobject()
    book.TITLE = title
    book.AUTHORS = authors
    book.PRICE = price
    print(title)
    queue.enqOne(con.msgproperties(payload=book))
    con.commit()

# Dequeue the messages
print("\nDequeuing messages...")
# With DEQ_NO_WAIT set, the loop below stops as soon as deqOne() returns a
# false value instead of waiting for new messages.
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
while True:
    props = queue.deqOne()
    if not props:
        break
    print(props.payload.TITLE)
    con.commit()

print("\nDone.")

  json操作

# Obtain the SODA database object from the connection and create the
# "friends" collection.
soda = con.getSodaDatabase()
collection = soda.createCollection("friends")

# Insert one document and keep the key of the stored document.
content = {'name': 'Jared', 'age': 35, 'address': {'city': 'Melbourne'}}
doc = collection.insertOneAndGet(content)
key = doc.key

# Fetch the same document back by its key and print its content.
doc = collection.find().key(key).getOne()
content = doc.getContent()
print('Retrieved SODA document dictionary is:')
print(content)

# Insert several documents in a single call.
myDocs = [
    {'name': 'Gerald', 'age': 21, 'address': {'city': 'London'}},
    {'name': 'David', 'age': 28, 'address': {'city': 'Melbourne'}},
    {'name': 'Shawn', 'age': 20, 'address': {'city': 'San Francisco'}}
]
collection.insertMany(myDocs)

#----------
# Filter on the nested field address.city.
filterSpec = { "address.city": "Melbourne" }
myDocuments = collection.find().filter(filterSpec).getDocuments()

print('Melbourne people:')
for doc in myDocuments:
    print(doc.getContent()["name"])

#----------
# Filter using the $lt operator on age.
filterSpec = {'age': {'$lt': 25}}
myDocuments = collection.find().filter(filterSpec).getDocuments()

print('Young people:')
for doc in myDocuments:
    print(doc.getContent()["name"])

  

參考鏈接

https://oracle.github.io/python-cx_Oracle/samples/tutorial/Python-and-Oracle-Database-Scripting-for-the-Future.html

cx_Oracle官方文檔:https://cx-oracle.readthedocs.io/en/latest/index.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM