pytest接口自动化测试
结合单元测试框架pytest+数据驱动模型+allure
目录
api: 存储测试接口
conftest.py :设置前置操作
目前前置操作:1、获取token并传入headers,2、获取命令行参数给到环境变量,指定运行环境
common:存储封装的公共方法
connect_mysql.py:连接数据库
http_requests.py: 封装自己的请求方法
logger.py: 封装输出日志文件
read_yaml.py:读取yaml文件测试用例数据
read_save_data.py:读取保存的数据文件
case: 存放所有的测试用例
data:存放测试需要的数据
save_data: 存放接口返回数据、接口下载文件
test_data: 存放测试用例依赖数据
upload_data: 存放上传接口文件
logs: 存放输出的日志文件
report: 存放测试输出报告
getpathinfo.py :封装项目测试路径
pytest.ini :配置文件
requirements.txt: 本地python包(pip install -r requirements.txt 安装项目中所有python包)
run_main.py: 项目运行文件
结构设计
1.每一个接口用例组合在一个测试类里面生成一个py文件
2.将每个用例调用的接口封装在一个测试类里面生成一个py文件
3.将测试数据存放在yml文件中通过parametrize进行参数化,实现数据驱动
4.通过allure生成测试报告
代码展示
api/api_service.py #需要测试的一类接口

'''
Code description: wrappers for the service-related endpoints
Create time: 2020/12/3
Developer: 叶修
'''
import os

from common.http_requests import HttpRequests

# Host used only when the "host" environment variable is missing, e.g. when
# this module is executed directly instead of through pytest (conftest.py is
# what normally exports "host" from the --cmdhost option).
_FALLBACK_HOST = "http://192.168.2.199:9092"


class Api_Auth_Service(object):
    """Thin wrappers around the /v1/auth/auth_service endpoints.

    Each method sends a single request through HttpRequests with the shared
    default headers.
    """

    def __init__(self):
        self.headers = HttpRequests().headers

    def api_home_service_list(self):
        """GET the home-page service list and return the raw response."""
        endpoint = os.environ["host"] + "/v1/auth/auth_service/findAuthService"
        return HttpRequests().get(endpoint, headers=self.headers, verify=False)

    def get_service_id(self):
        """Return the id of the second service in the first group of the list.

        The original author's note identifies that entry as the bank-card
        three-factor authentication service.

        The host now comes from the "host" environment variable (falling back
        to the previously hard-coded address), so the call targets the same
        environment as the other endpoints. verify=False matches the sibling
        methods.
        """
        endpoint = (os.environ.get("host", _FALLBACK_HOST)
                    + "/v1/auth/auth_service/findAuthService")
        response = HttpRequests().get(endpoint, headers=self.headers, verify=False)
        return response.json()['data'][0]['service_list'][1]['id']

    def api_service_info(self, serviceId='0b6cf45bec757afa7ee7209d30012ce1', developerId=''):
        """GET the detail of one service and return the raw response.

        serviceId and developerId are sent as query parameters. The host is
        resolved the same way as in get_service_id.
        """
        body = {
            "serviceId": serviceId,
            "developerId": developerId,
        }
        endpoint = (os.environ.get("host", _FALLBACK_HOST)
                    + "/v1/auth/auth_service/findServiceDetail")
        return HttpRequests().get(endpoint, headers=self.headers, params=body, verify=False)

    def api_add_or_update_service(self, api_param_req, api_param_res, description, error_code,
                                  icon, id, interface_remarks, name, product_info,
                                  request_method, sample_code, sort, type, url):
        """POST a service definition (add or update) and return the raw response.

        All arguments are forwarded unchanged as the JSON body. The parameter
        names `id` and `type` shadow builtins but are kept because callers may
        pass them by keyword.
        """
        body = {
            "api_param_req": api_param_req,
            "api_param_res": api_param_res,
            "description": description,
            "error_code": error_code,
            "icon": icon,
            "id": id,
            "interface_remarks": interface_remarks,
            "name": name,
            "product_info": product_info,
            "request_method": request_method,
            "sample_code": sample_code,
            "sort": sort,
            "type": type,
            "url": url,
        }
        # A separate local name keeps the `url` argument (a body field) from
        # being shadowed by the request address.
        endpoint = os.environ["host"] + "/v1/auth/auth_service/insertOrUpdateService"
        return HttpRequests().post(endpoint, json=body, headers=self.headers, verify=False)

    def api_add_service_price(self, id, max_number, money, service_id, small_number):
        """POST a price entry for a service and return the raw response."""
        body = {
            "id": id,
            "max_number": max_number,
            "money": money,
            "service_id": service_id,
            "small_number": small_number,
        }
        endpoint = os.environ["host"] + "/v1/auth/auth_service/insertServicePrice"
        return HttpRequests().post(endpoint, json=body, headers=self.headers, verify=False)

    def api_apply_service(self, developer_id, service_id):
        """POST a service application for a developer and return the raw response."""
        body = {
            "developer_id": developer_id,
            "service_id": service_id,
        }
        endpoint = os.environ["host"] + "/v1/auth/auth_service/applyService"
        return HttpRequests().post(endpoint, json=body, headers=self.headers, verify=False)


if __name__ == '__main__':
    Api_Auth_Service().get_service_id()
api/get_token.py#获取登录token

'''
Code description: obtain the login token
Create time: 2020-12-03
Developer: 叶修
'''
import os

import urllib3

from common.http_requests import HttpRequests


class Get_Token(object):
    """Logs in with an account/password pair and exposes the resulting token."""

    def get_token(self, account='****', password='****'):
        """Log in and return the access token.

        The login endpoint is built from the "host" environment variable.
        Besides being returned, the token is written into the `params`
        mapping of HttpRequests under the "access_token" key.
        """
        login_url = os.environ["host"] + "/v1/auth/developer/accountLogin"
        credentials = {
            "account": account,
            "password": password,
        }
        # The request below disables certificate verification; silence the
        # resulting urllib3 warnings.
        urllib3.disable_warnings()
        login_response = HttpRequests().post(login_url, json=credentials, verify=False)
        token = login_response.json()['data']['token']
        HttpRequests().params.update({"access_token": token})
        return token


if __name__ == '__main__':
    print(Get_Token().get_token())
case/test_service_info.py #上面接口某一测试用例

'''
Code description: service detail test case
Create time: 2020/12/3
Developer: 叶修
'''
import sys

import allure
import pytest

from common.logger import Log
from common.read_yaml import ReadYaml
from api.api_auth_service.api_auth_service import Api_Auth_Service

# Data-driven cases loaded once at import time.
testdata = ReadYaml("auth_service.yml").get_yaml_data()


@allure.feature('服务详情')
class Test_Service_Info(object):
    """Checks the service-detail endpoint against the expectations in the YAML data."""

    log = Log()

    @pytest.mark.process
    @pytest.mark.parametrize('serviceId,developerId,expect', testdata['service_info'], ids=['服务详情'])
    def test_service_info(self, serviceId, developerId, expect):
        """Fetch a service id, query its detail and compare with `expect`.

        The parametrized serviceId is replaced by the id returned from
        get_service_id before the detail request is made.
        """
        case_name = sys._getframe().f_code.co_name
        self.log.info('%s{%s}' % (case_name, '------服务详情接口-----'))

        with allure.step('获取服务id'):
            serviceId = Api_Auth_Service().get_service_id()

        with allure.step('服务详情'):
            msg = Api_Auth_Service().api_service_info(serviceId, developerId)
            self.log.info('%s:%s' % (case_name, '获取请求结果:%s' % msg.json()))

        # Assertions on the decoded response body.
        payload = msg.json()
        assert payload["result_message"] == expect['result_message']
        assert payload['result_code'] == expect['result_code']
        assert 'url' in payload['data']
conftest.py

'''
Code description: shared pytest configuration
Create time: 2020/12/3
Developer: 叶修
'''
import os

import pytest

from api.get_token import Get_Token
from common.http_requests import HttpRequests


@pytest.fixture(scope="session")
def get_token():
    """Log in once per session and yield the shared requests session.

    The run is skipped when no access token ended up in HttpRequests.params.
    The shared session is closed on teardown.
    """
    Get_Token().get_token()
    if not HttpRequests().params.get("access_token", ""):
        # No token was stored, so skip instead of failing every case.
        pytest.skip("未获取token跳过用例")
    yield HttpRequests().req
    HttpRequests().req.close()


def pytest_addoption(parser):
    """Register the --cmdhost command-line option that selects the target host."""
    parser.addoption(
        "--cmdhost",
        action="store",
        default="http://192.168.1.54:32099",
        # The previous help text was a placeholder unrelated to this option.
        help="base URL of the environment under test, e.g. http://192.168.1.54:32099",
    )


@pytest.fixture(scope="session", autouse=True)
def host(request):
    """Export the --cmdhost value as the "host" environment variable.

    Runs automatically once per session; the API wrappers read
    os.environ["host"] to build their URLs.
    """
    os.environ["host"] = request.config.getoption("--cmdhost")
    print("当前用例运行测试环境:%s" % os.environ["host"])
common/connect_mysql.py

'''
Code description: database connection helpers
Create time: 2020/12/3
Developer: 叶修
'''
import logging

import pymysql

# Connection settings passed to pymysql.connect (credentials are masked here).
dbinfo = {
    "host": "******",
    "user": "root",
    "password": "******",
    "port": 31855
}


class DbConnect():
    """Holds one pymysql connection plus a dict cursor."""

    def __init__(self, db_conf, database=""):
        """Open the connection.

        db_conf: keyword arguments forwarded to pymysql.connect.
        database: name of the database to select.
        """
        self.db_conf = db_conf
        self.db = pymysql.connect(database=database,
                                  cursorclass=pymysql.cursors.DictCursor,
                                  **db_conf)
        self.cursor = self.db.cursor()

    def select(self, sql):
        """Run a query and return all fetched rows."""
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    def execute(self, sql):
        """Run a write statement (insert/update/delete) and commit it.

        On failure the transaction is rolled back and the error is logged.
        The error is still not re-raised, as before, but it is no longer
        silently discarded. KeyboardInterrupt/SystemExit now propagate.
        """
        try:
            self.cursor.execute(sql)
            self.db.commit()
        except Exception:
            self.db.rollback()
            logging.getLogger(__name__).exception(
                "SQL statement failed and was rolled back: %s", sql)

    def close(self):
        """Close the underlying connection."""
        self.db.close()


def select_sql(select_sql):
    """Run a query against the auth_platform database and return the rows."""
    db = DbConnect(dbinfo, database='auth_platform')
    try:
        return db.select(select_sql)
    finally:
        # Release the connection even when the query raises.
        db.close()


def execute_sql(sql):
    """Run a write statement against the auth_platform database."""
    db = DbConnect(dbinfo, database='auth_platform')
    try:
        db.execute(sql)
    finally:
        db.close()


if __name__ == '__main__':
    sql = "SELECT * FROM auth_platform.auth_service where name='四要素认证'"
    sel = select_sql(sql)[0]['name']
    print(sel)
common/http_requests.py

'''
Code description: thin wrapper around a shared requests session
Create time: 2020/12/3
Developer: 叶修
'''
import requests


class HttpRequests(object):
    """HTTP helper whose session, headers and params live on the class.

    Because `req`, `headers` and `params` are class attributes, every
    instance shares the same session object and the same dictionaries.
    """

    # One session shared by all instances.
    req = requests.session()

    # Default request headers.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36',
        'cookie': ''
    }

    # Shared mapping for the access token.
    params = {
        'access_token': ''
    }

    def get(self, url='', params='', data='', headers=None, cookies=None, stream=None, verify=None):
        """Send a GET request through the shared session and return the response."""
        return self.req.get(url, params=params, data=data, headers=headers,
                            cookies=cookies, stream=stream, verify=verify)

    def post(self, url='', params='', data='', json=None, headers=None, cookies=None, stream=None, verify=None):
        """Send a POST request through the shared session and return the response.

        `json` defaults to None rather than ''. requests serialises any
        non-None `json` value when `data` is empty, so the old '' default
        produced a JSON body of "" on calls that supplied neither argument.
        """
        return self.req.post(url, params=params, data=data, json=json, headers=headers,
                             cookies=cookies, stream=stream, verify=verify)

    def put(self, url='', params='', data='', headers=None, cookies=None, verify=None):
        """Send a PUT request through the shared session and return the response."""
        return self.req.put(url, params=params, data=data, headers=headers,
                            cookies=cookies, verify=verify)

    def delete(self, url='', params='', data='', headers=None, cookies=None, verify=None):
        """Send a DELETE request through the shared session and return the response."""
        return self.req.delete(url, params=params, data=data, headers=headers,
                               cookies=cookies, verify=verify)
common/logger.py

'''
Code description: log output helper
Create time: 2020/12/3
Developer: 叶修
'''
import logging
import time
import os

import getpathinfo

path = getpathinfo.get_path()  # project root directory
log_path = os.path.join(path, 'logs')  # directory that receives the log files
# Create the logs directory when missing; exist_ok avoids a failure if it
# appears between a check and the creation.
os.makedirs(log_path, exist_ok=True)


class Log():
    """Writes each message to a dated log file and to the console.

    Handlers are attached to the root logger only for the duration of one
    call, so messages are not duplicated across calls.
    """

    def __init__(self):
        # One file per day, named from the date at construction time.
        self.logname = os.path.join(log_path, '%s.log' % time.strftime('%Y_%m_%d'))
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def __console(self, level, message):
        """Emit `message` at `level` ('info', 'debug', 'warning' or 'error').

        Any other level value emits nothing.
        """
        # File handler: appends to the dated log file.
        fh = logging.FileHandler(self.logname, 'a', encoding='utf-8')
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(self.formatter)
        self.logger.addHandler(fh)

        # Stream handler: mirrors the message to the console.
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(self.formatter)
        self.logger.addHandler(ch)

        emitters = {
            'info': self.logger.info,
            'debug': self.logger.debug,
            'warning': self.logger.warning,
            'error': self.logger.error,
        }
        try:
            emit = emitters.get(level)
            if emit is not None:
                emit(message)
        finally:
            # Always detach the temporary handlers and release the file, even
            # if emitting raised; otherwise later calls would log twice.
            self.logger.removeHandler(fh)
            self.logger.removeHandler(ch)
            fh.close()

    def debug(self, message):
        """Log `message` at DEBUG level."""
        self.__console('debug', message)

    def info(self, message):
        """Log `message` at INFO level."""
        self.__console('info', message)

    def warning(self, message):
        """Log `message` at WARNING level."""
        self.__console('warning', message)

    def error(self, message):
        """Log `message` at ERROR level."""
        self.__console('error', message)


if __name__ == '__main__':
    log = Log()
    log.info('测试')
    log.debug('测试')
    log.warning('测试')
    log.error('测试')
common/read_save_data.py

'''
Code description: read back data saved by earlier requests
Create time: 2020/12/8
Developer: 叶修
'''
import os
import yaml
import getpathinfo


class Read_Save_Date():
    """Reads the text files kept under data/save_data."""

    def __init__(self):
        save_dir = os.path.join(getpathinfo.get_path(), 'data', 'save_data')
        # Locations of the two saved-value files.
        self.head_img_path = save_dir + "/" + 'head_img_path.txt'
        self.order_id_path = save_dir + "/" + 'order_id.txt'

    @staticmethod
    def _read_text(file_path):
        """Return the whole content of a UTF-8 text file."""
        with open(file_path, "r", encoding="utf-8") as handle:
            content = handle.read()
        return content

    def get_head_img_path(self):
        """Return the content of head_img_path.txt."""
        return self._read_text(self.head_img_path)

    def get_order_id(self):
        """Return the content of order_id.txt."""
        return self._read_text(self.order_id_path)


if __name__ == '__main__':
    reader_output = Read_Save_Date().get_head_img_path()
    print(reader_output)
    print(Read_Save_Date().get_order_id())
common/read_yaml.py

'''
Code description: load test data from yml files
Create time: 2020/12/3
Developer: 叶修
'''
import os
import yaml
import getpathinfo


class ReadYaml():
    """Loads one YAML file from the data/test_data directory."""

    def __init__(self, filename):
        data_dir = os.path.join(getpathinfo.get_path(), 'data', 'test_data')
        # Full location of the requested data file.
        self.filepath = data_dir + "/" + filename

    def get_yaml_data(self):
        """Parse the file with yaml.FullLoader and return the resulting object."""
        with open(self.filepath, "r", encoding="utf-8") as stream:
            parsed = yaml.load(stream, Loader=yaml.FullLoader)
        return parsed


if __name__ == '__main__':
    print(ReadYaml("auth_service.yml").get_yaml_data()['add_or_update_service'])
data/
data/test_data/auth_service.yml

# Data-driven cases for the auth-service tests.
# Every top-level key holds a list of cases; the last element of each case is
# the expected result_code / result_message.

# NOTE(review): presumably consumed by a home-service-list test — only the
# expectation is supplied.
home_service_list:
  - [{'result_code': '0', 'result_message': '处理成功'}]

# Column order: serviceId, developerId, expect (matches the parametrize call
# in test_service_info).
service_info:
  - ['','',{'result_code': '0', 'result_message': '处理成功'}]

# First element is a list of 14 values, then expect.
# NOTE(review): the 14 values look like the arguments of
# api_add_or_update_service in order — confirm against the test that uses them.
add_or_update_service:
  - [['1','1','1','1','1','123456','1','测试','1','1','1','1','1','1'],{'result_code': '0', 'result_message': '处理成功'}]

# Five values, then expect.
# NOTE(review): presumably id, max_number, money, service_id, small_number —
# confirm against the test that uses them.
add_service_price:
  - ['123456789','10','0','','0',{'result_code': '0', 'result_message': '处理成功'}]

# Two values, then expect.
# NOTE(review): presumably developer_id, service_id — confirm against the test.
apply_service:
  - ['','',{'result_code': '0', 'result_message': '处理成功'}]
logs/
report/
getpathinfo.py

'''
Code description: project path helper
Create time: 2020/12/3
Developer: 叶修
'''
import os


def get_path():
    """Return the directory containing this file, with symlinks resolved."""
    this_file = os.path.realpath(__file__)
    directory, _file_name = os.path.split(this_file)
    return directory


if __name__ == '__main__':
    # Quick manual check when the file is run directly.
    print('测试路径是否OK,路径为:', get_path())
pytest.ini

#pytest.ini
[pytest]
# Custom marker applied to the test cases; run_main.py selects it with "-m process".
markers = process
# Active options: -p no:warnings turns off pytest's warnings plugin.
addopts = -p no:warnings
# Alternative option sets kept for reference (inactive):
# pytest-html report with one rerun
#addopts = -v --reruns 1 --html=./report/report.html --self-contained-html
# raw allure results with one rerun
#addopts = -v --reruns 1 --alluredir ./report/allure_raw
# NOTE(review): --pytest_report appears to belong to a separate reporting plugin — confirm before enabling
#addopts = -v -s -p no:warnings --reruns 1 --pytest_report ./report/Pytest_Report.html
requirements.txt

# Pinned packages for the project; install with: pip install -r requirements.txt
allure-pytest==2.8.18
allure-python-commons==2.8.18
BeautifulReport==0.1.3
beautifulsoup4==4.9.3
ddt==1.4.1
Faker==4.18.0
Flask==1.1.1
httpie==1.0.3
httplib2==0.9.2
HttpRunner==1.5.8
py==1.9.0
PyMySQL==0.10.1
pytest==6.1.1
pytest-base-url==1.4.2
pytest-cov==2.10.1
pytest-forked==1.3.0
pytest-html==2.1.1
pytest-instafail==0.4.2
pytest-metadata==1.10.0
pytest-mock==3.3.1
pywin32==228
PyYAML==5.3.1
requests==2.22.0
requests-oauthlib==1.3.0
requests-toolbelt==0.9.1
run_main.py

'''
Code description: run the main-flow test cases and build the allure report
Create time: 2020/11/5
Developer: 叶修
'''
import os
import sys

import pytest

if __name__ == '__main__':
    # -m process runs only the cases marked "process"; raw allure results
    # are written to report/tmp.
    exit_code = pytest.main(['-m', 'process', '-s', '--alluredir', 'report/tmp'])
    # Build the HTML report from report/tmp, replacing any previous report.
    os.system('allure generate report/tmp -o report/html --clean')
    # Propagate pytest's result so the script no longer exits 0 after test
    # failures; the report is still generated first.
    sys.exit(int(exit_code))