Python接口自动化测试框架
在自动化的测试体系中,包含了UI自动化测试和接口自动化测试,UI自动化实现的前提是软件版本进入稳定期,UI界面稳定、变动少,相比较之下接口自动化,接口受外界因素的影响较少,维护成本低,能够在最短时间发现问题。
一、浅谈接口测试
1、什么是接口测试:
API测试又称为接口测试,主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点,是对系统接口功能进行测试的一种手段,也是集成测试的一部分,通过直接控制被测应用的接口(API)来确定是否在功能、可靠性、性能和安全方面达到预期的软件测试活动。
2、如何测试接口:
- 检查接口返回的数据是否与预期结果一致。
- 检查接口的容错性,假如传递数据的类型错误时是否可以处理。
- 接口参数的边界值。例如,传递的参数足够大或为负数时,接口是否可以正常处理。
- 接口的性能,http请求接口大多与后端代码逻辑、执行的SQL语句性能、算法等相关。
- 接口的安全性,外部调用的接口尤为重要。
3、接口测试的意义、目的:
接口测试的核心意义、目的在于:以保证系统的正确和稳定为核心,以持续集成为手段,提高测试效率,提升用户体验,降低产品研发成本。
二、定义框架目录分层
接口自动化框架的没有统一标准,可以根据实际需要自定义,以满足功能测试目标要求为目的。一个基本的接口自动化测试框架需要满足的功能:测试用例管理、各种配置信息管理、数据库操作、日志打印、报告输出、邮件发送
""" API_Autotest/ |-- API_Case/ #测试用例 | | |-- PreviewRelease_01_RegisterLogin.py #预发布环境注册登录接口case | | |-- PreviewRelease_02_SubmitCredit.py #预发布环境主流程接口case | | |-- PreviewRelease_03_MobilePhonePWD.py #预发布环境.......接口case | | |-- Test_01_RegisterLogin.py #测试环境注册登录接口case | | |-- Test_02_SubmitCredit.py #测试环境主流程接口case | | |-- Test_03_LoanRepayment.py #测试环境......接口case | | | |-- data/ ###配置信息 | |-- conf_dict.py #账号密码等配置信息 | |-- custom_variable.py #向接口请求的请求的参数变量、keys、请求头 | |-- export_url.py #接口url | |-- request_dict.py #向接口请求的请求参数 | |-- logic/ ###主要逻辑 | |-- export_logic.py #接口实现主逻辑 | |-- database.py #数据库操作类 | |-- log_print.py #日志打印类 | |-- public_class.py #公用函数类 | |-- send_email.py #发送邮件类 | |-- log/ #日志 | |-- Case--201804182020_log #根据运行日期保存操作日志 | |-- Case--201804182022_log | |-- report/ #测试报告 | |-- report--201804181729.html #根据运行日期保存测试报告日志 | |-- report--201804181731.html | |-- readme.md #readme | |-- manage.py #接口case运行管理类 """
三、知识技能储备
萝卜青菜各有所爱,每个人心中的接口自动化测试框架也各不相同,想实现一个基础功能完备的接口测试框架需要的Python知识如下:
1、Python基础知识
列举了一些需要掌握的基础知识
- 基本语法、循环
- 列表
- 元组、字典
- 字符串、字符编码与转码
- 文件操作
- 函数、无参/有参参数、全局变量/局部变量
- 函数非固定参数,返回值(return)
- 类的实例化过程、特征、共有属性和私有属性
- 类的继承(新式类/经典类)、多态
- 类的装饰器方法、特殊成员方法
- 更多基础知识......
2、主要依赖的库
下面介绍的库都是数据库操作、日志打印、报告输出、邮件发送功能实现所依赖的库:
a、数据库操作
数据库操作主要pymysql库,下面为代码示例:

import pymysql import datetime,time import os from data.conf_dict import ConfData from logic.log_print import LOG from logic.public_class import Public class ConnectDatabase(): """ 连接数据库类 """ def __init__(self): self.Log = LOG() self.conf_data = ConfData() # 连接数据库110 self.connection_110 = pymysql.connect(host=self.conf_data.get_conf_data("database_110", "host"), port=self.conf_data.get_conf_data("database_110", "port"), user=self.conf_data.get_conf_data("database_110", "user"), password=self.conf_data.get_conf_data("database_110", "password"), db=self.conf_data.get_conf_data("database_110", "db"), charset='utf8', # 以字典形式展示所查询数据 cursorclass=pymysql.cursors.DictCursor) # 保存错误日志的文件名称 self.log_name = Public.get_new_file(path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "log")) def select_sql(self, **kwargs): """ 根据传入参数执行数据库查询操作 :param args: :param kwargs: database:(database_110)选择数据库、table:表名、condition:where条件 :return: """ database_name = kwargs.get("database") field_name = kwargs.get("field", "*") table_name = kwargs.get("table") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "SELECT %s FROM %s WHERE %s;" data = (field_name, table_name, where_condition) cursor.execute(sql % data) self.connection_110.commit() result = cursor.fetchone() return result except Exception as e: self.Log.log_warning("database", self.log_name, "select_error:%s" % e) def select_sql_all(self, *args, **kwargs): """ 根据传入参数执行数据库查询操作 :param args: :param kwargs: database:(database_110)选择数据库、table:表名、condition:where条件 :return: """ database_name = kwargs.get("database") field_name = kwargs.get("field", "*") table_name = kwargs.get("table") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "SELECT %s FROM %s WHERE %s;" data = (field_name, table_name, where_condition) print((sql % data)) cursor.execute(sql % data) self.connection_110.commit() result = cursor.fetchall() return result except Exception as e: self.Log.log_warning("database", self.log_name, "select_error:%s" % e) def update_sql(self, *args, **kwargs): """ 根据传入参数执行数据库更新操作 :param args: :param kwargs: database:(database_110)选择数据库、table:表名、set:更新的字段和值、condition:where条件 :return: """ database_name = kwargs.get("database") table_name = kwargs.get("table") set_value = kwargs.get("set") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "UPDATE %s SET %s WHERE %s;" data = (table_name, set_value, where_condition) # print(sql%data) cursor.execute(sql % data) self.connection_110.commit() return cursor.rowcount cursor.close() except Exception as e: self.Log.log_warning("database", self.log_name, "update_error:%s" % e) self.connection_110.rollback() def delete_sql(self, *args, **kwargs): """ 根据传入参数执行数据库删除操作 :param args: :param kwargs: database:(database_110)选择数据库、table:表名、condition:where条件 :return: """ database_name = kwargs.get("database") table_name = kwargs.get("table") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "DELETE from %s where %s;" data = (table_name, where_condition) cursor.execute(sql % data) self.connection_110.commit() return cursor.rowcount except Exception as e: self.Log.log_warning("database", self.log_name, "delete_error:%s" % e) self.connection_110.rollback() def insert_sql(self, *args, **kwargs): """ 根据传入参数执行数据库插入操作 :param args: :param kwargs: database:(database_110)选择数据库、sql:需要插入的sql语句 :return: """ database_name = kwargs.get("database") insert_sql = kwargs.get("sql") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: cursor.execute(insert_sql) self.connection_110.commit() return cursor.rowcount except Exception as e: self.Log.log_warning("database", self.log_name, "insert_error:%s" % e) self.connection_110.rollback() def mysql_function(self, *args, **kwargs): """ 根据传入参数执行数据库函数操作 :param args: :param kwargs: database:(database_110)选择数据库、function_name:函数名称,data_id:数据ID,phone:电话 :return: """ database_name = kwargs.get("database") function_name = kwargs.get("function_name") data_id = kwargs.get("data_id") phone = kwargs.get("phone") product_id = kwargs.get("product_id") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: cursor.callproc(function_name,args=(data_id,phone,product_id,)) self.connection_110.commit() except Exception as e: self.Log.log_warning("database", self.log_name, "mysql_function:%s" % e) self.connection_110.rollback() if __name__ == "__main__": b = ConnectDatabase() data_id = Public.create_short_id()
PS:pymsql库操作mysql数据库增、删、查、改、调用函数
b、日志打印
日志打印依赖logging库,下面为代码示例:

import logging import os import sys import datetime log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log") sys.path.append(log_path) class LOG(object): """ 日志打印类 """ def __init__(self): self.now_time = datetime.datetime.now().strftime("%Y%m%d%H%M") def log_info(self, *args): """ 根据传入参数打印普通日志 :param arg[0]log的功能模块名,arg[1] 保存log的文件名,arg[2]要打印的日志内容 :return 返回logger 对象 """ # 创建一个logger对象 logger = logging.getLogger(args[0]) logger.setLevel(logging.DEBUG) # 创建一个向屏幕输入的handler对象 ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) # 创建一个像文件输入的handler对象 log_file = os.path.join(log_path, "%s--%s_log" % (args[1], self.now_time)) fh = logging.FileHandler(log_file, mode="a+", encoding="utf-8") fh.setLevel(logging.DEBUG) # 设置log输入格式 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) fh.setFormatter(formatter) # logger,添加handler对象 logger.addHandler(ch) logger.addHandler(fh) logger.info(args[2]) # 在记录日志之后移除句柄, 不然会重复打印日志 logger.removeHandler(fh) logger.removeHandler(ch) return logger @staticmethod def log_warning(*args): """ 根据传入参数错误日志 :param arg[0]log的功能模块名,arg[1]文件名 arg[2] 需要打印的内容 :return 返回logger 对象 """ # 创建一个logger对象 logger = logging.getLogger(args[0]) logger.setLevel(logging.DEBUG) # 创建一个向屏幕输入的handler对象 ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) # 创建一个像文件输入的handler对象 log_file = os.path.join(log_path, args[1]) fh = logging.FileHandler(log_file, mode="a+", encoding="utf-8") fh.setLevel(logging.DEBUG) # 设置log输入格式 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) fh.setFormatter(formatter) # logger,添加handler对象 logger.addHandler(ch) logger.addHandler(fh) logger.warning(args[2]) # 在记录日志之后移除句柄, 不然会重复打印日志 logger.removeHandler(fh) logger.removeHandler(ch) print("aaa") return logger # @staticmethod def log_debug(self,message): """ 根据传入参数错误日志 :param arg[0]log的功能模块名,arg[1]文件名 arg[2] 需要打印的内容 :return 返回logger 对象 """ # 创建Logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # 创建Handler # 终端Handler consoleHandler = logging.StreamHandler(sys.stdout) consoleHandler.setLevel(logging.DEBUG) # 文件Handler fileHandler = logging.FileHandler('ing.log', mode='w', encoding='UTF-8') fileHandler.setLevel(logging.NOTSET) # Formatter formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') consoleHandler.setFormatter(formatter) fileHandler.setFormatter(formatter) # 添加到Logger中 logger.addHandler(consoleHandler) logger.addHandler(fileHandler) return logger.debug(message) # if __name__ == "__main__": # A=LOG() # A.log_debug('123')
PS:logging库详细介绍
c、报告输入
报告输出依赖库主要为requests、unittest、下面为代码示例:

import requests from data.export_url import ExportERL from data.request_dict import RequestData from logic.log_print import LOG from logic.database import ConnectDatabase from data.custom_variable import GlobalVariable class ExportLogic(object): """ ExportLogic 接口主逻辑类 """ def __init__(self): self.Request = RequestData() self.Url = ExportERL() self.ConnectDatabase = ConnectDatabase() self.Log = LOG() self.Custom_variable = GlobalVariable() @staticmethod def request_post(*args, **kwargs): """ 传入请求参数,返回json请求结果 :param args: :param kwargs: json:请求数据, url:请求路径, header:请求头 :return: """ request_data = kwargs.get("json") request_url = kwargs.get("url") request_header = kwargs.get("header") response_json = requests.post(url=request_url, data=request_data, headers=request_header).json() return response_json @staticmethod def request_post_json(*args, **kwargs): """ 传入请求参数,返回json请求结果 :param args: :param kwargs: json:请求数据, url:请求路径, header:请求头 :return: """ request_data = kwargs.get("json") request_url = kwargs.get("url") request_header = kwargs.get("header") response_json = requests.post(url=request_url, json=request_data, headers=request_header) return response_json @staticmethod def request_post_file(*args, **kwargs): """ 上传接口独有 :param args: :param kwargs: json:请求数据, url:请求路径, header:请求头, file:文件路径 :return: """ request_data = kwargs.get("json") request_url = kwargs.get("url") request_header = kwargs.get("header") request_file = kwargs.get("files") response_json = requests.post(url=request_url, data=request_data, files=request_file, headers=request_header).json() return response_json if __name__ == "__main__": b = ExportLogic()

import unittest import os import HTMLTestReportCN import datetime from logic.export_logic import ExportLogic from data.custom_variable import GlobalVariable from data.export_url import ExportERL from logic.public_class import Public from data.request_dict import RequestData from logic.log_print import LOG from logic.database import ConnectDatabase from data.conf_dict import ConfData from logic.send_email import SendEmail import random,requests class LZExportCase(unittest.TestCase): """ 新浪有借有还接口测试用例 """ # 全距 PHONE = Public().create_phone() #提交五项资料的Header HEADER = GlobalVariable().get_header('lz_Header') #修改手机号的Header HEADER1 = GlobalVariable().get_header('lz_Header') BASEID = '' PERIODNUM = '' IMGID ='' Repayid = '' Periodnum_ = '' def setUp(self): self.Export_logic = ExportLogic() self.Export_url = ExportERL() self.Public = Public() self.Request_data = RequestData() self.Custom_variable = GlobalVariable() self.Log = LOG() self.ConnectDatabase = ConnectDatabase() self.Conf_data = ConfData() self.password = GlobalVariable().get_variable("password") self.phone = Public().create_phone() self.newphone = GlobalVariable().get_variable("YJ_newphone") # self.report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report") self.log_path = os.path.dirname(os.path.dirname(__file__)) self.path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static"), "logo.jpg") def tearDownC(self): pass # os.remove(os.path.join(self.report_path, "report.html")) # os.remove(os.path.join(os.path.join(self.path, "log"), "Export_log")) def test_085_FileLoad(self): """ Case--上传文件(saveType正常) """ url = self.Export_url.get_export_url("LZ_test_url", "File") request_json = self.Request_data.get_request_data('File') file_ = open(self.path, 'rb') file = { "file": ('logo.jpg', file_, 'image/jpeg') } request_json["saveType"] = '1' request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post_file(url=url, json=request_json, files=file, header=self.HEADER) LZExportCase.IMGID = response_json['data']['id'] self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code']) file_.close() def test_086_HeadImg(self): """ Case--修改头像(headimgId正常) """ url = self.Export_url.get_export_url("LZ_test_url", "HeadImg") request_json = self.Request_data.get_request_data('HeadImg') request_json["headimgId"] = LZExportCase.IMGID request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code']) def test_087_HeadImg(self): """ Case--修改头像(headimgId错误) """ url = self.Export_url.get_export_url("LZ_test_url", "HeadImg") request_json = self.Request_data.get_request_data('HeadImg') request_json["headimgId"] = '3213adcx213vcv' request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['code'], response_json['code']) def test_088_HeadImg(self): """ Case--修改头像(headimgId为空) """ url = self.Export_url.get_export_url("LZ_test_url", "HeadImg") request_json = self.Request_data.get_request_data('HeadImg') request_json["headimgId"] = '' request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['code'], response_json['code']) if __name__ == '__main__': Send_mail = SendEmail() now_time = datetime.datetime.now().strftime("%Y%m%d%H%M") module_name = os.path.basename(__file__).split(".")[0] module = __import__(module_name) path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "report") logo_path = os.path.join(os.path.join(path, "static"),"logo.jpg") fp = open(os.path.join(path, "report--%s.html" % now_time), "wb") runner = HTMLTestReportCN.HTMLTestRunner(stream=fp) all_suite = unittest.defaultTestLoader.loadTestsFromModule(module) runner.run(all_suite) fp.close() SendEmail.send_email(Send_mail)
d、邮件发送
邮件发送依赖两个python内置库smtplib、email:

import os import smtplib import email.mime.multipart import email.mime.text from email import encoders import datetime from data.conf_dict import ConfData from logic.public_class import Public class SendEmail(object): def __init__(self): self.conf_data = ConfData() self.sender = self.conf_data.get_conf_data("email", "sender") self.receiver = self.conf_data.get_conf_data("email", "receiver") self.SMTP_server = self.conf_data.get_conf_data("email", "SMTP_server") self.username = self.conf_data.get_conf_data("email", "username") self.password = self.conf_data.get_conf_data("email", "password") self.content = self.conf_data.get_conf_data("email", "content") self.now_time = datetime.datetime.now().strftime("%Y%m%d%H%M") self.report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report") self.log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log") def create_msg(self,receiver): """ 此函数主要构建收发邮件联系人、邮件正文、邮件附件、邮件标题 :return: """ # 构建邮件正文 msg = email.mime.multipart.MIMEMultipart() msg["from"] = self.sender msg["to"] = receiver msg['subject'] = "API自动化测试报告" txt = email.mime.text.MIMEText(self.content) msg.attach(txt) # 构建邮件附件之一:自动化测试报告 report_name = Public.get_new_file(path=self.report_path) report_path = os.path.join(self.report_path, report_name) report = email.mime.text.MIMEText(open(report_path, "rb") .read(), 'html', 'utf-8') report["Content-Type"] = 'application/octet-stream' report.add_header('Content-Disposition', 'attachment', filename=('gbk', '', report_name)) encoders.encode_base64(report) msg.attach(report) # 构建邮件附件之一:测试日志 log_name = Public.get_new_file(path=self.log_path) log_path = os.path.join(self.log_path, log_name) info_log = email.mime.text.MIMEText(open(log_path, 'rb').read(), 'base64', 'utf-8') info_log["Content-Type"] = 'application/octet-stream' info_log.add_header('Content-Disposition', 'attachment', filename=('gbk', '', log_name)) encoders.encode_base64(info_log) msg.attach(info_log) return msg def send_email(self,*args): """ 创建实例,发送邮件 :return: """ receiver = self.conf_data.get_conf_data("email", "%s_receiver" % args[0][0]) if args else self.receiver msg = self.create_msg(receiver) smtp = smtplib.SMTP_SSL(self.SMTP_server, 465) # 在Linux端使用ssL方式连接邮箱服务器 # smtp.connect(self.SMTP_server, 465) # 在windows端使用connect方式连接邮箱服务器 smtp.login(self.username, self.password) smtp.sendmail(self.sender, receiver.split(","), msg.as_string()) smtp.quit() if __name__ == "__main__": s = SendEmail() s.send_email()
PS:smtplib、email库详细介绍