#封装MySQL类
#分表作业
#用类练习
# 1、
import pymysql
import traceback
from loguru import logger
class MySQL:
    """Small wrapper around one pymysql connection and one cursor.

    The connection and the cursor are opened by the constructor and are
    released when the object is garbage-collected.  The query helpers do
    not raise when a statement fails: the error is logged and a falsy
    value is returned instead.
    """

    # Connection-setting keys whose values must never be written to the log.
    _SECRET_KEYS = ("passwd", "password")

    def __init__(self, mysql_info, cur_type=1):
        """Connect to MySQL and create the cursor.

        Args:
            mysql_info: dict of keyword arguments forwarded to
                ``pymysql.connect``.
            cur_type: ``1`` selects the connection's default cursor; any
                other value selects ``pymysql.cursors.DictCursor``.

        Raises:
            SystemExit: if the connection cannot be established.
        """
        self.mysql_info = mysql_info
        self.cur_type = cur_type
        self.__connect()
        self.__create_cursor()

    def __del__(self):
        """Log the shutdown and release the cursor and the connection."""
        try:
            logger.debug("关闭mysql连接")
        finally:
            # Release the resources even if logging itself fails.
            self.__close()

    def __safe_info(self):
        """Return the connection settings with password values masked."""
        info = self.mysql_info
        if not isinstance(info, dict):
            return info
        return {
            key: "******" if key in self._SECRET_KEYS else value
            for key, value in info.items()
        }

    def __connect(self):
        """Open the connection; log the failure and exit if it cannot be made.

        A leading double underscore makes the method private: Python
        mangles the name, so it is meant to be called only inside the class.
        """
        try:
            self.__conn = pymysql.connect(**self.mysql_info)
        except Exception:
            logger.error("mysql连接失败!mysql的连接信息是{},错误信息,{}",
                         self.__safe_info(), traceback.format_exc())
            # quit() is an interactive-shell helper and exits with status 0;
            # raise SystemExit directly and report the failure as non-zero.
            raise SystemExit(1)

    def __create_cursor(self):
        """Create the cursor selected by ``cur_type``."""
        if self.cur_type == 1:
            self.cursor = self.__conn.cursor()
        else:
            self.cursor = self.__conn.cursor(pymysql.cursors.DictCursor)

    def execute_sql(self, sql, args=None):
        """Execute one statement on the shared cursor.

        Args:
            sql: the statement text, optionally with ``%s`` placeholders.
            args: optional parameters bound to the placeholders by the
                driver; prefer this to building SQL with string formatting.

        Returns:
            ``True`` on success, ``False`` if the statement raised (the
            error is logged, not propagated).
        """
        try:
            self.cursor.execute(sql, args)
        except Exception:
            logger.warning("sql语句执行出错,sql是:{},报错信息:{}", sql, traceback.format_exc())
            return False
        return True

    def fetchone(self, sql, args=None):
        """Run ``sql`` and return its first row, or ``None`` if it failed."""
        if not self.execute_sql(sql, args):
            return None
        result = self.cursor.fetchone()
        logger.debug("本次sql查询的结果是:{}", result)
        return result

    def fetchall(self, sql, args=None):
        """Run ``sql`` and return all rows, or ``None`` if it failed."""
        if not self.execute_sql(sql, args):
            return None
        result = self.cursor.fetchall()
        logger.debug("本次sql查询的结果是:{}", result)
        return result

    def __close(self):
        """Close the cursor, then the connection; safe to call repeatedly."""
        # The attribute is stored under its mangled name, so a plain
        # hasattr(self, "conn") would never find it.
        conn = getattr(self, "_MySQL__conn", None)
        if conn is None:
            return
        # Forget the handle first so that a second call is a no-op.
        self.__conn = None
        cursor = getattr(self, "cursor", None)
        try:
            if cursor is not None:
                cursor.close()
        finally:
            conn.close()
# update insert delete create drop
if __name__ == '__main__':
    # Manual smoke test: connect, run one query and print every row.
    # NOTE(review): the credentials are hard-coded here; consider loading
    # them from configuration or the environment instead.
    mysql_info = {
        "host": "110.40.129.51",
        "user": "jxz",
        "passwd": "123456",
        "db": "jxz",
        "port": 3306,
        "charset": "utf8",
        "autocommit": True
    }
    mysql = MySQL(mysql_info)
    # execute_sql() reports failure through its return value instead of
    # raising, so only iterate the cursor when the query actually ran.
    if mysql.execute_sql("select * from user_ztt"):
        for line in mysql.cursor:
            print(line)
    # Other usage examples:
    # mysql.fetchall("select * from user_ztt limit 5;")
    # mysql.fetchone("select * from user_ztt where id =3")
# hasattr 是内置函数,用于判断一个对象是否具有指定名称的方法或属性
# 2、
import pymysql
import traceback
from loguru import logger
class MySQL:
    """Small wrapper around one pymysql connection and one cursor.

    The connection and the cursor are opened by the constructor and are
    released when the object is garbage-collected.  The query helpers do
    not raise when a statement fails: the error is logged and a falsy
    value is returned instead.
    """

    # Connection-setting keys whose values must never be written to the log.
    _SECRET_KEYS = ("passwd", "password")

    def __init__(self, mysql_info, cur_type=1):
        """Connect to MySQL and create the cursor.

        Args:
            mysql_info: dict of keyword arguments forwarded to
                ``pymysql.connect``.
            cur_type: ``1`` selects the connection's default cursor; any
                other value selects ``pymysql.cursors.DictCursor``.

        Raises:
            SystemExit: if the connection cannot be established.
        """
        self.mysql_info = mysql_info
        self.cur_type = cur_type
        self.__connect()
        self.__create_cursor()

    def __del__(self):
        """Log the shutdown and release the cursor and the connection."""
        try:
            logger.debug("关闭mysql连接")
        finally:
            # Release the resources even if logging itself fails.
            self.__close()

    def __safe_info(self):
        """Return the connection settings with password values masked."""
        info = self.mysql_info
        if not isinstance(info, dict):
            return info
        return {
            key: "******" if key in self._SECRET_KEYS else value
            for key, value in info.items()
        }

    def __connect(self):
        """Open the connection; log the failure and exit if it cannot be made."""
        try:
            self.__conn = pymysql.connect(**self.mysql_info)
        except Exception:
            logger.error("mysql连接失败!mysql的连接信息是{},错误信息,{}",
                         self.__safe_info(), traceback.format_exc())
            # quit() is an interactive-shell helper and exits with status 0;
            # raise SystemExit directly and report the failure as non-zero.
            raise SystemExit(1)

    def __create_cursor(self):
        """Create the cursor selected by ``cur_type``."""
        if self.cur_type == 1:
            self.cursor = self.__conn.cursor()
        else:
            self.cursor = self.__conn.cursor(pymysql.cursors.DictCursor)

    def execute_sql(self, sql, args=None):
        """Execute one statement on the shared cursor.

        Args:
            sql: the statement text, optionally with ``%s`` placeholders.
            args: optional parameters bound to the placeholders by the
                driver; prefer this to building SQL with string formatting.

        Returns:
            ``True`` on success, ``False`` if the statement raised (the
            error is logged, not propagated).
        """
        try:
            self.cursor.execute(sql, args)
        except Exception:
            logger.warning("sql语句执行出错,sql是:{},报错信息:{}", sql, traceback.format_exc())
            return False
        return True

    def fetchone(self, sql, args=None):
        """Run ``sql`` and return its first row, or ``None`` if it failed."""
        if not self.execute_sql(sql, args):
            return None
        result = self.cursor.fetchone()
        logger.debug("本次sql查询的结果是:{}", result)
        return result

    def fetchall(self, sql, args=None):
        """Run ``sql`` and return all rows, or ``None`` if it failed."""
        if not self.execute_sql(sql, args):
            return None
        result = self.cursor.fetchall()
        logger.debug("本次sql查询的结果是:{}", result)
        return result

    def __close(self):
        """Close the cursor, then the connection; safe to call repeatedly."""
        # The attribute is stored under its mangled name, so a plain
        # hasattr(self, "conn") would never find it.
        conn = getattr(self, "_MySQL__conn", None)
        if conn is None:
            return
        # Forget the handle first so that a second call is a no-op.
        self.__conn = None
        cursor = getattr(self, "cursor", None)
        try:
            if cursor is not None:
                cursor.close()
        finally:
            conn.close()
# update insert delete create drop
if __name__ == '__main__':
    # Manual smoke test: connect, run one query and print every row.
    # NOTE(review): the credentials are hard-coded here; consider loading
    # them from configuration or the environment instead.
    mysql_info = {
        # The host literal was split across two lines, which is a syntax
        # error; it is rejoined into a single string here.
        "host": "110.40.129.51",
        "user": "jxz",
        "passwd": "123456",
        "db": "jxz",
        "port": 3306,
        "charset": "utf8",
        "autocommit": True
    }
    mysql = MySQL(mysql_info)
    # execute_sql() reports failure through its return value instead of
    # raising, so only iterate the cursor when the query actually ran.
    if mysql.execute_sql("select * from user_ztt"):
        for line in mysql.cursor:
            print(line)
    # Other usage examples:
    # mysql.fetchall("select * from user_ztt limit 5;")
    # mysql.fetchone("select * from user_ztt where id =3")
# 3、
import traceback
# 私有化: 私有变量,私有方法:只能在类里面调用
import pymysql
from day7_homework.config import mysql_info
class MySQl:
    """Minimal pymysql wrapper that connects on construction.

    The connection and cursor are private (name-mangled) attributes and
    are released when the object is garbage-collected.
    """

    def __init__(self, mysql_info, data_type=1):
        """Store the settings and connect immediately.

        Args:
            mysql_info: dict of keyword arguments forwarded to
                ``pymysql.Connect``.
            data_type: ``1`` selects the connection's default cursor; any
                other value selects ``pymysql.cursors.DictCursor``.

        Raises:
            Exception: if the connection cannot be established.
        """
        self.mysql_info = mysql_info
        self.data_type = data_type
        self.__connect_status = False
        # Connect automatically as part of construction.
        self.__connect()

    # A leading double underscore makes a method or attribute private.
    def __connect(self):
        """Open the connection and create the cursor chosen by ``data_type``."""
        print("开始链接数据库")
        try:
            # Stored as __conn: assigning to self.__connect would shadow
            # this very method on the instance.
            self.__conn = pymysql.Connect(**self.mysql_info)
        except Exception as err:
            print("数据库链接出错,错误信息为%s" % traceback.format_exc())
            # Re-raise so that construction stops here.
            raise Exception("数据库链接出错") from err
        print("链接成功")
        self.__connect_status = True
        if self.data_type != 1:
            self.__cur = self.__conn.cursor(pymysql.cursors.DictCursor)
        else:
            self.__cur = self.__conn.cursor()

    def execute(self, sql):
        """Execute ``sql``; return ``True`` on success, ``False`` on error."""
        try:
            self.__cur.execute(sql)
        except Exception:
            print("sql执行不正确,sql语句是%s" % (sql,))
            # Show the cause as well, otherwise the failure is undiagnosable.
            print(traceback.format_exc())
            return False
        return True

    def fetchone(self, sql):
        """Run ``sql`` and return its first row, or ``None`` if it failed."""
        if self.execute(sql):
            return self.__cur.fetchone()
        return None

    def fetchall(self, sql):
        """Run ``sql`` and return all rows, or ``None`` if it failed."""
        if self.execute(sql):
            return self.__cur.fetchall()
        return None

    def __close(self):
        """Close the cursor and connection if connected.

        Returns:
            ``True`` if something was closed, ``False`` if there was no
            open connection (never connected, or already closed).
        """
        if not getattr(self, "_MySQl__connect_status", False):
            return False
        # Flip the flag first so that a second call is a no-op.
        self.__connect_status = False
        cur = getattr(self, "_MySQl__cur", None)
        try:
            if cur is not None:
                cur.close()
        finally:
            self.__conn.close()
        return True

    def __del__(self):
        """Release the connection when the object is garbage-collected."""
        if self.__close():
            print("mysql链接关闭")
if __name__ == "__main__":
    # The constructor requires the connection settings; pass the
    # mysql_info imported from day7_homework.config at the top of this
    # version (calling MySQl() with no argument raises TypeError).
    my = MySQl(mysql_info)