Python 學習 第17篇:從SQL Server數據庫讀寫數據


在Python語言中,從SQL Server數據庫讀寫數據,通常情況下,都是使用sqlalchemy 包和 pymssql 包的組合,這是因為大多數數據處理程序都需要用到DataFrame對象,它內置了從數據庫中讀和寫數據的函數:read_sql()和to_sql(),這兩個函數支持的連接類型是由sqlalchemy和pymssql構成的,因此,掌握這兩個包對於查詢SQL Server數據庫十分必要。

SQLAlchemy的架構

在Python語言環境中,當需要和關系型數據進行交互時,SQLAlchemy是事實上的標准包。SQLAlchemy由兩個截然不同的組件組成,稱為Core和ORM(Object Relational Mapper,對象關系映射器),Core是功能齊全的數據庫工具包,使用SQL 腳本來查詢數據庫;ORM是基於Core的可選包,把數據庫對象抽象成表、列、關系等實體。但是SQLAlchemy本身無法操作數據庫,需要pymsql等第三方數據庫API(Database API ),簡寫為 DBAPI,根據數據庫類型而調用不同的數據庫API。

從上圖可以看出,SQLAlchemy的基礎是使用DBAPI跟數據庫進行交互,而DBAPI不是一個package,而是一個規范,是一個抽象的接口,pymssql是實現該規范的一個工具包。

SQLAlchemy的Core組件使用DBAPI來和數據庫進行交互,當使用SQL腳本對數據庫執行查詢和修改操作時,必須用到SQLAlchemy的Engine 對象和Dialect對象。Engine 對象用於創建連接,連接到SQL Server,而Dialect對象(通常是Cursor對象)代表執行上下文,表示向SQL Server發送的請求和返回的結果。

本文主要分享使用Core組件來和數據庫進行交互。

一,SQLAlchemy的Engine和Connection

使用SQLAlchemy從數據庫中讀寫數據的基本用法:通過SQL 語句更新數據,通過DataFrame的read_sql()函數從數據庫中讀取數據,通過to_sql()函數把數據寫入到數據表中。

1,創建Engine

在對數據庫執行讀寫操作之前,必須連接到數據庫。SQLAlchemy通過 create_engine () 函數創建Engine,使用Engine管理DBAPI的連接,DBAPI的連接僅僅表示一種連接資源。應用Engine最有效率的方式是在模塊級別創建一次,而不是按照對象或函數來調用。

import pymssql
import sqlalchemy
from sqlalchemy import create_engine

connection_format = 'mssql+pymssql://{0}:{1}@{2}/{3}?charset=utf8'
connection_str = connection_format.format(db_user,db_password,db_host,db_name)
engine = create_engine(connection_str,echo=False) 

連接字符串的URL格式是:

dialect[+driver]://user:password@host/dbname?charset=utf8

其中,dialect 代表數據庫類型,比如 mssql、mysql等,driver 代表DBAPI的類型,比如 psycopg2、pymysql等。

當echo參數為True時,會顯示執行的SQL語句,推薦把echo設置False,關閉日記功能。

Engine對象可以直接用於向數據庫發送SQL 腳本,調用Engine.execute()函數執行SQL腳本:

2,Engine和Connection

最通用的方法是通過Engine.connect()方法獲得連接資源,connection 是Connection的一個實例,是DBAPI連接的一個代理對象。

connection = engine.connect()
result = connection.execute("select username from users")
for row in result:
    print("username:", row['username'])
connection.close()

result是ResultProxy的一個實例,該實例引用DBAPI的cursor。如果執行SELECT命令,當把所有的數據行都返回時,ResultProxy將自動關閉DBAPI的游標。如果執行UPDATE命令,不返回任何數據行,在命令執行之后,游標立即釋放資源。

當connection顯式調用close()函數時,應用的DBAPI 連接會被釋放到連接池(connection pool)。

可以使用Engine對象的execute()函數,以一種簡單的方式執行上述過程:

result = engine.execute("select username from users")
for row in result:
    print("username:", row['username'])

二,顯式使用事務

Connection對象提供begin()函數顯式開始一個事務(Transaction)對象,該對象通常用於try/except代碼塊中,以保證調用Transaction.rollback() 或 Transaction.commit()。

connection = engine.connect()
tran = connection.begin()
try:
    connection.execute('sql statement')
    tran.commit()
except:
    tran.rollback()
    raise

或者使用上下文管理器編寫更簡單的代碼:

with connection.begin() as tran:
    connection.execute('sql statement')

三,自動提交事務

SQLAlchemy 實現了autocommit 功能,在當前沒有顯式開啟事務的情況下,如果SQLAlchemy 檢測(Detect)到執行的數據修改命令(比如 INSERT、UPDATE、DELETE)或數據定義命令(比如,CREATE TABLE, ALTER TABLE),那么Connection對象會自動提交事務。

conn = engine.connect()
conn.execute("INSERT INTO users VALUES (1, 'john')")  # autocommits

如果設置選項autocommit=True(默認為True),那么檢測會自動進行。如果執行的純文本的SQL語句,並且語句中包含數據修改和數據定義命令,那么自動提交事務。

可以使用Connection.execution_options()方法來設置autocommit選項,實現自動提交的完全控制:

engine.execute(text("SELECT my_mutating_procedure()").execution_options(autocommit=True))

四,執行SELECT查詢

使用Engine 或 Connection的execute()函數執行select查詢,返回游標變量。游標標量是一個迭代器,每次迭代返回的結果都是一個數據行,數據行是由字段構成的元組:

>>> cursor = engine.execute(' select * from dbo.vic_test')
>>> for row in cursor:
...     do_something

也可以使用DataFrame對象的read_sql()函數,把數據讀取到DataFrame對象中,或者調用DataFrame對象的to_sql()函數,把DataFrame對象中的數據寫入到關系表中。

五,使用原始的DBAPI連接

在某些情況下,SQLAlchemy 無法提供訪問某些DBAPI函數的通用方法,例如,調用存儲以及處理多個結果集,在這種情況下,直接使用原始DBAPI的連接。

dbapi_conn = engine.raw_connection()

該dbapi_conn是一種代理形式,當調用dbapi的close()方法時,實際上並沒有關閉DBAPI的連接,而是把其釋放會連接池:

dbapi_conn.close()

例如,使用DBAPI的原始連接來調用存儲過程,通過DBAPI級別的callpro()函數來執行存儲過程:

connection = engine.raw_connection()
try:
    cursor = connection.cursor()
    cursor.callproc("my_procedure", ['x', 'y', 'z'])
    results = list(cursor.fetchall())
    cursor.close()
    connection.commit()
finally:
    connection.close()

pymssql 包

pymssql包是Python語言用於連接SQL Server數據庫的驅動程序(或者稱作DBAPI),它是最終和數據庫進行交互的工具。SQLAlchemy包就是利用pymssql包實現和SQL Server數據庫交互的功能的。

按照慣例,使用pymssql包查詢數據庫之前,首先創建連接:

import pymssql
conn = pymssql.connect(host='host',database='db_name',user='user',password='pwd',charset='utf8')

通過連接創建游標,通過游標執行SQL語句,查詢數據或對數據進行更新操作:

cursor = conn.cursor()
cursor.execute("sql statement") 

如果執行的是修改操作,需要提交事務;如果執行的是查詢操作,不需要提交:

conn.commit()

在查詢完成之后,關閉連接

conn.close()

六,使用游標查詢數據

游標cursor是由連接創建的對象,可以在游標中執行查詢,並設置數據返回的格式。

當執行select語句獲取數據時,返回的數據行有兩種格式:元組和字典,行的默認格式是元組。

cursor = conn.cursor(as_dict=True) 

pymssql返回的數據集的格式是在創建游標時設置的,當參數 as_dict為True時,返回的行是字典格式,該參數的默認值是False,因此,默認的行格式是元組。

由於游標是一個迭代器,因此,可以使用for語句以迭代方式逐行處理查詢的結果集。

for row in cursor:

1,以元組方式返回數據行

默認情況下,游標返回的每一個數據行,都是一個元組結構:

cursor=connect.cursor()
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
for row in cursor:
    print('row = %r' % (row,))

2,以字典方式返回數據行

當設置游標以字典格式返回數據時,每一行都是一個字典結構:

 cursor = conn.cursor(as_dict=True)
 cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name']))

七,使用游標更新數據

在執行update、delete或insert命令對數據進行更新時,需要顯式提交事務。

1,執行單條語句修改數據

當需要更新數據時,調用游標的execute()函數執行SQL命令來實現,可以以參數化的方式來執行,參數化類似於python的string.format()函數,通過格式化的字符串、占位符和參數來生成TSQL腳本。

cursor.execute(operation)
cursor.execute(operation, params)

通過游標的execute()函數來執行TSQL語句,調用 commit() 來提交事務

cursor.execute("""
sql statement
""")  
conn.commit()

或者以參數化的方式來執行:

cursor.execute("update id=1 FROM persons WHERE salesrep='%s'", 'John Doe')
conn.commit()

2,執行數據的多行插入

如果要在一個事務中執行多條SQL命令,可以調用游標的executemany()函數:

cursor.executemany(operation, params_seq)

如果需要插入多條記錄,可以使用游標的executemany()函數,該函數包含模板SQL 命令和一個格式化的參數列表,用於在一條事務中插入多條記錄:

args=[(1, 'John Smith', 'John Doe'),
     (2, 'Jane Doe', 'Joe Dog'),
     (3, 'Mike T.', 'Sarah H.')]

cursor.executemany("INSERT INTO persons VALUES (%d, %s, %s)", args )
conn.commit()

八,關閉連接

在一個連接中,用戶可以提交多個事務,執行多個操作。當查詢完成之后,一定要關閉連接:

conn.close()

通常情況下,使用with來自動關閉連接:

with pymssql.connect(host='host',database='db_name', user='user', password='pwd',charset='utf8') as conn:
    with conn.cursor(as_dict=True) as cursor:  
        cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
        for row in cursor:
            print("ID=%d, Name=%s" % (row['id'], row['name']))

九,調用存儲過程

從pymssql 2.0.0開始,可以使用callproc函數來執行存儲過程,callproc()函數的語法是:

result_args = cursor.callproc(proc_name, args=())

args參數是一個序列,對於存儲過程的每一個參數,都需要傳遞值。對於OUT參數,也必須傳遞值,通常傳遞0。

callproc()函數返回的是輸入args的修改之后的副本,IN參數在result_args中不變,OUT參數在result_args中代表存儲過程輸出的值。

舉個例子,對於存儲add_num,有兩個IN參數,一個OUT參數:

CREATE PROCEDURE add_num(IN num1 INT, IN num2 INT, OUT sum INT)

調用callproc()函數的格式是:

result_args = (5, 6, 0) # 0 is to hold value of the OUT parameter sum
cursor.callproc('add_num', result_args)

以下示例代碼,使用上下文管理器來調用callproc()執行存儲過程:

with pymssql.connect(server, user, password, "tempdb") as conn:
    with conn.cursor(as_dict=True) as cursor:
        cursor.callproc('sp_name', ('arg1',))
        for row in cursor:
        print("ID=%d, Name=%s" % (row['id'], row['name']))

 

參考文檔:

Working with Engines and Connections

SQLAlchemy

pymssql introduction

Python中從SQL型數據庫讀寫dataframe型數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM