使用pymysql


安裝

pip3 install pymysql

 

連接、執行sql、關閉(游標)

import pymysql

mysql_connect_dict={
    'host':'127.0.0.1',
    'port':3306,
    'user':'yycenter',
    'password':'qwe123',
    'db':'testmysql',
    'charset':'utf8'
}
# 連接數據庫
# conn = pymysql.connect(**mysql_connect_dict)
# 指定以dict形式返回,默認以元祖形式
conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor)
print(conn)

# 創建游標
cursor = conn.cursor()

# 執行SQL,並返回收影響行數
effect_row = cursor.execute("select * from test1")
print(effect_row)  # 6條記錄


# 關閉游標
cursor.close()
# 關閉連接
conn.close()

  

execute之sql注入

注意:符號--會注釋掉它之后的sql,正確的語法:--后至少有一個任意字符

根本原理:就根據程序的字符串拼接name='%s',我們輸入一個xxx' -- haha,用我們輸入的xxx加'在程序中拼接成一個判斷條件name='xxx' -- haha'

 

sql='select * from userinfo where name="%s" and password="%s"' %(user,pwd) #注意%s需要加引號

#1、sql注入之:用戶存在,繞過密碼
egon' -- 任意字符

#2、sql注入之:用戶不存在,繞過用戶與密碼
xxx' or 1=1 -- 任意字符

  

 

  解決方法:

# 原來是我們對sql進行字符串拼接
# sql="select * from userinfo where name='%s' and password='%s'" %(user,pwd)
# print(sql)
# res=cursor.execute(sql)

#改寫為(execute幫我們做字符串拼接,我們無需且一定不能再為%s加引號了)
sql="select * from userinfo where name=%s and password=%s" #!!!注意%s需要去掉引號,因為pymysql會自動為我們加上
res=cursor.execute(sql,[user,pwd]) #pymysql模塊自動幫我們解決sql注入的問題,只要我們按照pymysql的規矩來。

  

 增、刪、改

conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor)
print(conn)

# 創建游標
cursor = conn.cursor()

# 執行SQL,並返回收影響行數
effect_row = cursor.execute("select * from test1")
print(effect_row)  # 6條記錄

# #執行sql語句
# #part1
sql="insert into test1(name,grade) values('egon',99)"
res=cursor.execute(sql)  # 執行sql語句,返回sql影響成功的行數
print(res) # 1
#
# #part2
sql='insert into test1(name,grade) values(%s,%s);'
res=cursor.execute(sql, ("alex",100)) # 執行sql語句,返回sql影響成功的行數
print(res)# 1
#
# # part3
sql = 'insert into test1(name,grade) values(%s,%s);'
res = cursor.executemany(sql,[('egon1',99),('egon2',88),('egon3',77)])  # 執行sql語句,返回sql影響成功的行數
print(res) # 3  執行sql語句,返回sql影響成功的行數
#
conn.commit()  # 提交后才發現表中插入記錄成功


# part4
sql = 'update test1 set grade = %s where name= %s;'
res = cursor.execute(sql,(66,'egon'))  # 執行sql語句,返回sql影響成功的行數
print(res) # 1  執行sql語句,返回sql影響成功的行數
conn.commit()  # 提交后才發現表中插入記錄成功


# part5
sql = 'delete  from test1  where name= %s;'
res = cursor.execute(sql,('egon2'))  # 執行sql語句,返回sql影響成功的行數
print(res) # 1  執行sql語句,返回sql影響成功的行數

conn.commit()  # 提交后才發現表中插入記錄成功


# 關閉游標
cursor.close()
# 關閉連接
conn.close()

  

查詢

# 指定以dict形式返回,默認以元祖形式
conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor)
print(conn)

# 創建游標
cursor = conn.cursor()

# 執行SQL,並返回收影響行數
effect_row = cursor.execute("select * from test1")
print(effect_row)  #

res1=cursor.fetchone() # 獲取剩余結果的第一行數據

res2=cursor.fetchmany(2) # 獲取剩余結果前n行數據

res3=cursor.fetchall() # 獲取剩余結果所有數據
print(res1)
#{'id': 1, 'name': 'aa', 'grade': 99}
print(res2)
#[{'id': 2, 'name': 'bb', 'grade': 55}, {'id': 3, 'name': 'cc', 'grade': 88}]
print(res3)
# [{'id': 9, 'name': 'y', 'grade': 44}, {'id': 10, 'name': 'y', 'grade': 99}, {'id': 11, 'name': 'y', 'grade': 55}]

res4=cursor.fetchall() # 獲取剩余結果所有數據
print(res4) # 空,數據已經取完

# 在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置
cursor.scroll(0, mode='absolute') # 相對絕對位置移動
res5=cursor.fetchmany(2) # 獲取剩余結果前2行數據
print(res5)
# [{'id': 1, 'name': 'aa', 'grade': 99}, {'id': 2, 'name': 'bb', 'grade': 55}]

cursor.scroll(2, mode='relative')  # 相對當前位置移動2條記錄,總記錄數還剩下2條
res6=cursor.fetchall() # 獲取剩余結果所有數據
print(res6)
# [{'id': 10, 'name': 'y', 'grade': 99}, {'id': 11, 'name': 'y', 'grade': 55}]
# 關閉游標
cursor.close()
# 關閉連接
conn.close()

 

獲取新創建數據自增ID

可以獲取到最新自增的ID,也就是最后插入的一條數據ID  

# conn = pymysql.connect(**mysql_connect_dict)
# 指定以dict形式返回,默認以元祖形式
conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor)
print(conn)

# 創建游標
cursor = conn.cursor()

# 執行SQL,並返回收影響行數
effect_row = cursor.execute("select * from test1")


sql = 'insert into test1(name,grade) values(%s,%s);'
res = cursor.executemany(sql,[('egon1',99),('egon2',88),('egon3',77)])  # 執行sql語句,返回sql影響成功的行數
print(res) # 3  執行sql語句,返回sql影響成功的行數

conn.commit()

#獲取自增id
new_id = cursor.lastrowid
print(new_id) #31
# 關閉游標
cursor.close()
# 關閉連接
conn.close()

  

 

調用存儲過程

調用無參存儲過程

# 指定以dict形式返回,默認以元祖形式
conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor)
print(conn)

# 創建游標
cursor = conn.cursor()

# 執行SQL,並返回收影響行數
effect_row = cursor.execute("select * from test1")

# 無參數存儲過程
cursor.callproc('p1')  # 等價於cursor.execute("call p1()")
# create procedure p1()
# BEGIN
#     INSERT into test1(name,grade) values('egon4',100);
# 		commit;
# END
row_1 = cursor.fetchone()
print(row_1)
conn.commit()
# 關閉游標
cursor.close()
# 關閉連接
conn.close()

b、調用有參存儲過程

# 指定以dict形式返回,默認以元祖形式
conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor)
print(conn)

# 創建游標
cursor = conn.cursor()

# 執行SQL,並返回收影響行數
effect_row = cursor.execute("select * from test1")

# 有參數存儲過程
cursor.callproc('p2',args=('egon6',99))  # 等價於cursor.execute("call p1()")
# delimiter //
# CREATE PROCEDURE p2(IN p_in1 VARCHAR(20),IN p_in2 int)
# BEGIN
#     INSERT into test1(name,grade) values(p_in1,p_in2);
# 		commit;
# END //
# delimiter ;
# 獲取執行完存儲的參數,參數@開頭
cursor.execute("select @_p2_0,@_p2_1;") #@p2_0代表第一個參數,@p2_1代表第二個參數,即返回值
row_1 = cursor.fetchone()
print(row_1)  # '@_p2_0': 'egon6', '@_p2_1': 99}
conn.commit()
# 關閉游標
cursor.close()
# 關閉連接
conn.close()

  

使用with簡化連接過程

每次都連接關閉很麻煩,使用上下文管理,簡化連接過程

import pymysql
import contextlib
# 定義上下文管理器,連接后自動關閉連接
@contextlib.contextmanager
def mysql():
    mysql_connect_dict={
        'host':'127.0.0.1',
        'port':3306,
        'user':'yycenter',
        'password':'qwe123',
        'db':'testmysql',
        'charset':'utf8'
    }
    # 連接數據庫
    # conn = pymysql.connect(**mysql_connect_dict)
    # 指定以dict形式返回,默認以元祖形式
    conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor)
    print(conn)

    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    try:
        yield cursor
    finally:
        conn.commit()
        cursor.close()
        conn.close()

# 執行sql
with mysql() as cursor:
    print(cursor)
    row_count = cursor.execute("select * from test1")
    row_1 = cursor.fetchone()
    print (row_count, row_1) # 8 {'id': 1, 'name': 'aa', 'grade': 99}

 

使用POOLDB連接mysql

 

import pymysql
from DBUtils.PooledDB import PooledDB
from utils.mylog_set import mylog
from utils.time_tool import run_time
from conf import settings


def escape_quotes(val):
    '轉換單引號和雙引號'
    if isinstance(val, str):
        return val.replace("'", "\\\'")
    return val


def stitch_sequence(seq=None, is_field=True, suf=None):
    '''
    序列拼接方法, 用於將序列拼接成字符串
    - :seq: 拼接序列
    - :suf: 拼接后綴(默認使用 ",")
    - :is_field: 是否為數據庫字段序列
    '''
    if seq is None:
        raise Exception("Parameter seq is None")
    suf = suf or ","
    res = str()
    for item in seq:
        res += '`%s`%s' % (item, suf) if is_field else '%s%s' % (item, suf)
    return res[:-len(suf)]


class MysqlUtil(object):
    """
       簡便的數據庫操作
       初始化參數如下:
       - :creator: 創建連接對象(默認: pymysql)
       - :host: 連接數據庫主機地址(默認: localhost)
       - :port: 連接數據庫端口(默認: 3306)
       - :user: 連接數據庫用戶名(默認: None), 如果為空,則會拋異常
       - :password: 連接數據庫密碼(默認: None), 如果為空,則會拋異常
       - :database: 連接數據庫(默認: None), 如果為空,則會拋異常
       - :chatset: 編碼(默認: utf8)
       初始化該數據庫下所有表的信息
       """

    def __init__(self, creator=pymysql, host=settings.mysqldb.get("host"), port=3306, user=settings.mysqldb.get("user"), password=settings.mysqldb.get("password"),
                 database=settings.mysqldb.get("database"), charset="utf8"):
        if host is None:
            raise ValueError("Parameter [host] is None.")
        if port is None:
            raise ValueError("Parameter [port] is None.")
        if user is None:
            raise ValueError("Parameter [user] is None.")
        if password is None:
            raise ValueError("Parameter [password] is None.")
        if database is None:
            raise ValueError("Parameter [database] is None.")
        self.logger = mylog
        # 執行初始化
        self._config = dict({
            "creator": creator, "charset": charset, "host": host, "port": port,
            "user": user, "password": password, "database": database
        })
        self._database = database
        self._table = None
        self._pool = None
        self._init_connect()
        self._init_params()

    def __del__(self):
        '重寫類被清除時調用的方法'
        if self._cursor:
            self._cursor.close()
        if self._conn:
            self._conn.close()

    def commit(self):
        # 提交
        self._conn.commit()

    def rollback(self):
        # 回滾
        self._conn.rollback()

    def _init_connect(self):
        '初始化連接'
        try:
            if self._pool is None:
                self._pool = PooledDB(
                    **self._config,
                    mincached=5,  # 啟動時開啟的閑置連接數量(缺省值 0 以為着開始時不創建連接)
                    maxcached=20,  # 連接池中允許的閑置的最多連接數量(缺省值 0 代表不閑置連接池大小)
                    maxshared=20, # 共享連接數允許的最大數量(缺省值 0 代表所有連接都是專用的)如果達到了最大數量,被請求為共享的連接將會被共享使用
                    maxusage=100) # 單個連接的最大允許復用次數(缺省值 0 或 False 代表不限制的復用).當達到最大數時,連接會自動重新連接(關閉和重新打開)
            # 獲得連接池
            self._conn = self._pool.connection()
            # 建立連接
            self._cursor = self._conn.cursor(
                cursor=pymysql.cursors.DictCursor)  # 使用字典方式
            self.logger.info("[{0}] 數據庫初始化成功。".format(self._database))
        except Exception as e:
            self.logger.info.error(e)

    @run_time
    def execute_query(self, sql=None, args=(), single=False):
        '''執行查詢 SQL 語句
        - :sql: sql 語句
        - :single: 是否查詢單個結果集,默認False
        '''
        try:
            if sql is None:
                raise Exception("Parameter sql is None.")
            self.logger.debug(
                "[{}] SQL >>> [{}] args =[{}]" .format(
                    self._database, sql, args))
            self._cursor.execute(sql, args)
            return self._cursor.fetchone() if single else self._cursor.fetchall()

        except Exception as e:
            self.logger.error(e)

    @run_time
    def execute_update(self, sql=None, args=()):
        '''執行更新 SQL 語句
        - :sql: sql 語句
        '''
        try:
            if sql is None:
                raise Exception("Parameter sql is None.")
            self.logger.debug("[%s] SQL >>> [%s]" % (self._database, sql))
            result = self._cursor.execute(sql, args)
            return result
        except Exception as e:
            self.logger.error(e)
            self._conn.rollback()

    def _init_params(self):
        '初始化參數'
        self._table_dict = {}
        self._information_schema_columns = []
        self._table_column_dict_list = {}
        self._init_table_dict_list()
        self._init_table_column_dict_list()

    def _init_information_schema_columns(self):
        "查詢 information_schema.`COLUMNS` 中的列"
        sql = """   SELECT COLUMN_NAME
                    FROM information_schema.`COLUMNS`
                    WHERE TABLE_SCHEMA='information_schema' AND TABLE_NAME='COLUMNS'
                """
        result_list = self.execute_query(sql)
        # self.logger.debug(result_list)
        column_list = [r['COLUMN_NAME'] for r in result_list]
        self.logger.debug('column_list:>>> {}.'.format(column_list))
        self._information_schema_columns = column_list

    def _init_table_dict(self, table_name):
        '初始化表'
        if not self._information_schema_columns:
            self._init_information_schema_columns()
        stitch_str = stitch_sequence(self._information_schema_columns)
        sql = """   SELECT %s FROM information_schema.`COLUMNS`
                    WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'
                """ % (stitch_str, self._database, table_name)
        column_list = self.execute_query(sql)
        column_dict = {}
        for column in column_list:
            column_dict[column["COLUMN_NAME"]] = column

        self._table_dict[table_name] = column_dict

    def _init_table_dict_list(self):
        "初始化表字典對象"

        sql = "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA='%s'" % (
            self._database)
        table_list = self.execute_query(sql)
        self._table_dict = {t['TABLE_NAME']: {} for t in table_list}
        self.logger.debug('_table_dicts {}'.format(self._table_dict))
        for table in [t['TABLE_NAME'] for t in table_list]:
            self._init_table_dict(table)

    def _init_table_column_dict_list(self):
        '''初始化表字段字典列表'''
        """
        example:{'test1': ['id', 'name', 'grade']
        """
        for table, column_dict in self._table_dict.items():
            column_list = [column for column in column_dict.keys()]
            self._table_column_dict_list[table] = column_list
        self.logger.debug(
            "table_dict info: {}".format(
                self._table_column_dict_list))

    # 根據表自動創建參數字典
    def create_params(self, table, args={}):
        col_list = self._table_column_dict_list[table]
        params = {}
        for k in col_list:
            if args.__contains__(k):
                params[k] = args[k]
        return params

    def _parse_result(self, result):
        '用於解析單個查詢結果,返回字典對象'
        if result is None:
            return None
        obj = {key: value for key, value in zip(self._column_list, result)}
        return obj

    def _parse_results(self, results):
        '用於解析多個查詢結果,返回字典列表對象'
        if results is None:
            return None
        objs = [self._parse_result(result) for result in results]
        return objs

    def _get_primary_key(self, table_name):
        '獲取表對應的主鍵字段'
        if self._table_dict.get(table_name) is None:
            raise Exception(table_name, "is not exist.")
        for column, column_dict in self._table_dict[table_name].items():
            if column_dict["COLUMN_KEY"] == "PRI":
                return column

    def _get_table_column_list(self, table_name=None):
        '查詢表的字段列表, 將查詢出來的字段列表存入 __fields 中'
        return self._table_column_dict_list[table_name]

    def _check_table_name(self, table_name):
        '''驗證 table_name 參數'''
        if table_name is None:
            raise Exception("Parameter [table_name] is None.")
        else:
            self._table = table_name
            self._column_list = self._table_column_dict_list[self._table]

    def count(self, table_name=None):
        '''統計記錄數'''
        self._check_table_name(table_name)
        sql = "SELECT count(*) FROM %s" % (self._table)
        result = self.execute_query(sql, True)
        return result[0]

    @run_time
    def executemany(self, sql, args):
        # 批量執行
        try:
            self.logger.debug('executemany sql:{}'.format(sql))
            return self.cursor.executemany(sql, args)
        except Exception as e:
            self.close()
            raise e

    # 執行sql,參數一:table,參數二:查詢列'col1,col2' 參數三:參數字典{'字段1':'值1','字段2':'值2'}
    def queryByTable(self, table, col='*', cond_dict={}):
        # self.execute(sql, args)
        self._check_table_name(table)
        cond_dict = self.create_params(table, cond_dict)
        cond_stmt = ' and '.join(['%s=%%s' % k for k in cond_dict.keys()])
        # del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'
        if not cond_dict:
            query_sql = 'select %(col)s FROM %(table)s'
        else:
            query_sql = 'select %(col)s FROM %(table)s where %(cond_stmt)s'

    # 執行sql,參數一:sql
    def queryBySql(self, query_sql):
        # self.execute(sql, args)
        '''驗證 table_name 參數'''
        if query_sql is None:
            raise Exception("Parameter [query_sql] is None.")
        return self.execute_query(query_sql)

    def insertByTable(self, table_name=None, obj={}):
        '''保存方法
        - @param table_name 表名
        - @param obj 對象
        - @return 影響行數
        ("test1", {'name': 'x', 'grade': 99}
        '''
        self._check_table_name(table_name)
        if obj is None:
            obj = {}
        primary_key = self._get_primary_key(self._table)
        if primary_key not in obj.keys():
            obj[primary_key] = None
        stitch_str = stitch_sequence(obj.keys())
        # print(stitch_str)
        value_list = []
        for key, value in obj.items():
            if self._table_dict[self._table][key]["COLUMN_KEY"] != "PKI":
                value = "null" if value is None else '"%s"' % value
            value_list.append(escape_quotes(value))
        stitch_value_str = stitch_sequence(value_list, False)

        sql = 'INSERT INTO `%s` (%s) VALUES(%s)' % (
            self._table, stitch_str, stitch_value_str)
        return self.execute_update(sql)

    def deleteByTable(self, table_name=None, cond_dict={}):
        '''刪除
        - @param table_name 表名
        - @param  cond_dict = {}: #參數二:用於where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}
        - @return 影響行數
        '''
        self._check_table_name(table_name)

        cond_dict = self.create_params(table_name, cond_dict)
        cond_stmt = ' and '.join(["%s=%%s" % (k) for k in cond_dict.keys()])
        # del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'
        if not cond_dict:
            del_sql = 'DELETE FROM %(table_name)s'
        else:
            del_sql = 'DELETE FROM %(table_name)s where %(cond_stmt)s'

        return self.execute_update(
            del_sql % locals(), tuple(cond_dict.values()))

    def updateByTable(self, table, column_dict={}, cond_dict={}):
        # 更新,參數一:表名,參數二用於set 字段1=值1,字段2=值2...格式:{'字段1':'值1','字段2':'值2'},
        # 參數三:用於where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}
        self._check_table_name(table)

        column_dict = self.create_params(table, column_dict)
        cond_dict = self.create_params(table, cond_dict)
        set_stmt = ','.join(['%s=%%s' % k for k in column_dict.keys()])
        cond_stmt = ' and '.join(['%s=%%s' % k for k in cond_dict.keys()])
        if not cond_dict:
            upd_sql = 'UPDATE %(table)s set %(set_stmt)s'
        else:
            upd_sql = 'UPDATE %(table)s set %(set_stmt)s where %(cond_stmt)s'
        args = tuple(column_dict.values()) + tuple(cond_dict.values())  # 合並成1個
        return self.execute_update(upd_sql % locals(), args)


class Page(object):
    '分頁對象'

    def __init__(self, page_num=1, page_size=10, count=False):
        '''
        Page 初始化方法
        - @param page_num 頁碼,默認為1
        - @param page_size 頁面大小, 默認為10
        - @param count 是否包含 count 查詢
        '''
        # 當前頁數
        self.page_num = page_num if page_num > 0 else 1
        # 分頁大小
        self.page_size = page_size if page_size > 0 else 10
        # 總記錄數
        self.total = 0
        # 總頁數
        self.pages = 1
        # 起始行(用於 mysql 分頁查詢)
        self.start_row = (self.page_num - 1) * self.page_size
        # 結束行(用於 mysql 分頁查詢)
        self.end_row = self.start_row + self.page_size


if __name__ == '__main__':
    sql1 = MysqlUtil()

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM