pymssql包是Python語言用於連接SQL Server數據庫的驅動程序(或者稱作DB API),它是最終和數據庫進行交互的工具。SQLAlchemy包就是利用pymssql包實現和SQL Server數據庫交互的功能的。
一,pymssql包的基本組成
pymssql包由兩個模塊構成:pymssql 和 _mssql,pymssql 是建立在_mssql模塊之上的模塊,相對來說,_mssql性能更高。
pymssql模塊由Connection和Cursor 兩個大類構成:
- Connection類代表MS SQL Sever數據庫的一個連接,
- Cursor類用於向數據庫發送查詢請求,並獲取查詢的的結果。
按照慣例,使用pymssql包查詢數據庫之前,首先創建連接:
import pymssql conn = pymssql.connect(host='host',database='db_name',user='user',password='pwd',charset='utf8')
通過連接創建游標,通過游標執行SQL語句,查詢數據或對數據進行更新操作:
cursor = conn.cursor() cursor.execute("sql statement")
如果執行的是修改操作,需要提交事務;如果執行的是查詢操作,不需要提交:
conn.commit()
在查詢完成之后,關閉連接
conn.close()
二,連接
連接對象用於連接SQL Server引擎,並設置連接的屬性,比如連接超時,字符集等。
1,創建連接對象
pymssql通過類函數來構建連接對,在創建連接對象的同時,打開連接:
class pymssql.Connection(user, password, host, database, timeout, login_timeout, charset, as_dict)
2,構建Cursor對象
在創建連接對象之后,創建Cursor對象,使用Cursor對象向數據庫引擎發送查詢請求,並獲取查詢的結果:
Connection.cursor(as_dict=False)
as_dict是布爾類型,默認值是False,表示返回的數據是元組(tuple)類型;如果設置為True,返回的數據集是字典(dict)類型。
3,提交查詢和自動提交模式
在執行查詢之后,必須提交當前的事務,以真正執行Cursor對象的查詢請求:
Connection.commit()
默認情況下,自動提交模式是關閉的,用戶可以設置自動提交,pymssql自動執行Cursor發送的查詢請求:
Connection.autocommit(status)
status是bool值,True表示打開自動提交模式,False表示關閉自動提交模式,默認值是False。
4,關閉連接
在執行完查詢之后,關閉連接,通常情況下,使用with 語句來自動關閉連接:
Connection.close()
三,Cursor對象
通過打開的連接對象來創建Cursor對象,通過Cursor對象向數據庫引擎發送查詢請求,並獲取查詢的結果。
1,執行查詢
Cursor對象調用execute**()函數來執行查詢請求,
Cursor.execute(operation)
Cursor.execute(operation, params)
Cursor.executemany(operation, params_seq)
參數注釋:
- operation:表示執行的sql語句,
- params :表示sql語句的參數,
- params_seq:參數序列,用於sql語句包含多個參數的情況。
注意,除設置自動提交模式之外,必須在執行查詢之后,通過連接對象來提交查詢。
Connection.commit()
如果sql語句只包含一個參數,那么必須在sql語句中顯式使用%s或%d作為占位符,分別用於引用字符型的參數和數值型的參數。
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
如果sql語句包含多個參數,那么使用list來傳遞參數:
cursor.executemany( "INSERT INTO persons VALUES (%d, %s, %s)", [(1, 'John Smith', 'John Doe'), (2, 'Jane Doe', 'Joe Dog'), (3, 'Mike T.', 'Sarah H.')])
2,獲取查詢結果
Cursor對象調用fetch**()函數來獲取查詢的結果:
Cursor.fetchone() Cursor.fetchmany(size=None) Cursor.fetchall()
fetch**()函數是迭代的:
- fetchone():表示從查詢結果中獲取下一行(next row)
- fetchmany():表示從查詢結果中獲取下面的多行(next batch)
- fetchall():表示從查詢結果中獲取剩余的所有數據行(all remaining)
3,跳過結果集
當查詢的結果包含多個結果集時,可以跳過當前的結果集,跳到下一個結果集:
Cursor.nextset()
如果當前結果集還有數據行未被讀取,那么這些剩余的數據行會被丟棄。
四,使用Cursor對象查詢數據
游標cursor是由連接創建的對象,可以在游標中執行查詢,並設置數據返回的格式。當執行select語句獲取數據時,返回的數據行有兩種格式:元組和字典,行的默認格式是元組。
cursor = conn.cursor(as_dict=True)
pymssql返回的數據集的格式是在創建游標時設置的,當參數 as_dict為True時,返回的行是字典格式,該參數的默認值是False,因此,默認的行格式是元組。
由於游標是一個迭代器,因此,可以使用for語句以迭代方式逐行處理查詢的結果集。
for row in cursor:
1,以元組方式返回數據行
默認情況下,游標返回的每一個數據行,都是一個元組結構:
cursor=connect.cursor() cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print('row = %r' % (row,))
2,以字典方式返回數據行
當設置游標以字典格式返回數據時,每一行都是一個字典結構:
cursor = conn.cursor(as_dict=True) cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name']))
五,使用Cursor對象更新數據
在執行update、delete或insert命令對數據進行更新時,需要顯式提交事務。
1,執行單條語句修改數據
當需要更新數據時,調用游標的execute()函數執行SQL命令來實現,可以以參數化的方式來執行,參數化類似於python的string.format()函數,通過格式化的字符串、占位符和參數來生成TSQL腳本。
cursor.execute(operation) cursor.execute(operation, params)
通過游標的execute()函數來執行TSQL語句,調用 commit() 來提交事務
cursor.execute("sql statement") conn.commit()
或者以參數化的方式來執行:
cursor.execute("update id=1 FROM persons WHERE salesrep='%s'", 'John Doe') conn.commit()
2,執行數據的多行插入
如果要在一個事務中執行多條SQL命令,可以調用游標的executemany()函數:
cursor.executemany(operation, params_seq)
如果需要插入多條記錄,可以使用游標的executemany()函數,該函數包含模板SQL 命令和一個格式化的參數列表,用於在一條事務中插入多條記錄:
args=[(1, 'John Smith', 'John Doe'), (2, 'Jane Doe', 'Joe Dog'), (3, 'Mike T.', 'Sarah H.')] cursor.executemany("INSERT INTO persons VALUES (%d, %s, %s)", args ) conn.commit()
六,調用存儲過程
從pymssql 2.0.0開始,可以使用callproc()函數來執行存儲過程,callproc()函數的語法是:
result_args = cursor.callproc(proc_name, args=())
第一個參數是存儲過程的名稱,第二個參數args是一個元組類型,對於存儲過程的每一個參數,都需要傳遞值。對於OUT參數,也必須傳遞值,通常傳遞0。
callproc()函數返回的是輸入args的修改之后的副本,IN參數在result_args中不變,OUT參數在result_args中代表存儲過程輸出的值。
舉個例子,對於存儲add_num,有兩個IN參數,一個OUT參數:
CREATE PROCEDURE add_num(IN num1 INT, IN num2 INT, OUT sum INT)
調用callproc()函數的格式是:
result_args = (5, 6, 0) # 0 is to hold value of the OUT parameter sum cursor.callproc('add_num', result_args)
以下示例代碼,使用上下文管理器來調用callproc()執行存儲過程:
with pymssql.connect(server, user, password, "tempdb") as conn: with conn.cursor(as_dict=True) as cursor: cursor.callproc('sp_name', ('arg1',)) for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name']))
經過我的測試,我發現不管是使用callproc(),還是使用execute('exec sp_name'),pymssql都不能執行復雜的存儲過程,這讓人很是頭疼。
七,pymssql模塊的基本操作
1,pymssql的基本操作
from os import getenv import pymssql server = getenv("PYMSSQL_TEST_SERVER") user = getenv("PYMSSQL_TEST_USERNAME") password = getenv("PYMSSQL_TEST_PASSWORD") conn = pymssql.connect(server, user, password, "tempdb") cursor = conn.cursor(as_dict=False) cursor.execute("TSQL query") cursor.executemany("INSERT INTO persons VALUES (%d, %s, %s)", [(1, 'John Smith', 'John Doe'), (2, 'Jane Doe', 'Joe Dog'), (3, 'Mike T.', 'Sarah H.')]) # you must call commit() to persist your data if you don't set autocommit to True conn.commit() cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') row = cursor.fetchone() while row: print("ID=%d, Name=%s" % (row[0], row[1])) row = cursor.fetchone() conn.close()
2,以字典集返回數據行
conn = pymssql.connect(server, user, password, "tempdb") cursor = conn.cursor(as_dict=True) cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name'])) conn.close()
3,使用with語句
with是上下文管理器,可以自動關閉上下文。如果使用with語句來創建連接對象和Cursor對象,那么就不需要顯式的關閉連接和Cursor對象,在語句執行完成之后,Python會自動檢測連接對象和Cursor對象的作用域,一旦連接對象或Cursor對象不再有效,Python就會關閉連接或Cursor對象。
with pymssql.connect(server, user, password, "tempdb") as conn: with conn.cursor(as_dict=True) as cursor: cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe') for row in cursor: print("ID=%d, Name=%s" % (row['id'], row['name']))
八,附上代碼庫
附上代碼,以饗讀者。

import pymssql from sqlalchemy import create_engine import pandas as pd from sqlalchemy.sql import text as sql_text class DBHelper: def __init__(self): self.name='DB Helper' self.db_host = r'sql server' self.db_name = 'db name' self.db_user = r'sa' self.db_password = r'pwd' ###################################################### ## data connection ## ###################################################### def get_engine(self): str_format = 'mssql+pymssql://{0}:{1}@{2}/{3}?charset=utf8' connection_str = str_format.format(self.db_user,self.db_password,self.db_host,self.db_name) engine = create_engine(connection_str,echo=False) return engine def get_pymssql_conn(self): conn = pymssql.connect(self.db_host, self.db_user, self.db_password, self.db_name) return conn ###################################################### ## common SQL APIs ## ###################################################### def write_data(self,df,destination,if_exists='append',schema='dbo'): engine = self.get_engine() df.to_sql(destination, con=engine, if_exists=if_exists,index = False, schema=schema , method='multi', chunksize=1000) def read_data(self,sql): engine = self.get_engine() df = pd.read_sql(sql, con=engine) return df def exec_sql(self,sql): engine = self.get_engine() con = engine.connect() with con.begin() as tran: con.execute(sql_text(sql).execution_options(autocommit=True)) def exec_sp(self,sp_name,*paras): with pymssql.connect(self.db_host, self.db_user, self.db_password, database=self.db_name) as conn: with conn.cursor(as_dict=False) as cursor: try: cursor.callproc(sp_name, paras) cursor.nextset() conn.commit() except Exception as e: print(e) def exec_sp_result(self,sp_name,*paras): with pymssql.connect(self.db_host, self.db_user, self.db_password, database=self.db_name) as conn: with conn.cursor(as_dict=True) as cursor: try: cursor.callproc(sp_name, paras) cursor.nextset() result=cursor.fetchall() conn.commit() df=pd.DataFrame.from_records(result) return df except Exception as e: print(e)
參考文檔: