安裝
pip3 install pymysql
連接、執行sql、關閉(游標)
import pymysql mysql_connect_dict={ 'host':'127.0.0.1', 'port':3306, 'user':'yycenter', 'password':'qwe123', 'db':'testmysql', 'charset':'utf8' } # 連接數據庫 # conn = pymysql.connect(**mysql_connect_dict) # 指定以dict形式返回,默認以元祖形式 conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor) print(conn) # 創建游標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("select * from test1") print(effect_row) # 6條記錄 # 關閉游標 cursor.close() # 關閉連接 conn.close()
execute之sql注入
注意:符號--會注釋掉它之后的sql,正確的語法:--后至少有一個任意字符
根本原理:就根據程序的字符串拼接name='%s',我們輸入一個xxx' -- haha,用我們輸入的xxx加'在程序中拼接成一個判斷條件name='xxx' -- haha'
sql='select * from userinfo where name="%s" and password="%s"' %(user,pwd) #注意%s需要加引號 #1、sql注入之:用戶存在,繞過密碼 egon' -- 任意字符 #2、sql注入之:用戶不存在,繞過用戶與密碼 xxx' or 1=1 -- 任意字符
解決方法:
# 原來是我們對sql進行字符串拼接 # sql="select * from userinfo where name='%s' and password='%s'" %(user,pwd) # print(sql) # res=cursor.execute(sql) #改寫為(execute幫我們做字符串拼接,我們無需且一定不能再為%s加引號了) sql="select * from userinfo where name=%s and password=%s" #!!!注意%s需要去掉引號,因為pymysql會自動為我們加上 res=cursor.execute(sql,[user,pwd]) #pymysql模塊自動幫我們解決sql注入的問題,只要我們按照pymysql的規矩來。
增、刪、改
conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor) print(conn) # 創建游標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("select * from test1") print(effect_row) # 6條記錄 # #執行sql語句 # #part1 sql="insert into test1(name,grade) values('egon',99)" res=cursor.execute(sql) # 執行sql語句,返回sql影響成功的行數 print(res) # 1 # # #part2 sql='insert into test1(name,grade) values(%s,%s);' res=cursor.execute(sql, ("alex",100)) # 執行sql語句,返回sql影響成功的行數 print(res)# 1 # # # part3 sql = 'insert into test1(name,grade) values(%s,%s);' res = cursor.executemany(sql,[('egon1',99),('egon2',88),('egon3',77)]) # 執行sql語句,返回sql影響成功的行數 print(res) # 3 執行sql語句,返回sql影響成功的行數 # conn.commit() # 提交后才發現表中插入記錄成功 # part4 sql = 'update test1 set grade = %s where name= %s;' res = cursor.execute(sql,(66,'egon')) # 執行sql語句,返回sql影響成功的行數 print(res) # 1 執行sql語句,返回sql影響成功的行數 conn.commit() # 提交后才發現表中插入記錄成功 # part5 sql = 'delete from test1 where name= %s;' res = cursor.execute(sql,('egon2')) # 執行sql語句,返回sql影響成功的行數 print(res) # 1 執行sql語句,返回sql影響成功的行數 conn.commit() # 提交后才發現表中插入記錄成功 # 關閉游標 cursor.close() # 關閉連接 conn.close()
查詢
# 指定以dict形式返回,默認以元祖形式 conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor) print(conn) # 創建游標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("select * from test1") print(effect_row) # res1=cursor.fetchone() # 獲取剩余結果的第一行數據 res2=cursor.fetchmany(2) # 獲取剩余結果前n行數據 res3=cursor.fetchall() # 獲取剩余結果所有數據 print(res1) #{'id': 1, 'name': 'aa', 'grade': 99} print(res2) #[{'id': 2, 'name': 'bb', 'grade': 55}, {'id': 3, 'name': 'cc', 'grade': 88}] print(res3) # [{'id': 9, 'name': 'y', 'grade': 44}, {'id': 10, 'name': 'y', 'grade': 99}, {'id': 11, 'name': 'y', 'grade': 55}] res4=cursor.fetchall() # 獲取剩余結果所有數據 print(res4) # 空,數據已經取完 # 在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置 cursor.scroll(0, mode='absolute') # 相對絕對位置移動 res5=cursor.fetchmany(2) # 獲取剩余結果前2行數據 print(res5) # [{'id': 1, 'name': 'aa', 'grade': 99}, {'id': 2, 'name': 'bb', 'grade': 55}] cursor.scroll(2, mode='relative') # 相對當前位置移動2條記錄,總記錄數還剩下2條 res6=cursor.fetchall() # 獲取剩余結果所有數據 print(res6) # [{'id': 10, 'name': 'y', 'grade': 99}, {'id': 11, 'name': 'y', 'grade': 55}] # 關閉游標 cursor.close() # 關閉連接 conn.close()
獲取新創建數據自增ID
可以獲取到最新自增的ID,也就是最后插入的一條數據ID
# conn = pymysql.connect(**mysql_connect_dict) # 指定以dict形式返回,默認以元祖形式 conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor) print(conn) # 創建游標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("select * from test1") sql = 'insert into test1(name,grade) values(%s,%s);' res = cursor.executemany(sql,[('egon1',99),('egon2',88),('egon3',77)]) # 執行sql語句,返回sql影響成功的行數 print(res) # 3 執行sql語句,返回sql影響成功的行數 conn.commit() #獲取自增id new_id = cursor.lastrowid print(new_id) #31 # 關閉游標 cursor.close() # 關閉連接 conn.close()
調用存儲過程
調用無參存儲過程
# 指定以dict形式返回,默認以元祖形式 conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor) print(conn) # 創建游標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("select * from test1") # 無參數存儲過程 cursor.callproc('p1') # 等價於cursor.execute("call p1()") # create procedure p1() # BEGIN # INSERT into test1(name,grade) values('egon4',100); # commit; # END row_1 = cursor.fetchone() print(row_1) conn.commit() # 關閉游標 cursor.close() # 關閉連接 conn.close()
b、調用有參存儲過程
# 指定以dict形式返回,默認以元祖形式 conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor) print(conn) # 創建游標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("select * from test1") # 有參數存儲過程 cursor.callproc('p2',args=('egon6',99)) # 等價於cursor.execute("call p1()") # delimiter // # CREATE PROCEDURE p2(IN p_in1 VARCHAR(20),IN p_in2 int) # BEGIN # INSERT into test1(name,grade) values(p_in1,p_in2); # commit; # END // # delimiter ; # 獲取執行完存儲的參數,參數@開頭 cursor.execute("select @_p2_0,@_p2_1;") #@p2_0代表第一個參數,@p2_1代表第二個參數,即返回值 row_1 = cursor.fetchone() print(row_1) # '@_p2_0': 'egon6', '@_p2_1': 99} conn.commit() # 關閉游標 cursor.close() # 關閉連接 conn.close()
使用with簡化連接過程
每次都連接關閉很麻煩,使用上下文管理,簡化連接過程
import pymysql import contextlib # 定義上下文管理器,連接后自動關閉連接 @contextlib.contextmanager def mysql(): mysql_connect_dict={ 'host':'127.0.0.1', 'port':3306, 'user':'yycenter', 'password':'qwe123', 'db':'testmysql', 'charset':'utf8' } # 連接數據庫 # conn = pymysql.connect(**mysql_connect_dict) # 指定以dict形式返回,默認以元祖形式 conn = pymysql.connect(**mysql_connect_dict,cursorclass=pymysql.cursors.DictCursor) print(conn) cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) try: yield cursor finally: conn.commit() cursor.close() conn.close() # 執行sql with mysql() as cursor: print(cursor) row_count = cursor.execute("select * from test1") row_1 = cursor.fetchone() print (row_count, row_1) # 8 {'id': 1, 'name': 'aa', 'grade': 99}
使用POOLDB連接mysql
import pymysql from DBUtils.PooledDB import PooledDB from utils.mylog_set import mylog from utils.time_tool import run_time from conf import settings def escape_quotes(val): '轉換單引號和雙引號' if isinstance(val, str): return val.replace("'", "\\\'") return val def stitch_sequence(seq=None, is_field=True, suf=None): ''' 序列拼接方法, 用於將序列拼接成字符串 - :seq: 拼接序列 - :suf: 拼接后綴(默認使用 ",") - :is_field: 是否為數據庫字段序列 ''' if seq is None: raise Exception("Parameter seq is None") suf = suf or "," res = str() for item in seq: res += '`%s`%s' % (item, suf) if is_field else '%s%s' % (item, suf) return res[:-len(suf)] class MysqlUtil(object): """ 簡便的數據庫操作 初始化參數如下: - :creator: 創建連接對象(默認: pymysql) - :host: 連接數據庫主機地址(默認: localhost) - :port: 連接數據庫端口(默認: 3306) - :user: 連接數據庫用戶名(默認: None), 如果為空,則會拋異常 - :password: 連接數據庫密碼(默認: None), 如果為空,則會拋異常 - :database: 連接數據庫(默認: None), 如果為空,則會拋異常 - :chatset: 編碼(默認: utf8) 初始化該數據庫下所有表的信息 """ def __init__(self, creator=pymysql, host=settings.mysqldb.get("host"), port=3306, user=settings.mysqldb.get("user"), password=settings.mysqldb.get("password"), database=settings.mysqldb.get("database"), charset="utf8"): if host is None: raise ValueError("Parameter [host] is None.") if port is None: raise ValueError("Parameter [port] is None.") if user is None: raise ValueError("Parameter [user] is None.") if password is None: raise ValueError("Parameter [password] is None.") if database is None: raise ValueError("Parameter [database] is None.") self.logger = mylog # 執行初始化 self._config = dict({ "creator": creator, "charset": charset, "host": host, "port": port, "user": user, "password": password, "database": database }) self._database = database self._table = None self._pool = None self._init_connect() self._init_params() def __del__(self): '重寫類被清除時調用的方法' if self._cursor: self._cursor.close() if self._conn: self._conn.close() def commit(self): # 提交 self._conn.commit() def rollback(self): # 回滾 self._conn.rollback() def _init_connect(self): '初始化連接' try: if self._pool is None: self._pool = PooledDB( **self._config, mincached=5, # 啟動時開啟的閑置連接數量(缺省值 0 以為着開始時不創建連接) maxcached=20, # 連接池中允許的閑置的最多連接數量(缺省值 0 代表不閑置連接池大小) maxshared=20, # 共享連接數允許的最大數量(缺省值 0 代表所有連接都是專用的)如果達到了最大數量,被請求為共享的連接將會被共享使用 maxusage=100) # 單個連接的最大允許復用次數(缺省值 0 或 False 代表不限制的復用).當達到最大數時,連接會自動重新連接(關閉和重新打開) # 獲得連接池 self._conn = self._pool.connection() # 建立連接 self._cursor = self._conn.cursor( cursor=pymysql.cursors.DictCursor) # 使用字典方式 self.logger.info("[{0}] 數據庫初始化成功。".format(self._database)) except Exception as e: self.logger.info.error(e) @run_time def execute_query(self, sql=None, args=(), single=False): '''執行查詢 SQL 語句 - :sql: sql 語句 - :single: 是否查詢單個結果集,默認False ''' try: if sql is None: raise Exception("Parameter sql is None.") self.logger.debug( "[{}] SQL >>> [{}] args =[{}]" .format( self._database, sql, args)) self._cursor.execute(sql, args) return self._cursor.fetchone() if single else self._cursor.fetchall() except Exception as e: self.logger.error(e) @run_time def execute_update(self, sql=None, args=()): '''執行更新 SQL 語句 - :sql: sql 語句 ''' try: if sql is None: raise Exception("Parameter sql is None.") self.logger.debug("[%s] SQL >>> [%s]" % (self._database, sql)) result = self._cursor.execute(sql, args) return result except Exception as e: self.logger.error(e) self._conn.rollback() def _init_params(self): '初始化參數' self._table_dict = {} self._information_schema_columns = [] self._table_column_dict_list = {} self._init_table_dict_list() self._init_table_column_dict_list() def _init_information_schema_columns(self): "查詢 information_schema.`COLUMNS` 中的列" sql = """ SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA='information_schema' AND TABLE_NAME='COLUMNS' """ result_list = self.execute_query(sql) # self.logger.debug(result_list) column_list = [r['COLUMN_NAME'] for r in result_list] self.logger.debug('column_list:>>> {}.'.format(column_list)) self._information_schema_columns = column_list def _init_table_dict(self, table_name): '初始化表' if not self._information_schema_columns: self._init_information_schema_columns() stitch_str = stitch_sequence(self._information_schema_columns) sql = """ SELECT %s FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' """ % (stitch_str, self._database, table_name) column_list = self.execute_query(sql) column_dict = {} for column in column_list: column_dict[column["COLUMN_NAME"]] = column self._table_dict[table_name] = column_dict def _init_table_dict_list(self): "初始化表字典對象" sql = "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA='%s'" % ( self._database) table_list = self.execute_query(sql) self._table_dict = {t['TABLE_NAME']: {} for t in table_list} self.logger.debug('_table_dicts {}'.format(self._table_dict)) for table in [t['TABLE_NAME'] for t in table_list]: self._init_table_dict(table) def _init_table_column_dict_list(self): '''初始化表字段字典列表''' """ example:{'test1': ['id', 'name', 'grade'] """ for table, column_dict in self._table_dict.items(): column_list = [column for column in column_dict.keys()] self._table_column_dict_list[table] = column_list self.logger.debug( "table_dict info: {}".format( self._table_column_dict_list)) # 根據表自動創建參數字典 def create_params(self, table, args={}): col_list = self._table_column_dict_list[table] params = {} for k in col_list: if args.__contains__(k): params[k] = args[k] return params def _parse_result(self, result): '用於解析單個查詢結果,返回字典對象' if result is None: return None obj = {key: value for key, value in zip(self._column_list, result)} return obj def _parse_results(self, results): '用於解析多個查詢結果,返回字典列表對象' if results is None: return None objs = [self._parse_result(result) for result in results] return objs def _get_primary_key(self, table_name): '獲取表對應的主鍵字段' if self._table_dict.get(table_name) is None: raise Exception(table_name, "is not exist.") for column, column_dict in self._table_dict[table_name].items(): if column_dict["COLUMN_KEY"] == "PRI": return column def _get_table_column_list(self, table_name=None): '查詢表的字段列表, 將查詢出來的字段列表存入 __fields 中' return self._table_column_dict_list[table_name] def _check_table_name(self, table_name): '''驗證 table_name 參數''' if table_name is None: raise Exception("Parameter [table_name] is None.") else: self._table = table_name self._column_list = self._table_column_dict_list[self._table] def count(self, table_name=None): '''統計記錄數''' self._check_table_name(table_name) sql = "SELECT count(*) FROM %s" % (self._table) result = self.execute_query(sql, True) return result[0] @run_time def executemany(self, sql, args): # 批量執行 try: self.logger.debug('executemany sql:{}'.format(sql)) return self.cursor.executemany(sql, args) except Exception as e: self.close() raise e # 執行sql,參數一:table,參數二:查詢列'col1,col2' 參數三:參數字典{'字段1':'值1','字段2':'值2'} def queryByTable(self, table, col='*', cond_dict={}): # self.execute(sql, args) self._check_table_name(table) cond_dict = self.create_params(table, cond_dict) cond_stmt = ' and '.join(['%s=%%s' % k for k in cond_dict.keys()]) # del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s' if not cond_dict: query_sql = 'select %(col)s FROM %(table)s' else: query_sql = 'select %(col)s FROM %(table)s where %(cond_stmt)s' # 執行sql,參數一:sql def queryBySql(self, query_sql): # self.execute(sql, args) '''驗證 table_name 參數''' if query_sql is None: raise Exception("Parameter [query_sql] is None.") return self.execute_query(query_sql) def insertByTable(self, table_name=None, obj={}): '''保存方法 - @param table_name 表名 - @param obj 對象 - @return 影響行數 ("test1", {'name': 'x', 'grade': 99} ''' self._check_table_name(table_name) if obj is None: obj = {} primary_key = self._get_primary_key(self._table) if primary_key not in obj.keys(): obj[primary_key] = None stitch_str = stitch_sequence(obj.keys()) # print(stitch_str) value_list = [] for key, value in obj.items(): if self._table_dict[self._table][key]["COLUMN_KEY"] != "PKI": value = "null" if value is None else '"%s"' % value value_list.append(escape_quotes(value)) stitch_value_str = stitch_sequence(value_list, False) sql = 'INSERT INTO `%s` (%s) VALUES(%s)' % ( self._table, stitch_str, stitch_value_str) return self.execute_update(sql) def deleteByTable(self, table_name=None, cond_dict={}): '''刪除 - @param table_name 表名 - @param cond_dict = {}: #參數二:用於where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'} - @return 影響行數 ''' self._check_table_name(table_name) cond_dict = self.create_params(table_name, cond_dict) cond_stmt = ' and '.join(["%s=%%s" % (k) for k in cond_dict.keys()]) # del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s' if not cond_dict: del_sql = 'DELETE FROM %(table_name)s' else: del_sql = 'DELETE FROM %(table_name)s where %(cond_stmt)s' return self.execute_update( del_sql % locals(), tuple(cond_dict.values())) def updateByTable(self, table, column_dict={}, cond_dict={}): # 更新,參數一:表名,參數二用於set 字段1=值1,字段2=值2...格式:{'字段1':'值1','字段2':'值2'}, # 參數三:用於where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'} self._check_table_name(table) column_dict = self.create_params(table, column_dict) cond_dict = self.create_params(table, cond_dict) set_stmt = ','.join(['%s=%%s' % k for k in column_dict.keys()]) cond_stmt = ' and '.join(['%s=%%s' % k for k in cond_dict.keys()]) if not cond_dict: upd_sql = 'UPDATE %(table)s set %(set_stmt)s' else: upd_sql = 'UPDATE %(table)s set %(set_stmt)s where %(cond_stmt)s' args = tuple(column_dict.values()) + tuple(cond_dict.values()) # 合並成1個 return self.execute_update(upd_sql % locals(), args) class Page(object): '分頁對象' def __init__(self, page_num=1, page_size=10, count=False): ''' Page 初始化方法 - @param page_num 頁碼,默認為1 - @param page_size 頁面大小, 默認為10 - @param count 是否包含 count 查詢 ''' # 當前頁數 self.page_num = page_num if page_num > 0 else 1 # 分頁大小 self.page_size = page_size if page_size > 0 else 10 # 總記錄數 self.total = 0 # 總頁數 self.pages = 1 # 起始行(用於 mysql 分頁查詢) self.start_row = (self.page_num - 1) * self.page_size # 結束行(用於 mysql 分頁查詢) self.end_row = self.start_row + self.page_size if __name__ == '__main__': sql1 = MysqlUtil()