python操作mysql之pymysql


前言

①pymsql是Python中操作MySQL的模塊,其使用方法和MySQLdb幾乎相同。但目前pymysql支持python3.x而后者不支持3.x版本。

②程序在運行時,數據都是在內存中的。當程序終止時,通常需要將數據保存在磁盤上。

使用詳解

1、使用 connect 函數創建連接對象,此連接對象提供關閉數據庫、事務回滾等操作。

connect = pymysql.connect(host='127.0.0.1',
                     user='root',
                     password='pwd',
                     database='database_name')

connect連接對象的常見方法:


2、操作數據庫主要使用 cursor 游標對象,對數據庫進行操作:

cursor游標對象的常見方法:

3、查詢場景

查詢主要獲取結果集,注意fetchone 得到的是元組, fetchmany(size),fetchall()得到的是元組的嵌套。

注意:如果既要使用python的字符串解析%,sql語句中又包含sql的模糊查詢占位符%,模糊查詢使用%%即可,這樣sql語句不會報錯。

import pymysql

# 創建數據庫連接對象
connect = pymysql.connect(host='127.0.0.1',
                          user='root',
                          password='pwd',
                          database='database_name')
# 創建游標對象
cursor = connect.cursor()

table_name = 'map_point'
sql = "SELECT * FROM %s WHERE username LIKE 'DL%%' " % table_name
try:
    cursor.execute(sql)  # 執行sql語句,也可執行數據庫命令,如:show tables
    result = cursor.fetchall()  # 所有結果
    print(result)
except Exception as e:
    connect.rollback()
    print("查詢失敗")
    print(e)
finally:
    cursor.close()  # 關閉當前游標
    connect.close()  # 關閉數據庫連接

注意:

循環查詢:
在一些場景中,需要進行循環查詢,判斷,此時在執行完execute查詢語句后,務必進行commit提交,否則每次查詢到的數據都是之前查詢結果的快照,也就是舊數據。
或者采用第二種方案,創建connect連接對象時,增添 autocommit=True 屬性,自動進行commit事務提交。

4、增、刪、改場景

增刪改需要有提交事務的操作,查不需要提交事務,但如果循環查詢,務必提交事務,否則結果都是重復的。

創建連接對象——connect = pymysql()

創建cursor游標對象——cursor = connect.cursor()

數據操作——cursor.execute(sql)

提交連接事務——connect.commit()

關閉cursor游標對象——cursor.close()

關閉連接對象——connect.close()

數據update操作

import pymysql

# 創建數據庫連接對象
connect = pymysql.connect(host='127.0.0.1',
                          user='root',
                          password='pwd',
                          database='database_name')
# 創建游標對象
cursor = connect.cursor()

table_name = 'table_name '
user_id = 'yyy'
user_no = 'xxx'
sql = "UPDATE %s SET user_no = '%s' WHERE user_id = '%s'" % (table_name, user_no, user_id)
try:
    cursor.execute(sql)  # 執行sql語句,也可執行數據庫命令,如:show tables
    connect.commit()  # 增刪改,必須執行事務
    print("數據更新成功")
except Exception as e:
    connect.rollback()  # 若出現失敗,進行回滾
    print("數據更新失敗")
    print(e)
finally:
    cursor.close()  # 關閉當前游標
    connect.close()  # 關閉數據庫連接

數據循環update操作

import pymysql

connect = pymysql.connect(host='127.0.0.1',
                          user='root',
                          password='pwd',
                          database='database_name')

table_name = 'table_name'
update_list = ['xxx2', 'xxxx3']
condition_list = ['xxx', 'xxx1']
# 條件集合,更新集合長度相等時可使用,可根據其他情況重新編寫
cursor = connect.cursor()
for i in range(len(condition_list)):
    sql = "UPDATE %s SET user_no = '%s' WHERE user_id = '%s'" % (table_name, update_list[i],
                                                                 condition_list[i])
    print('執行sql語句:' + sql)
    try:
        cursor.execute(sql)
        connect.commit()
        print("數據更新成功" + str(i + 1) + '')
    except Exception as e:
        connect.rollback()
        print("數據更新失敗")
        print(e)
cursor.close()
connect.close()

使用executemany(query,  param) 批量更新

  • params參數為每條記錄的維度,可為嵌套數組和元組。
  • sql語句中需要更改的數據不管什么類型,統一使用%s作為占位符,不需要加引號。

# 創建數據庫連接對象
import pymysql

connect = pymysql.connect(host='127.0.0.1',
                          user='root',
                          password='pwd',
                          database='database_name')
# 創建游標對象
cursor = connect.cursor()
update_list = ['a', 'b']
condition_list = ['a1', 'b1']
# 條件數組和需要更新的數據數組是分開的,因為param需要以每條的維度,所以可以使用拉鏈函數合並
params = zip(update_list, condition_list) # 或 param = [['a', 'a1'], ['b', 'b1']]
sql = "UPDATE map_point SET storageLocation_no = %s WHERE position_id = %s"  # 注意 這里的 占位%s 都不用加引號
try:
    cursor.executemany(sql, params)  # 執行sql語句
    connect.commit()  # 執行事務
    print("數據批量更新成功")
except Exception as e:
    connect.rollback()
    print("數據更新失敗")
    print(e)
finally:
    cursor.close()  # 關閉當前游標
    connect.close()  # 關閉數據庫連接

數據insert操作

數據庫創建表EMPLOYEE:

import pymysql

# 打開數據庫連接
connect = pymysql.connect(host='localhost',
                          user='root',
                          password='xxxxxx',
                          database='text')

# 使用 cursor() 方法創建一個游標對象 cursor
cursor = connect.cursor()

# 使用預處理語句創建表
sql = """CREATE TABLE EMPLOYEE (
         FIRST_NAME  CHAR(20) NOT NULL,
         LAST_NAME  CHAR(20),
         AGE INT,  
         SEX CHAR(1),
         INCOME FLOAT )"""

# 執行sql語句
cursor.execute(sql)

# 關閉不使用的游標對象
cursor.close()
# 關閉數據庫連接
connect.close()

向表EMPLOYEE插入數據:

import pymysql

# 打開數據庫連接
connect = pymysql.connect(host='localhost',
                          user='root',
                          password='xxxxxx',
                          database='text')

# 使用 cursor() 方法創建一個游標對象 cursor
cursor = connect.cursor()

# SQL 插入語句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
         LAST_NAME, AGE, SEX, INCOME)
         VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
    # 執行sql語句
    cursor.execute(sql)
    # 提交到數據庫執行
    connect.commit()
except Exception as e:
    # 如果發生錯誤則回滾
    connect.rollback()
    print("數據插入失敗")
    print(e)
finally:
    # 關閉不使用的游標對象
    cursor.close()
    # 關閉數據庫連接
    connect.close()

使用變量傳值的方式插入數據

注意:sql語句中數字類型使用%s或者%d,字符串類型使用'%s'。
import pymysql

# 打開數據庫連接
connect = pymysql.connect(host='localhost',
                          user='root',
                          password='245074472',
                          database='text')

# 使用 cursor() 方法創建一個游標對象 cursor
cursor = connect.cursor()

first_name = 'dahe'
last_name = 'dahezhiquan'
age = 22
sex = 'M'
income = 18000

# SQL 插入語句
sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \
       LAST_NAME, AGE, SEX, INCOME) \
       VALUES ('%s', '%s',  %s,  '%s',  %s)" % \
      (first_name, last_name, age, sex, income)
try:
    # 執行sql語句
    cursor.execute(sql)
    # 提交到數據庫執行
    connect.commit()
except Exception as e:
    # 如果發生錯誤則回滾
    connect.rollback()
    print("數據插入失敗")
    print(e)
finally:
    # 關閉不使用的游標對象
    cursor.close()
    # 關閉數據庫連接
    connect.close()

數據delete操作

import pymysql

# 打開數據庫連接

connect = pymysql.connect(host='localhost',
                          user='root',
                          password='xxxxxx',
                          database='text')

# 使用 cursor() 方法創建一個游標對象 cursor
cursor = connect.cursor()

# SQL 刪除語句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (22)

try:
    # 執行sql語句
    cursor.execute(sql)
    # 提交到數據庫執行
    connect.commit()
except Exception as e:
    # 如果發生錯誤則回滾
    connect.rollback()
    print("數據刪除失敗")
    print(e)
finally:
    # 關閉不使用的游標對象
    cursor.close()
    # 關閉數據庫連接
    connect.close()

使用實例

一、安裝

pip install pymysql

二、使用操作

#!/usr/bin/env pytho
# -*- coding:utf-8 -*-
import pymysql
  
# 創建連接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1', charset='utf8')

# 創建游標
cursor = conn.cursor()
  
# 執行SQL,並返回收影響行數
effect_row = cursor.execute("select * from tb7")
  
# 執行SQL,並返回受影響行數
effect_row = cursor.execute("update tb7 set pass = '123' where nid = %s", (11,))
  
# 執行SQL,並返回受影響行數,執行多次
effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u1","u1pass","11111"),("u2","u2pass","22222")])
  
  
# 提交,不然無法保存新建或者修改的數據
conn.commit()
  
# 關閉游標
cursor.close()
# 關閉連接
conn.close()

注意:存在中文的時候,連接需要添加charset='utf8',否則中文顯示亂碼。

2、獲取查詢數據

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
cursor.execute("select * from tb7")

# 獲取剩余結果的第一行數據
row_1 = cursor.fetchone()
print row_1
# 獲取剩余結果前n行數據
row_2 = cursor.fetchmany(3)

# 獲取剩余結果所有數據
row_3 = cursor.fetchall()

conn.commit()
cursor.close()
conn.close()

3、獲取新創建數據自增ID

可以獲取到最新自增的ID,也就是最后插入的一條數據ID

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u3","u3pass","11113"),("u4","u4pass","22224")])
conn.commit()
cursor.close()
conn.close()
#獲取自增id new_id = cursor.lastrowid print new_id

4、移動游標

操作都是靠游標,那對游標的控制也是必須的

注:在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:

cursor.scroll(1,mode='relative')  # 相對當前位置移動
cursor.scroll(2,mode='absolute')  # 相對絕對位置移動

5、fetch數據類型

關於默認獲取的數據是元祖類型,如果想要或者字典類型的數據,即:

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
#游標設置為字典類型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute("select * from tb7")

row_1 = cursor.fetchone()
print row_1  #{u'licnese': 213, u'user': '123', u'nid': 10, u'pass': '213'}

conn.commit()
cursor.close()
conn.close()

6、調用存儲過程

a、調用無參存儲過程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
#游標設置為字典類型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
#無參數存儲過程
cursor.callproc('p2')    #等價於cursor.execute("call p2()")

row_1 = cursor.fetchone()
print row_1


conn.commit()
cursor.close()
conn.close()

b、調用有參存儲過程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

cursor.callproc('p1', args=(1, 22, 3, 4))
#獲取執行完存儲的參數,參數@開頭
cursor.execute("select @p1,@_p1_1,@_p1_2,@_p1_3")   #{u'@_p1_1': 22, u'@p1': None, u'@_p1_2': 103, u'@_p1_3': 24}
row_1 = cursor.fetchone()
print row_1


conn.commit()
cursor.close()
conn.close()

三、關於pymysql防注入

 1、字符串拼接查詢,造成注入

正常查詢語句:

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
user="u1"
passwd="u1pass"

#正常構造語句的情況
sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)
#或者
#sql=select user,pass from tb7 where user='u1' and pass='u1pass'
row_count=cursor.execute(sql) row_1 = cursor.fetchone() print row_count,row_1 conn.commit() cursor.close() conn.close()

構造注入語句:

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()

user="u1' or '1'-- "
passwd="u1pass"
sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)

#拼接語句被構造成下面這樣,永真條件,此時就注入成功了。因此要避免這種情況需使用pymysql提供的參數化查詢。
#select user,pass from tb7 where user='u1' or '1'-- ' and pass='u1pass'

row_count=cursor.execute(sql)
row_1 = cursor.fetchone()
print row_count,row_1


conn.commit()
cursor.close()
conn.close()

 2、避免注入,使用pymysql提供的參數化語句

正常參數化查詢

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
user="u1"
passwd="u1pass"
#執行參數化查詢
row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))
row_1 = cursor.fetchone()
print row_count,row_1

conn.commit()
cursor.close()
conn.close()

構造注入,參數化查詢注入失敗。

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()

user="u1' or '1'-- "
passwd="u1pass"
#執行參數化查詢
row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))
#內部執行參數化生成的SQL語句,對特殊字符進行了加\轉義,避免注入語句生成。
# sql=cursor.mogrify("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))
# print sql
#select user,pass from tb7 where user='u1\' or \'1\'-- ' and pass='u1pass'被轉義的語句。

row_1 = cursor.fetchone()
print row_count,row_1

conn.commit()
cursor.close()
conn.close()

 結論:excute執行SQL語句的時候,必須使用參數化的方式,否則必然產生SQL注入漏洞。

3、使用存mysql存儲過程動態執行SQL防注入

使用MYSQL存儲過程自動提供防注入,動態傳入SQL到存儲過程執行語句。

定義動態調用sql的存儲過程:

delimiter \\
DROP PROCEDURE IF EXISTS proc_sql \\
CREATE PROCEDURE proc_sql (
    in nid1 INT,
    in nid2 INT,
    in callsql VARCHAR(255)
    )
BEGIN
    set @nid1 = nid1;
    set @nid2 = nid2;
    set @callsql = callsql;
        PREPARE myprod FROM @callsql;
--     PREPARE prod FROM 'select * from tb2 where nid>? and  nid<?';    傳入的值為字符串,?為占位符
--     用@p1,和@p2填充占位符
        EXECUTE myprod USING @nid1,@nid2;
    DEALLOCATE prepare myprod; 

END\\
delimiter ;

定義動態調用sql的存儲過程

在sql中調用:

set @nid1=12;
set @nid2=15;
set @callsql = 'select * from tb7 where nid>? and nid<?';
CALL proc_sql(@nid1,@nid2,@callsql)

pymsql中調用 :

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
mysql="select * from tb7 where nid>? and nid<?"
cursor.callproc('proc_sql', args=(11, 15, mysql))

rows = cursor.fetchall()
print rows  #((12, 'u1', 'u1pass', 11111), (13, 'u2', 'u2pass', 22222), (14, 'u3', 'u3pass', 11113))
conn.commit()
cursor.close()
conn.close()

四、使用with簡化連接過程

每次都連接關閉很麻煩,使用上下文管理,簡化連接過程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"

import pymysql
import contextlib
#定義上下文管理器,連接后自動關閉連接
@contextlib.contextmanager
def mysql(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1',charset='utf8'):
    conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    try:
        yield cursor
    finally:
        conn.commit()
        cursor.close()
        conn.close()

# 執行sql
with mysql() as cursor:
   print(cursor)
   row_count = cursor.execute("select * from tb7")
   row_1 = cursor.fetchone()
   print row_count, row_1

五、封裝pymysql連接

a.封裝函數執行sql語句獲取結果

# encoding=utf-8
import traceback

from pymysql import *


class MysqlHelper:
    def __init__(self, host='10HW173532', user='finmsg_dev', password='zxcZXC$12', db='msgrule', charset='utf8',
                 connect_timeout=120):  # 注意這里有默認值的變量一定要放在沒有默認值變量的后面
        self.host = host
        self.user = user
        self.password = password
        self.db = db
        self.charset = charset
        self.connect_timeout = connect_timeout

    def open(self):
        self.conn = connect(host=self.host, user=self.user, password=self.password, db=self.db, charset=self.charset,
                            connect_timeout=self.connect_timeout)
        self.cursor = self.conn.cursor()
        print("數據庫已連接")

    def close(self):
        self.cursor.close()
        self.conn.close()
        print("數據庫已關閉")

    def rollback(self):
        self.conn.rollback()
        self.cursor.close()
        self.conn.close()
        print("數據庫執行回滾操作")

    def search_one(self, sql, params=None):
        try:
            self.open()
            self.cursor.execute(sql, params)
            result = self.cursor.fetchone()
            print("查詢成功")
            self.close()
            return result
        except Exception as e:
            print(e)
            print("查詢失敗")

    def search_all(self, sql, params=None):  # 查詢獲取多個值
        try:
            self.open()
            self.cursor.execute(sql, params)
            result = self.cursor.fetchall()
            print("查詢成功")
            self.close()
            return result
        except Exception as e:
            print(e)
            return "查詢失敗"

    def insert(self, sql, params=None):  # 添加數據
        try:
            self.open()
            self.cursor.execute(sql, params)
            try:
                self.conn.commit()
                print('添加成功')
                self.close()
                return "添加成功"
            except:
                self.rollback()
                return "添加失敗"
        except Exception as e:
            print(e)
            return "添加失敗"

    def update(self, sql, params=None):  # 修改數據
        try:
            self.open()
            self.cursor.execute(sql, params)
            try:
                self.conn.commit()
                print('修改成功')
                self.close()
                return "修改成功"
            except:
                self.rollback()
                print("修改失敗")
                return "修改失敗"
        except Exception as e:
            print(e)
            return "修改失敗"

    def delete(self, sql, params=None):  # 刪除數據
        try:
            self.open()
            self.cursor.execute(sql, params)
            try:
                self.conn.commit()
                print('刪除成功')
                self.close()
                return "刪除成功"
            except:
                self.rollback()
                print('刪除失敗')
                return "刪除失敗"
        except Exception as e:
            print(e)
            return "刪除失敗"

    def is_connected(self):  # 檢查數據庫是否處於連接中
        """Check if the server is alive"""
        try:
            self.conn.ping(reconnect=True)
            print("db is connecting")
        except:
            traceback.print_exc()
            self.conn = self.open()
            print("db reconnect")

b.封裝函數獲取數據庫中的對象

"""
    作用:完成數據庫相關工具類封裝
    分析:
        1. 主要方法
            get_sql_one(sql)
            get_sql_all(sql)
        2. 輔助方法
            1. 獲取連接對象
            2. 獲取游標對象
            3. 關閉游標對象方法
            4. 關閉連接對象方法
"""
from config.project import DataBase
from tools.read_config import ReadConfig
import pymysql


# 新建工具類 數據庫
class ReadDB:
    # 定義連接對象 類方法
    conn = None

    def __init__(self, database):  # 構造函數-->注意這里有默認值的變量一定要放在沒有默認值變量的后面
        self.host = ReadConfig().read_config(section=database, option="host")
        # 從config.ini配置文件讀取出來的option為字符串類型,這里需要將port強轉為int類型
        self.port = int(ReadConfig().read_config(section=database, option="port"))
        self.user = ReadConfig().read_config(section=database, option="user")
        self.password = ReadConfig().read_config(section=database, option="password")
        self.db_name = ReadConfig().read_config(section=database, option="db_name")
        self.charset = 'utf8'
        self.connect_timeout = 120

    # 獲取連對象方法封裝
    def get_conn(self):
        if self.conn is None:
            self.conn = pymysql.connect(host=self.host,
                                        port=self.port,
                                        user=self.user,
                                        password=self.password,
                                        db=self.db_name,
                                        charset=self.charset,
                                        connect_timeout=self.connect_timeout)
        print('數據庫連接成功')
        # 返回連接對象
        return self.conn

    # 獲取游標對象方法封裝
    def get_cursor(self):
        return self.get_conn().cursor()

    # 將游標設置為字典類型
    def get_dict_cursor(self):
        return self.get_conn().cursor(cursor=pymysql.cursors.DictCursor)

    # 關閉游標對象方法封裝
    @staticmethod
    def close_cursor(cursor):
        if cursor:
            cursor.close()

    # 關閉連接對象方法封裝
    def close_conn(self):
        if self.conn:
            self.conn.close()
            # 注意:關閉連接對象后,對象還存在內存中,需要手工設置為None
            self.conn = None
            print("數據庫關閉成功")

    # 主要 執行方法 獲取單條結果 -> 在外界調用次方法就可以完成數據相應的操作【獲取的數據庫單條數據的格式為python元組類型】
    def get_sql_one(self, sql_statement):
        # 定義游標對象及數據變量
        cursor = None
        data = None
        try:
            # 獲取游標對象
            cursor = self.get_cursor()
            # 調用執行方法
            cursor.execute(sql_statement)
            # 獲取結果
            data = cursor.fetchone()
        except Exception as e:
            print("get_sql_one error:", e)
        finally:
            # 關閉游標對象
            self.close_cursor(cursor)
            # 關閉連接對象
            self.close_conn()
            # 返回執行結果
            return data

        # 主要 執行方法 獲取單條結果 -> 在外界調用次方法就可以完成數據相應的操作【獲取的數據庫單條數據的格式為python字典類型】

    def get_sql_byDict(self, sql_statement):
        # 定義游標對象及數據變量
        cursor = None
        data = None
        try:
            # 獲取游標對象
            cursor = self.get_dict_cursor()
            # 調用執行方法
            cursor.execute(sql_statement)
            # 獲取結果
            data = cursor.fetchone()
        except Exception as e:
            print("get_sql_one error:", e)
        finally:
            # 關閉游標對象
            self.close_cursor(cursor)
            # 關閉連接對象
            self.close_conn()
            # 返回執行結果
            return data

    # 獲取指定條數的數據庫結果集【獲取的數據庫多條數據集格式為元組嵌套元組】
    def get_sql_many(self, sql_statement, n):
        # 定義游標對象及數據變量
        cursor = None
        data = None
        try:
            # 獲取游標對象
            cursor = self.get_cursor()
            # 調用執行方法
            cursor.execute(sql_statement)
            # 獲取結果
            data = cursor.fetchmany(n)
        except Exception as e:
            print("get_sql_one error:", e)
        finally:
            # 關閉游標對象
            self.close_cursor(cursor)
            # 關閉連接對象
            self.close_conn()
            # 返回執行結果
            return data

    # 獲取 所有數據庫結果集
    def get_sql_all(self, sql_statement):
        # 定義游標對象及數據變量
        cursor = None
        data = None
        try:
            # 獲取游標對象
            cursor = self.get_cursor()
            # 調用執行方法
            cursor.execute(sql_statement)
            # 獲取所有結果
            data = cursor.fetchall()
        except Exception as e:
            print("get_sql_all error:", e)
        finally:
            # 關閉游標對象
            self.close_cursor(cursor)
            # 關閉連接對象
            self.close_conn()
            # 返回執行結果
            return data

    # 修改、刪除、新增
    def update_sql(self, sql_statement):
        # 定義游標對象
        cursor = None
        effect_row = None
        try:
            # 獲取游標對象
            cursor = self.get_cursor()
            # 調用執行方法,返回受影響行數
            effect_row = cursor.execute(sql_statement)
            # 提交事務
            self.conn.commit()
        except Exception as e:
            # 事務回滾
            self.conn.rollback()
            print("get_sql_one error:", e)
        finally:
            # 關閉游標對象
            self.close_cursor(cursor)
            # 關閉連接對象
            self.close_conn()
            # 返回受影響行數
            return effect_row

    def is_connected(self):  # 檢查數據庫是否處於連接中
        """Check if the server is alive"""
        try:
            self.conn.ping(reconnect=True)
            print("db is connecting")
        except Exception as error:
            print("db connect error:%s" % error)
            self.get_conn()
            self.get_dict_cursor()
            print("db reconnect")


if __name__ == '__main__':
    sql = "SELECT * FROM bomt_clue_db.bo_clue_orig order by CREATE_TIME desc limit 1 "
    print(ReadDB(database=DataBase.LeaveInformation).get_sql_byDict(sql))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM