pymssql 讀寫SQL Server數據庫


pymssql包是Python語言用於連接SQL Server數據庫的驅動程序(或者稱作DB API),它是最終和數據庫進行交互的工具。SQLAlchemy包就是利用pymssql包實現和SQL Server數據庫交互的功能的。

一,pymssql包的基本組成

pymssql包由兩個模塊構成:pymssql 和 _mssql,pymssql 是建立在_mssql模塊之上的模塊,相對來說,_mssql性能更高。

pymssql模塊由Connection和Cursor 兩個大類構成:

  • Connection類代表MS SQL Sever數據庫的一個連接,
  • Cursor類用於向數據庫發送查詢請求,並獲取查詢的的結果。

按照慣例,使用pymssql包查詢數據庫之前,首先創建連接:

import pymssql
conn = pymssql.connect(host='host',database='db_name',user='user',password='pwd',charset='utf8')

通過連接創建游標,通過游標執行SQL語句,查詢數據或對數據進行更新操作:

cursor = conn.cursor()
cursor.execute("sql statement") 

如果執行的是修改操作,需要提交事務;如果執行的是查詢操作,不需要提交:

conn.commit()

在查詢完成之后,關閉連接

conn.close()

二,連接

連接對象用於連接SQL Server引擎,並設置連接的屬性,比如連接超時,字符集等。

1,創建連接對象

pymssql通過類函數來構建連接對,在創建連接對象的同時,打開連接:

class pymssql.Connection(user, password, host, database, timeout, login_timeout, charset, as_dict)

2,構建Cursor對象

在創建連接對象之后,創建Cursor對象,使用Cursor對象向數據庫引擎發送查詢請求,並獲取查詢的結果:

Connection.cursor(as_dict=False)

as_dict是布爾類型,默認值是False,表示返回的數據是元組(tuple)類型;如果設置為True,返回的數據集是字典(dict)類型。

3,提交查詢和自動提交模式

在執行查詢之后,必須提交當前的事務,以真正執行Cursor對象的查詢請求:

Connection.commit()

默認情況下,自動提交模式是關閉的,用戶可以設置自動提交,pymssql自動執行Cursor發送的查詢請求:

Connection.autocommit(status)

status是bool值,True表示打開自動提交模式,False表示關閉自動提交模式,默認值是False。

4,關閉連接

在執行完查詢之后,關閉連接,通常情況下,使用with 語句來自動關閉連接:

Connection.close()

三,Cursor對象

通過打開的連接對象來創建Cursor對象,通過Cursor對象向數據庫引擎發送查詢請求,並獲取查詢的結果。

1,執行查詢

Cursor對象調用execute**()函數來執行查詢請求,

Cursor.execute(operation)
Cursor.execute(operation, params)
Cursor.executemany(operation, params_seq)

參數注釋:

  • operation:表示執行的sql語句,
  • params :表示sql語句的參數,
  • params_seq:參數序列,用於sql語句包含多個參數的情況。

注意,除設置自動提交模式之外,必須在執行查詢之后,通過連接對象來提交查詢。

Connection.commit()

如果sql語句只包含一個參數,那么必須在sql語句中顯式使用%s或%d作為占位符,分別用於引用字符型的參數和數值型的參數。

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')

如果sql語句包含多個參數,那么使用list來傳遞參數:

cursor.executemany(
    "INSERT INTO persons VALUES (%d, %s, %s)",
    [(1, 'John Smith', 'John Doe'),
     (2, 'Jane Doe', 'Joe Dog'),
     (3, 'Mike T.', 'Sarah H.')])

2,獲取查詢結果

Cursor對象調用fetch**()函數來獲取查詢的結果:

Cursor.fetchone()
Cursor.fetchmany(size=None)
Cursor.fetchall()

fetch**()函數是迭代的:

  • fetchone():表示從查詢結果中獲取下一行(next row)
  • fetchmany():表示從查詢結果中獲取下面的多行(next batch)
  • fetchall():表示從查詢結果中獲取剩余的所有數據行(all remaining)

3,跳過結果集

當查詢的結果包含多個結果集時,可以跳過當前的結果集,跳到下一個結果集:

Cursor.nextset()

如果當前結果集還有數據行未被讀取,那么這些剩余的數據行會被丟棄。

四,使用Cursor對象查詢數據

游標cursor是由連接創建的對象,可以在游標中執行查詢,並設置數據返回的格式。當執行select語句獲取數據時,返回的數據行有兩種格式:元組和字典,行的默認格式是元組。

cursor = conn.cursor(as_dict=True) 

pymssql返回的數據集的格式是在創建游標時設置的,當參數 as_dict為True時,返回的行是字典格式,該參數的默認值是False,因此,默認的行格式是元組。

由於游標是一個迭代器,因此,可以使用for語句以迭代方式逐行處理查詢的結果集。

for row in cursor:

1,以元組方式返回數據行

默認情況下,游標返回的每一個數據行,都是一個元組結構:

cursor=connect.cursor()
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
for row in cursor:
    print('row = %r' % (row,))

2,以字典方式返回數據行

當設置游標以字典格式返回數據時,每一行都是一個字典結構:

 cursor = conn.cursor(as_dict=True)
 cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
 for row in cursor:
     print("ID=%d, Name=%s" % (row['id'], row['name']))

五,使用Cursor對象更新數據

在執行update、delete或insert命令對數據進行更新時,需要顯式提交事務。

1,執行單條語句修改數據

當需要更新數據時,調用游標的execute()函數執行SQL命令來實現,可以以參數化的方式來執行,參數化類似於python的string.format()函數,通過格式化的字符串、占位符和參數來生成TSQL腳本。

cursor.execute(operation)
cursor.execute(operation, params)

通過游標的execute()函數來執行TSQL語句,調用 commit() 來提交事務

cursor.execute("sql statement")  
conn.commit()

或者以參數化的方式來執行:

cursor.execute("update id=1 FROM persons WHERE salesrep='%s'", 'John Doe')
conn.commit()

2,執行數據的多行插入

如果要在一個事務中執行多條SQL命令,可以調用游標的executemany()函數:

cursor.executemany(operation, params_seq)

如果需要插入多條記錄,可以使用游標的executemany()函數,該函數包含模板SQL 命令和一個格式化的參數列表,用於在一條事務中插入多條記錄:

args=[(1, 'John Smith', 'John Doe'),
     (2, 'Jane Doe', 'Joe Dog'),
     (3, 'Mike T.', 'Sarah H.')]

cursor.executemany("INSERT INTO persons VALUES (%d, %s, %s)", args )
conn.commit()

六,調用存儲過程

從pymssql 2.0.0開始,可以使用callproc()函數來執行存儲過程,callproc()函數的語法是:

result_args = cursor.callproc(proc_name, args=())

第一個參數是存儲過程的名稱,第二個參數args是一個元組類型,對於存儲過程的每一個參數,都需要傳遞值。對於OUT參數,也必須傳遞值,通常傳遞0。

callproc()函數返回的是輸入args的修改之后的副本,IN參數在result_args中不變,OUT參數在result_args中代表存儲過程輸出的值。

舉個例子,對於存儲add_num,有兩個IN參數,一個OUT參數:

CREATE PROCEDURE add_num(IN num1 INT, IN num2 INT, OUT sum INT)

調用callproc()函數的格式是:

result_args = (5, 6, 0) # 0 is to hold value of the OUT parameter sum
cursor.callproc('add_num', result_args)

以下示例代碼,使用上下文管理器來調用callproc()執行存儲過程:

with pymssql.connect(server, user, password, "tempdb") as conn:
    with conn.cursor(as_dict=True) as cursor:
        cursor.callproc('sp_name', ('arg1',))
        for row in cursor:
        print("ID=%d, Name=%s" % (row['id'], row['name']))

經過我的測試,我發現不管是使用callproc(),還是使用execute('exec sp_name'),pymssql都不能執行復雜的存儲過程,這讓人很是頭疼。

七,pymssql模塊的基本操作

 1,pymssql的基本操作

from os import getenv
import pymssql

server = getenv("PYMSSQL_TEST_SERVER")
user = getenv("PYMSSQL_TEST_USERNAME")
password = getenv("PYMSSQL_TEST_PASSWORD")

conn = pymssql.connect(server, user, password, "tempdb")
cursor = conn.cursor(as_dict=False)
cursor.execute("TSQL query")
cursor.executemany("INSERT INTO persons VALUES (%d, %s, %s)",
    [(1, 'John Smith', 'John Doe'),
     (2, 'Jane Doe', 'Joe Dog'),
     (3, 'Mike T.', 'Sarah H.')])
# you must call commit() to persist your data if you don't set autocommit to True
conn.commit()

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
row = cursor.fetchone()
while row:
    print("ID=%d, Name=%s" % (row[0], row[1]))
    row = cursor.fetchone()

conn.close()

2,以字典集返回數據行

conn = pymssql.connect(server, user, password, "tempdb")
cursor = conn.cursor(as_dict=True)

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
for row in cursor:
    print("ID=%d, Name=%s" % (row['id'], row['name']))

conn.close()

3,使用with語句

with是上下文管理器,可以自動關閉上下文。如果使用with語句來創建連接對象和Cursor對象,那么就不需要顯式的關閉連接和Cursor對象,在語句執行完成之后,Python會自動檢測連接對象和Cursor對象的作用域,一旦連接對象或Cursor對象不再有效,Python就會關閉連接或Cursor對象。

with pymssql.connect(server, user, password, "tempdb") as conn:
    with conn.cursor(as_dict=True) as cursor:
        cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
        for row in cursor:
            print("ID=%d, Name=%s" % (row['id'], row['name']))

八,附上代碼庫

附上代碼,以饗讀者。

import pymssql
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy.sql import text as sql_text

class DBHelper:
    def __init__(self):
        self.name='DB Helper'
        self.db_host = r'sql server'
        self.db_name = 'db name'
        self.db_user = r'sa' 
        self.db_password = r'pwd'

######################################################
##                   data connection                ##
######################################################

    def get_engine(self):
        str_format = 'mssql+pymssql://{0}:{1}@{2}/{3}?charset=utf8'
        connection_str = str_format.format(self.db_user,self.db_password,self.db_host,self.db_name)
        engine = create_engine(connection_str,echo=False)
        return engine

    def get_pymssql_conn(self):
        conn = pymssql.connect(self.db_host, self.db_user, self.db_password, self.db_name)
        return conn


######################################################
##                common SQL APIs                   ##
######################################################

    def write_data(self,df,destination,if_exists='append',schema='dbo'):
        engine = self.get_engine()
        df.to_sql(destination, con=engine, if_exists=if_exists,index = False, schema=schema
                  , method='multi', chunksize=1000)

    def read_data(self,sql):
        engine = self.get_engine()
        df = pd.read_sql(sql, con=engine)
        return df

    def exec_sql(self,sql):
        engine = self.get_engine()
        con = engine.connect()
        with con.begin() as tran: 
            con.execute(sql_text(sql).execution_options(autocommit=True))

    def exec_sp(self,sp_name,*paras):
        with pymssql.connect(self.db_host, self.db_user, self.db_password, database=self.db_name) as conn:
            with conn.cursor(as_dict=False) as cursor:
                try:
                    cursor.callproc(sp_name, paras)
                    cursor.nextset()
                    conn.commit()
                except Exception as e:
                    print(e)

    def exec_sp_result(self,sp_name,*paras):
        with pymssql.connect(self.db_host, self.db_user, self.db_password, database=self.db_name) as conn:
            with conn.cursor(as_dict=True) as cursor:
                try:
                    cursor.callproc(sp_name, paras)
                    cursor.nextset()
                    result=cursor.fetchall()

                    conn.commit()
                    df=pd.DataFrame.from_records(result)

                    return df
                except Exception as e:
                    print(e)
View Code

 

參考文檔:

pymssql introduction

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM