在Python語言中,從SQL Server數據庫讀寫數據,通常情況下,都是使用sqlalchemy 包和 pymssql 包的組合,這是因為大多數數據處理程序都需要用到DataFrame對象,它內置了從數據庫中讀和寫數據的函數:read_sql()和to_sql(),這兩個函數支持的連接類型是由sqlalchemy和pymssql構成的,因此,掌握這兩個包對於查詢SQL Server數據庫十分必要。
SQLAlchemy的架構
在Python語言環境中,當需要和關系型數據進行交互時,SQLAlchemy是事實上的標准包。SQLAlchemy由兩個截然不同的組件組成,稱為Core和ORM(Object Relational Mapper,對象關系映射器),Core是功能齊全的數據庫工具包,使用SQL 腳本來查詢數據庫;ORM是基於Core的可選包,把數據庫對象抽象成表、列、關系等實體。但是SQLAlchemy本身無法操作數據庫,需要pymsql等第三方數據庫API(Database API ),簡寫為 DBAPI,根據數據庫類型而調用不同的數據庫API。
從上圖可以看出,SQLAlchemy的基礎是使用DBAPI跟數據庫進行交互,而DBAPI不是一個package,而是一個規范,是一個抽象的接口,pymssql是實現該規范的一個工具包。
SQLAlchemy的Core組件使用DBAPI來和數據庫進行交互,當使用SQL腳本對數據庫執行查詢和修改操作時,必須用到SQLAlchemy的Engine 對象和Dialect對象。Engine 對象用於創建連接,連接到SQL Server,而Dialect對象(通常是Cursor對象)代表執行上下文,表示向SQL Server發送的請求和返回的結果。
本文主要分享使用Core組件來和數據庫進行交互。
一,SQLAlchemy的Engine和Connection
使用SQLAlchemy從數據庫中讀寫數據的基本用法:通過SQL 語句更新數據,通過DataFrame的read_sql()函數從數據庫中讀取數據,通過to_sql()函數把數據寫入到數據表中。
1,創建Engine
在對數據庫執行讀寫操作之前,必須連接到數據庫。SQLAlchemy通過 create_engine () 函數創建Engine,使用Engine管理DBAPI的連接,DBAPI的連接僅僅表示一種連接資源。應用Engine最有效率的方式是在模塊級別創建一次,而不是按照對象或函數來調用。
import pymssql import sqlalchemy from sqlalchemy import create_engine connection_format = 'mssql+pymssql://{0}:{1}@{2}/{3}?charset=utf8' connection_str = connection_format.format(db_user,db_password,db_host,db_name) engine = create_engine(connection_str,echo=False)
連接字符串的URL格式是:
dialect[+driver]://user:password@host/dbname
?charset=utf8
其中,dialect 代表數據庫類型,比如 mssql、mysql等,driver 代表DBAPI的類型,比如 psycopg2、pymysql等。
當echo參數為True時,會顯示執行的SQL語句,推薦把echo設置False,關閉日記功能。
Engine對象可以直接用於向數據庫發送SQL 腳本,調用Engine.execute()函數執行SQL腳本:
2,Engine和Connection
最通用的方法是通過Engine.connect()方法獲得連接資源,connection 是Connection的一個實例,是DBAPI連接的一個代理對象。
connection = engine.connect() result = connection.execute("select username from users") for row in result: print("username:", row['username']) connection.close()
result是ResultProxy的一個實例,該實例引用DBAPI的cursor。如果執行SELECT命令,當把所有的數據行都返回時,ResultProxy將自動關閉DBAPI的游標。如果執行UPDATE命令,不返回任何數據行,在命令執行之后,游標立即釋放資源。
當connection顯式調用close()函數時,應用的DBAPI 連接會被釋放到連接池(connection pool)。
可以使用Engine對象的execute()函數,以一種簡單的方式執行上述過程:
result = engine.execute("select username from users") for row in result: print("username:", row['username'])
二,顯式使用事務
Connection對象提供begin()函數顯式開始一個事務(Transaction)對象,該對象通常用於try/except代碼塊中,以保證調用Transaction.rollback() 或 Transaction.commit()。
connection = engine.connect() tran = connection.begin() try: connection.execute('sql statement') tran.commit() except: tran.rollback() raise
或者使用上下文管理器編寫更簡單的代碼:
with connection.begin() as tran: connection.execute('sql statement')
三,自動提交事務
SQLAlchemy 實現了autocommit 功能,在當前沒有顯式開啟事務的情況下,如果SQLAlchemy 檢測(Detect)到執行的數據修改命令(比如 INSERT、UPDATE、DELETE)或數據定義命令(比如,CREATE TABLE, ALTER TABLE),那么Connection對象會自動提交事務。
conn = engine.connect() conn.execute("INSERT INTO users VALUES (1, 'john')") # autocommits
如果設置選項autocommit=True(默認為True),那么檢測會自動進行。如果執行的純文本的SQL語句,並且語句中包含數據修改和數據定義命令,那么自動提交事務。
可以使用Connection.execution_options()方法來設置autocommit選項,實現自動提交的完全控制:
engine.execute(text("SELECT my_mutating_procedure()").execution_options(autocommit=True))
四,執行SELECT查詢
使用Engine 或 Connection的execute()函數執行select查詢,返回游標變量。游標標量是一個迭代器,每次迭代返回的結果都是一個數據行,數據行是由字段構成的元組:
>>> cursor = engine.execute(' select * from dbo.vic_test') >>> for row in cursor: ... do_something
也可以使用DataFrame對象的read_sql()函數,把數據讀取到DataFrame對象中,或者調用DataFrame對象的to_sql()函數,把DataFrame對象中的數據寫入到關系表中。
五,使用原始的DBAPI連接
在某些情況下,SQLAlchemy 無法提供訪問某些DBAPI函數的通用方法,例如,調用存儲以及處理多個結果集,在這種情況下,直接使用原始DBAPI的連接。
dbapi_conn = engine.raw_connection()
該dbapi_conn是一種代理形式,當調用dbapi的close()方法時,實際上並沒有關閉DBAPI的連接,而是把其釋放會連接池:
dbapi_conn.close()
例如,使用DBAPI的原始連接來調用存儲過程,通過DBAPI級別的callpro()函數來執行存儲過程:
connection = engine.raw_connection() try: cursor = connection.cursor() cursor.callproc("my_procedure", ['x', 'y', 'z']) results = list(cursor.fetchall()) cursor.close() connection.commit() finally: connection.close()
pymssql 包
pymssql包是Python語言用於連接SQL Server數據庫的驅動程序(或者稱作DBAPI),它是最終和數據庫進行交互的工具。SQLAlchemy包就是利用pymssql包實現和SQL Server數據庫交互的功能的。
按照慣例,使用pymssql包查詢數據庫之前,首先創建連接:
import pymssql conn = pymssql.connect(host='host',database='db_name',user='user',password='pwd',charset='utf8')
通過連接創建游標,通過游標執行SQL語句,查詢數據或對數據進行更新操作:
cursor = conn.cursor() cursor.execute("sql statement")
如果執行的是修改操作,需要提交事務;如果執行的是查詢操作,不需要提交:
conn.commit()
在查詢完成之后,關閉連接
conn.close()
六,使用游標查詢數據
游標cursor是由連接創建的對象,可以在游標中執行查詢,並設置數據返回的格式。
當執行select語句獲取數據時,返回的數據行有兩種格式:元組和字典,行的默認格式是元組。
cursor = conn.cursor(as_dict=True)
pymssql返回的數據集的格式是在創建游標時設置的,當參數 as_dict為True時,返回的行是字典格式,該參數的默認值是False,因此,默認的行格式是元組。
由於游標是一個迭代器,因此,可以使用for語句以迭代方式逐行處理查詢的結果集。
for row in cursor:
1,以元組方式返回數據行
默認情況下,游標返回的每一個數據行,都是一個元組結構:
cursor=connect.cursor() cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print('row = %r' % (row,))
2,以字典方式返回數據行
當設置游標以字典格式返回數據時,每一行都是一個字典結構:
cursor = conn.cursor(as_dict=True)
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name']))
七,使用游標更新數據
在執行update、delete或insert命令對數據進行更新時,需要顯式提交事務。
1,執行單條語句修改數據
當需要更新數據時,調用游標的execute()函數執行SQL命令來實現,可以以參數化的方式來執行,參數化類似於python的string.format()函數,通過格式化的字符串、占位符和參數來生成TSQL腳本。
cursor.execute(operation)
cursor.execute(operation, params)
通過游標的execute()函數來執行TSQL語句,調用 commit() 來提交事務
cursor.execute(""" sql statement """) conn.commit()
或者以參數化的方式來執行:
cursor.execute("update id=1 FROM persons WHERE salesrep='%s'", 'John Doe') conn.commit()
2,執行數據的多行插入
如果要在一個事務中執行多條SQL命令,可以調用游標的executemany()函數:
cursor.executemany(operation, params_seq)
如果需要插入多條記錄,可以使用游標的executemany()函數,該函數包含模板SQL 命令和一個格式化的參數列表,用於在一條事務中插入多條記錄:
args=[(1, 'John Smith', 'John Doe'), (2, 'Jane Doe', 'Joe Dog'), (3, 'Mike T.', 'Sarah H.')] cursor.executemany("INSERT INTO persons VALUES (%d, %s, %s)", args ) conn.commit()
八,關閉連接
在一個連接中,用戶可以提交多個事務,執行多個操作。當查詢完成之后,一定要關閉連接:
conn.close()
通常情況下,使用with來自動關閉連接:
with pymssql.connect(host='host',database='db_name', user='user', password='pwd',charset='utf8') as conn: with conn.cursor(as_dict=True) as cursor: cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name']))
九,調用存儲過程
從pymssql 2.0.0開始,可以使用callproc函數來執行存儲過程,callproc()函數的語法是:
result_args = cursor.callproc(proc_name, args=())
args參數是一個序列,對於存儲過程的每一個參數,都需要傳遞值。對於OUT參數,也必須傳遞值,通常傳遞0。
callproc()函數返回的是輸入args的修改之后的副本,IN參數在result_args中不變,OUT參數在result_args中代表存儲過程輸出的值。
舉個例子,對於存儲add_num,有兩個IN參數,一個OUT參數:
CREATE PROCEDURE add_num(IN num1 INT, IN num2 INT, OUT sum INT)
調用callproc()函數的格式是:
result_args = (5, 6, 0) # 0 is to hold value of the OUT parameter sum cursor.callproc('add_num', result_args)
以下示例代碼,使用上下文管理器來調用callproc()執行存儲過程:
with pymssql.connect(server, user, password, "tempdb") as conn: with conn.cursor(as_dict=True) as cursor: cursor.callproc('sp_name', ('arg1',)) for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name']))
參考文檔: