#封裝MySQL類
#分表作業
#用類練習
1、
import pymysql
import traceback
from loguru import logger
class MySQL:
def __init__(self, mysql_info, cur_type=1):#構造函數,傳一個字典
self.mysql_info = mysql_info
self.cur_type = cur_type
self.__connect()
self.__create_cursor()
def __del__(self):#連接
logger.debug("關閉mysql連接")
self.__close()
def __connect(self):#加下划線就叫私有方法,方法或者變量前面加,就叫私有方法。只能自己調用在這個類里面
try:
self.__conn = pymysql.connect(**self.mysql_info)#
except Exception:
logger.error("mysql連接失敗!mysql的連接信息是{},錯誤信息,{}", #日志
self.mysql_info, traceback.format_exc()) #錯誤信息
quit()
def __create_cursor(self): #私有
self.cursor = self.__conn.cursor() if self.cur_type == 1 else self.__conn.cursor(pymysql.cursors.DictCursor)
def execute_sql(self, sql):
try:
self.cursor.execute(sql)
except Exception:
logger.warning("sql語句執行出錯,sql是:{},報錯信息:{}", sql, traceback.format_exc())
else:
return True
def fetchone(self, sql):
sql_tag = self.execute_sql(sql)
if sql_tag:
result = self.cursor.fetchone()
logger.debug("本次sql查詢的結果是:{}", result)
return result
def fetchall(self, sql):
sql_tag = self.execute_sql(sql)
if sql_tag:
result = self.cursor.fetchall()
logger.debug("本次sql查詢的結果是:{}", result)
return result
def __close(self):
if hasattr(self,"conn"): #判斷當前這個對象里面有沒有conn這個屬性
self.__conn.close()
self.cursor.close()
# update insert delete create drop
if __name__ == '__main__':
mysql_info = {
"host": "110.40.129.51",
"user": "jxz",
"passwd": "123456",
"db": "jxz",
"port": 3306,
"charset": "utf8",
"autocommit": True
}
mysql = MySQL(mysql_info)
mysql.execute_sql("select * from user_ztt")
for line in mysql.cursor:
print(line)
# mysql.fetchall("select * from user_ztt limit 5;")
# mysql.fetchone("select * from user_ztt where id =3")
hasattr內置函數,判斷一個對象有沒有方法或屬性
2、
import pymysql
import traceback
from loguru import logger
class MySQL:
def __init__(self, mysql_info, cur_type=1):
self.mysql_info = mysql_info
self.cur_type = cur_type
self.__connect()
self.__create_cursor()
def __del__(self):
logger.debug("關閉mysql連接")
self.__close()
def __connect(self):
try:
self.__conn = pymysql.connect(**self.mysql_info)
except Exception:
logger.error("mysql連接失敗!mysql的連接信息是{},錯誤信息,{}",
self.mysql_info, traceback.format_exc())
quit()
def __create_cursor(self):
self.cursor = self.__conn.cursor() if self.cur_type == 1 else self.__conn.cursor(pymysql.cursors.DictCursor)
def execute_sql(self, sql):
try:
self.cursor.execute(sql)
except Exception:
logger.warning("sql語句執行出錯,sql是:{},報錯信息:{}", sql, traceback.format_exc())
else:
return True
def fetchone(self, sql):
sql_tag = self.execute_sql(sql)
if sql_tag:
result = self.cursor.fetchone()
logger.debug("本次sql查詢的結果是:{}", result)
return result
def fetchall(self, sql):
sql_tag = self.execute_sql(sql)
if sql_tag:
result = self.cursor.fetchall()
logger.debug("本次sql查詢的結果是:{}", result)
return result
def __close(self):
if hasattr(self,"conn"): #判斷當前這個對象里面有沒有conn這個屬性
self.__conn.close()
self.cursor.close()
# update insert delete create drop
if __name__ == '__main__':
mysql_info = {
"host": "
110.40.129.51",
"user": "jxz",
"passwd": "123456",
"db": "jxz",
"port": 3306,
"charset": "utf8",
"autocommit": True
}
mysql = MySQL(mysql_info)
mysql.execute_sql("select * from user_ztt")
for line in mysql.cursor:
print(line)
# mysql.fetchall("select * from user_ztt limit 5;")
# mysql.fetchone("select * from user_ztt where id =3")
3、
import traceback
# 私有化: 私有變量,私有方法:只能在類里面調用
import pymysql
from day7_homework.config import mysql_info
class MySQl:
def __init__(self,mysql_info,data_type=1):
self.mysql_info=mysql_info
self.data_type=data_type
self.__connect_status=False
# 自動執行
self.__connect()
# 私有化 在前面添加兩個下划線
def __connect(self):
print("開始鏈接數據庫")
try:
self.__connect= pymysql.Connect(**self.mysql_info)
except:
print("數據庫鏈接出錯,錯誤信息為%s"%traceback.format_exc())
# 拋出異常 不在往下執行
raise Exception("數據庫鏈接出錯")
print("鏈接成功")
self.__connect_status=True
if self.data_type !=1:
self.__cur=self.__connect.cursor(pymysql.cursors.DictCursor)
else:
self.__cur=self.__connect.cursor()
def execute(self,sql):
try:
self.__cur.execute(sql)
except:
print("sql執行不正確,sql語句是%s"%sql)
else:
return True def fetchone(self,sql): if self.execute(sql):
return self.__cur.fetchone()
def fetchall(self,sql):
if self.execute(sql):
return self.__cur.fetchall()
def __close(self):
self.__cur.close()
self.__connect.close()
def __del__(self):
self.__close()
print("mysql鏈接關閉")
if __name__=="__main__":
my=MySQl()