python連接oracle -- sqlalchemy
# Copy the result of a query into an Oracle table using a SQLAlchemy engine
# for reading and the underlying DB-API connection for a bulk insert.
import cx_Oracle as orcl
import pandas as pd
from sqlalchemy import create_engine

# Database connection.
db = create_engine('oracle://qmcb:qmcb@localhost:1521/tqmcbdb')

# Placeholder: name of the table that is emptied and then filled below.
# The original snippet used `ttb` without ever defining it.
ttb = 'target_table'

# Query.
sql_select = ''' ...'''
df = pd.read_sql_query(sql_select, db)

# Save: DataFrame.to_sql works but was found to be too slow, so it is kept
# only as a reference (the original called it with no arguments, which fails).
# df.to_sql(ttb, con=db, if_exists='append', index=False)

# Everything below goes through the raw DB-API connection, which must be
# closed explicitly.
conn = db.raw_connection()
try:
    cursor = conn.cursor()
    try:
        # Execute: empty the target table. Issued on the DB-API cursor because
        # Engine.execute() is not available in SQLAlchemy 2.0.
        cursor.execute('truncate table {}'.format(ttb))

        # Insert: one positional bind (:1, :2, ...) per DataFrame column.
        col = ', '.join(df.columns.tolist())
        s = ', '.join([':' + str(i) for i in range(1, df.shape[1] + 1)])
        sql = 'insert into {}({}) values({})'.format(ttb, col, s)
        cursor.executemany(sql, df.values.tolist())
        conn.commit()
    except Exception:
        # Undo the partial insert, then let the error surface.
        conn.rollback()
        raise
    finally:
        cursor.close()
finally:
    conn.close()
python連接oracle -- cx_Oracle
oracle 數據庫表字段的缺失值統計 -- 基於python
cx_Oracle模塊的安裝和使用:看下連接oracle的各種方式
# Cheat sheet of the cx_Oracle calls used in these notes, arranged so that it
# can run top to bottom against a real database.
import cx_Oracle as orcl

# --- Opening a connection ---------------------------------------------------
# Other call forms (kept as comments so that no connection is opened and then
# dropped without being closed):
#   orcl.connect('load/123456@localhost/ora11g')
#   orcl.connect('username', 'password', 'host:port/orcl')
tnsname = orcl.makedsn('host', 'port', 'sid')
db = orcl.connect('username', 'password', tnsname)

try:
    # --- Connection / client information -------------------------------------
    db.autocommit = False  # turn auto-commit off
    orcl.clientversion()
    db.version
    db.ping()
    db.username
    db.password
    db.dsn  # dsn = data source name
    db.tnsentry

    # --- Cursor ----------------------------------------------------------------
    # Alternative constructor: orcl.Cursor(db)
    cursor = db.cursor()
    try:
        # --- Query data / run statements (e.g. create or truncate a table) ----
        sql = "SELECT ENAME, trunc(SAL,0) FROM EMP WHERE deptno = :deptno and sal > :value"
        # Binds as keyword arguments ...
        cursor.execute(sql, deptno=30, value=1000)
        # ... or as a dict whose keys match the bind names in the statement
        # (the original used 'sal', which is not a bind name in `sql`).
        dict1 = {'deptno': 30, 'value': 3000}
        cursor.execute(sql, dict1)

        # Column names: read them while the query is the cursor's current
        # statement.
        columns = [i[0] for i in cursor.description]

        cursor.fetchone()
        cursor.fetchmany(10)  # fetch 10 rows
        cursor.fetchall()  # last, so the calls above still have rows to read
        cursor.rowcount

        # --- Insert data ---------------------------------------------------------
        # MySQL has a built-in bulk loader instead:
        #   load data infile 'D:/../...txt' into table user(var1, ., .)
        cursor.execute('insert into demo(v) values(:1)', ['nice'])
        data = [['a', 1], ['b', 2]]
        # NOTE(review): placeholder statement -- the original passed the SELECT
        # above to executemany; adjust table/column names to the real schema.
        insert_sql = 'insert into demo(v, n) values(:1, :2)'
        cursor.executemany(insert_sql, data)
        db.commit()  # auto-commit is off, so persist explicitly
    except orcl.DatabaseError:
        # Roll back on error, typically when an insert fails.
        db.rollback()
        raise
    finally:
        cursor.close()
finally:
    # --- Close the database connection -----------------------------------------
    db.close()
定義ConnectOracle對象
import cx_Oracle as orcl


class ConnectOracle:
    """Small cx_Oracle helper that opens a new connection for every call.

    ``select_oracle`` and ``insert_oracle`` each connect, run one statement
    and close the connection again.

    Connection notes kept from the original author::

        scott xx 192.168.32.200 1521 oracle
        sqlplus, system as sysdba, select instance_name from V$instance;
        oracle://qmcbrt:qmcbrt@localhost:1521/tqmcbdb
        cx_Oracle.connection('hr', 'hrpwd', 'localhost:1521/XE')
        cx_Oracle.connection('qmcb:qmcb@localhost:1521/tqmcbdb')
    """

    def __init__(self, username, passwd, host, port='1521', sid='oracle'):
        """Store the login data and print the Oracle client version.

        The DSN (data source name) is built from ``host``, ``port`` and
        ``sid`` with ``cx_Oracle.makedsn``. No connection is opened here.
        """
        self.login = {
            'username': username,
            'passwd': passwd,
            'dsn': orcl.makedsn(host, port, sid),
        }
        self.db = None
        self.cursor = None
        print(orcl.clientversion())

    def connect_oracle(self):
        """Open a connection and a cursor; return True on success, else False.

        Other accepted call forms of ``cx_Oracle.connect``::

            orcl.connect('load/123456@localhost/ora11g')
            orcl.connect('username', 'password', 'host/orcl')
        """
        try:
            self.db = orcl.connect(self.login['username'],
                                   self.login['passwd'],
                                   self.login['dsn'])
            self.db.autocommit = False  # turn auto-commit off
            self.cursor = self.db.cursor()
            return True
        except Exception as err:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and hid the reason for the failure.
            print('can not connect oracle')
            print(err)
            return False

    def close_oracle(self):
        """Close the cursor and the connection if they are open."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.db is not None:
            self.db.close()
            self.db = None

    def select_oracle(self, sql, size=0, params=None):
        """Run a query and return its rows as a ``pandas.DataFrame``.

        Args:
            sql: the SELECT statement; may contain bind variables, e.g.
                ``"... WHERE deptno = :deptno and sal > :value"``.
            size: when non-zero, fetch at most this many rows
                (``fetchmany``); otherwise fetch all rows.
            params: bind values for ``sql``, e.g.
                ``{'deptno': 30, 'value': 1000}``.

        Returns:
            A DataFrame whose columns are taken from ``cursor.description``,
            or ``False`` when the connection could not be opened.
        """
        if not self.connect_oracle():
            return False
        try:
            if params:
                # A query takes ONE set of binds, so this must be execute();
                # the original called executemany() here.
                self.cursor.execute(sql, params)
            else:
                self.cursor.execute(sql)
            if size:
                content = self.cursor.fetchmany(size)
            else:
                content = self.cursor.fetchall()
            columns = [i[0] for i in self.cursor.description]
        finally:
            # Release the connection even when the statement raises.
            self.close_oracle()
        return pd.DataFrame(content, columns=columns)

    def insert_oracle(self, sql, data=None):
        """Run an INSERT (or other non-query statement) and commit it.

        Args:
            sql: the statement, e.g. ``'insert into demo(v) values(:1)'``.
            data: optional batch of rows (``[(), ...]`` or ``((), ...)``)
                passed to ``executemany``; when empty or ``None`` the
                statement is executed once without binds.

        Errors while executing are printed and rolled back rather than
        raised. Nothing is done when the connection cannot be opened.
        MySQL offers ``load data infile ...`` for the same bulk-load job.
        """
        if not self.connect_oracle():
            # The original carried on and failed with AttributeError on None.
            return
        try:
            if data:
                self.cursor.executemany(sql, data)
            else:
                self.cursor.execute(sql)
            self.db.commit()
        except Exception as err:
            print("insert異常")
            print(err)
            self.db.rollback()
        finally:
            self.close_oracle()
定義讀取oracle函數
def read_db(orcl_engien, ftb, snum, enum):
    """Read one ``row_num`` slice of table *ftb* into a DataFrame.

    Args:
        orcl_engien: value handed to ``create_engine``.
        ftb: name of the table to read from.
        snum: exclusive lower bound on ``row_num``.
        enum: inclusive upper bound on ``row_num``.

    Returns:
        The result of ``pd.read_sql_query`` for rows in that range that also
        satisfy the fixed flag filters written into the statement.
    """
    # Table name and bounds are formatted straight into the statement text.
    template = (
        " select * from {} where row_num > {} and row_num <= {}"
        " and is_normal='1' and is_below_16='0' and is_xs='0'"
        " and is_cj='0' and is_bzsw='0' and is_dc='0' and is_px='0' "
    )
    statement = template.format(ftb, snum, enum)
    # The original author notes that the engine does not need close().
    engine = create_engine(orcl_engien)
    return pd.read_sql_query(statement, engine)
定義ConnectOracle
class ConnectOracle:
    """Small cx_Oracle helper: ``select`` and ``insert`` each open a
    connection, run one statement and close the connection again."""

    def __init__(self, username, passwd, host, port='1521', sid='oracle'):
        """Store the login data and print the Oracle client version.

        The DSN (data source name) is built from ``host``, ``port`` and
        ``sid`` with ``makedsn``. No connection is opened here.
        """
        self.login = {}
        self.db = None
        self.cursor = None
        self.login['username'] = username
        self.login['passwd'] = passwd
        self.login['dsn'] = orcl.makedsn(host, port, sid)
        print(orcl.clientversion())

    def connect(self):
        """Open a connection and a cursor; return True on success, else False.

        Another accepted form: ``orcl.connect('load/123456@localhost/ora11g')``.
        """
        try:
            self.db = orcl.connect(self.login['username'],
                                   self.login['passwd'],
                                   self.login['dsn'])
            self.db.autocommit = False  # turn auto-commit off
            self.cursor = self.db.cursor()
            return True
        except Exception as e:
            print(e)
            return False

    def close(self):
        """Close the cursor and the connection if they are open."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.db is not None:
            self.db.close()
            self.db = None

    def execute(self, sql, params=None):
        """Execute one statement on the open cursor.

        Returns True on success. On error the exception is printed, the
        transaction is rolled back and False is returned.
        """
        try:
            if params:
                self.cursor.execute(sql, params)
            else:
                self.cursor.execute(sql)
            return True
        except Exception as e:
            print(e)
            self.db.rollback()
            return False

    def select(self, sql, size=0, params=None):
        """Run a query and return its rows as a ``pandas.DataFrame``.

        Args:
            sql: the SELECT statement, optionally with bind variables.
            size: when non-zero, fetch at most this many rows; otherwise
                fetch all rows.
            params: bind values for ``sql``.

        Returns:
            A DataFrame with lower-cased column names taken from
            ``cursor.description``, or ``None`` when connecting or executing
            failed (a message is printed in either case).
        """
        if not self.connect():
            print('數據庫連接錯誤')
            return None
        try:
            # ``params`` used to be dropped here, so bound queries failed.
            if not self.execute(sql, params):
                print('代碼執行錯誤')
                return None
            if size:
                data_list = self.cursor.fetchmany(size)
            else:
                data_list = self.cursor.fetchall()
            columns = [i[0].lower() for i in self.cursor.description]
            return pd.DataFrame(data_list, columns=columns)
        finally:
            # Also runs on the success path, which used to leak the connection.
            self.close()

    def insert(self, sql, data=None):
        """Run an INSERT (or other non-query statement) and commit it.

        Args:
            sql: the statement to run.
            data: ``None`` for a statement without binds; a single row of
                binds (sequence of scalars, or dict of named binds); or a
                batch of rows (list/tuple whose items are lists, tuples or
                dicts), which is sent through ``executemany``.

        Errors while executing are printed and rolled back rather than
        raised. A message is printed when the connection cannot be opened.
        """
        if not self.connect():
            print('數據庫連接錯誤')
            return
        try:
            if not data:
                self.cursor.execute(sql)
            elif self._is_batch(data):
                self.cursor.executemany(sql, data)
            else:
                self.cursor.execute(sql, data)
            self.db.commit()
        except Exception as e:
            print(e)
            print("insert異常")
            self.db.rollback()
        finally:
            self.close()

    @staticmethod
    def _is_batch(data):
        """Return True when *data* is a non-empty list/tuple of bind rows."""
        # Check the container type first: indexing ``data[0]`` on a dict of
        # named binds raised KeyError in the original.
        return (isinstance(data, (list, tuple))
                and len(data) > 0
                and isinstance(data[0], (list, tuple, dict)))
定義插入oracle函數
import retry


# Further retry options noted by the author: jitter=0, logger=retry.logging
@retry.retry(exceptions=Exception, tries=5, delay=5, backoff=1, max_delay=10)
def insert_db(data, orcl_engien, ttb):
    """Bulk-insert the rows of DataFrame *data* into table *ttb*.

    Args:
        data: DataFrame whose column names are used as the target column
            list and whose rows are the values to insert.
        orcl_engien: value handed to ``create_engine``.
        ttb: name of the target table.

    Raises:
        Whatever the driver raises, after the transaction has been rolled
        back; the decorator above then re-runs the call.

    Alternatives the author tried and kept as notes: ``DataFrame.to_sql``
    with an explicit ``dtype`` mapping (too slow), and streaming a
    tab-separated buffer through ``cursor.copy_from``.
    """
    from contextlib import closing

    # One positional bind (:1, :2, ...) per DataFrame column.
    col = ', '.join(data.columns.tolist())
    s = ', '.join([':' + str(i) for i in range(1, data.shape[1] + 1)])
    sql = 'insert into {}({}) values({})'.format(ttb, col, s)

    db = create_engine(orcl_engien)
    try:
        # closing() guarantees the cursor and the raw connection are released
        # on every attempt; the original leaked one connection per retry.
        with closing(db.raw_connection()) as conn, \
                closing(conn.cursor()) as cursor:
            try:
                # NOTE(review): the author recorded "TypeError: expecting
                # float" at this call -- presumably from NaN / mixed-type
                # columns; confirm against the real data.
                cursor.executemany(sql, data.values.tolist())
                conn.commit()
            except Exception:
                conn.rollback()
                raise
    finally:
        # The engine is created per call, so drop its pool when done.
        db.dispose()
定義重試裝飾器
def retry(num_retries=1, delay=2, random_sec_max=3):
    """Build a decorator that re-runs a function when it raises.

    Args:
        num_retries: total number of times the wrapped function is called
            before giving up (the default of 1 means a single attempt).
        delay: fixed number of seconds to wait between attempts.
        random_sec_max: upper bound of the random extra wait, in seconds,
            added to ``delay``.

    Returns:
        A decorator. The decorated function returns the first successful
        result of the wrapped function.

    Raises:
        RuntimeError: from the decorated function, when every attempt
            raised; the last error is attached as ``__cause__``.
    """
    import functools
    import random
    import time

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, num_retries + 1):
                try:
                    # Success: hand the result back and stop looping.
                    return func(*args, **kwargs)
                except Exception as err:
                    # Do not return here, otherwise the loop would stop.
                    last_exception = err
                    print(err)
                    # Only wait when another attempt follows.
                    if attempt < num_retries:
                        time.sleep(delay + random.random() * random_sec_max)
            # Every attempt failed. Report the arguments, then raise a real
            # exception (the original raised a plain string, which is itself
            # a TypeError in Python 3, and its print was unreachable).
            print('未執行參數為:', args, kwargs)
            raise RuntimeError('ERROR: 超過retry指定執行次數!!') from last_exception
        return wrapper
    return decorator