簡介
此框架是基於Python+Pytest+Requests+Allure+Yaml+Json實現全鏈路接口自動化測試。
主要流程:解析接口數據包 ->生成接口基礎配置(yml) ->生成測試用例(yaml+json) ->生成測試腳本(.py) ->運行測試(pytest) ->生成測試報告(allure)
測試流程:初始化請求 ->處理接口基礎信息 ->讀取前置接口用例 ->發送前置接口 ->處理當前接口數據 ->發送當前接口 ->檢查接口返回
接口自動化測試無非分幾大塊:測試用例設計、測試腳本編寫、測試結果校驗、測試報告生成、測試配置管理。
其中常見有幾大難點:接口之間依賴關聯、測試數據與腳本分離、測試數據參數化處理、全量自動化耗時。
而這些本框架通通已為你解決,你無須編寫任何代碼,只需要你抓取接口數據包即可。
關於接口依賴:你只要填寫前置接口相對路徑即可,如果存在數據依賴關系,此時你也僅需要填寫前置接口對應的參數值,本框架將自動為你調用和替換關聯數據。
關於測試數據:本框架采用yaml記錄接口基本信息,當請求參數和結果較大時,將單獨保存到json文件中,解決各類數據的錯綜復雜問題。
關於參數化:本框架采用常用工具使用的變量標識 ${var} ,通過正則表達式,自動檢索變量,自動為你替換變量,並且為你提供多種函數助手【$RandInt()、$GenGuid()】為你解決測試數據生成問題。
關於用例執行:本框架利用pytest擴展庫,支持多線程模式、失敗用例重試、用例模糊匹配等。
目前主要支持四種運行模式:
> 0 -不開啟自動生成測試用例功能,將直接運行測試
> 1 -根據手工編寫用例,自動生成測試腳本,然后運行測試
> 2 -根據接口抓包數據,自動生成測試用例和測試腳本,然后運行測試
> 3 -根據接口抓包數據,自動生成測試用例和測試腳本,但不運行測試
注意:目前解析僅支持(.chlsj)格式,請使用Charles工具抓包導出JSON Session File
目前支持多種函數助手(以下僅為示例,之后將單獨說明):
print('替換變量並計算表達式:', replace('$Eval(${unitCode}*1000+1)', {'unitCode': 9876543210})) print('生成1-9之間的隨機數:', replace('$RandInt(1,9)')) print('生成10位隨機字符:', replace('$RandStr(10)')) print('從列表中隨機選擇:', replace('$RandChoice(a,b,c,d)')) print('生成一個偽手機號:', replace('$GenPhone()')) print('生成一個guid:', replace('$GenGuid()')) print('生成一個偽微信ID:', replace('$GenWxid()')) print('生成一個偽身份證:', replace('$GenNoid()')) print('生成一個18歲偽身份證:', replace("$GenNoid(y-18)")) print('生成下個月今天的日期:', replace("$GenDate(m+1)")) print('生成昨天此時的時間:', replace("$GenDatetime(d-1)"))
替換變量並計算表達式: 9876543210 生成1-9之間的隨機數: 9 生成10位隨機字符: CB8512d4E6 從列表中隨機選擇: d 生成一個偽手機號: 18890688629 生成一個guid: 78A6698C-6793-11EB-8221-005056C00008 生成一個偽微信ID: AUTO9K6MRzVGfsNB4ZkIuSdXravD 生成一個偽身份證: 999577202102052043 生成一個18歲偽身份證: 953700200302056259 生成下個月今天的日期: 2021-03-05 生成昨天此時的時間: 2021-02-04 17:21:04.696745
框架流程圖
項目結構
啟動服務(startup.py)
# -*- coding:utf-8 -*- # @Time : 2021/2/1 # @Author : Leo Zhang # @File : startup.py # *********************** import os import sys import pytest import logging if __name__ == '__main__': from comm.script import writeLogs, writeCase from config import * # 開啟日志記錄(默認logs目錄) writeLogs.MyLogs(ROOT_DIR+'logs') # 判斷運行模式 if RC['auto_switch'] == 3: logging.info("根據接口抓包數據,自動生成測試用例和測試腳本,但不運行測試!") writeCase.write_case(DATA_DIR, auto_yaml=True) sys.exit(0) elif RC['auto_switch'] == 2: logging.info("根據接口抓包數據,自動生成測試用例和測試腳本,然后運行測試!") writeCase.write_case(DATA_DIR, auto_yaml=True) elif RC['auto_switch'] == 1: # 如果掃描路徑為空在則取項目page目錄 if not os.path.exists(RC['scan_dir']): RC['scan_dir'] = PAGE_DIR logging.info("根據手工編寫用例,自動生成測試腳本,然后運行測試!") writeCase.write_case(RC['scan_dir'], auto_yaml=False) else: logging.info("不開啟自動生成測試用例功能,將直接運行測試!") # 定義運行參數 args_list = ['-vs', TEST_DIR, '-n', str(RC['process']), '--reruns', str(RC['reruns']), '--maxfail', str(RC['maxfail']), '--alluredir', REPORT_DIR+'/xml', '--clean-alluredir'] # 判斷是否開啟用例匹配 if RC['pattern']: args_list += ['-k ' + str(RC['pattern'])] test_result = pytest.main(args_list) # 生成allure報告 cmd = 'allure generate --clean %s -o %s ' % (REPORT_DIR+'/xml', REPORT_DIR+'/html') os.system(cmd)
運行配置說明(runConfig.yml)
# 運行項目名 project_name: PyDemo # 運行模式: auto_switch: 2 # 0 -不開啟自動生成測試用例功能,將直接運行測試 # 1 -根據手工編寫用例,自動生成測試腳本,然后運行測試 # 2 -根據接口抓包數據,自動生成測試用例和測試腳本,然后運行測試 # 3 -根據接口抓包數據,自動生成測試用例和測試腳本,但不運行測試 # 注意:目前解析僅支持(.chlsj)格式,請使用Charles工具抓包導出JSON Session File # 掃描測試用例目錄(且僅當auto_switch=1時有用) scan_dir: # 使用模糊匹配測試用例(空則匹配所有) pattern: # 執行並發線程數(0表示不開啟) process: 0 # 失敗重試次數(0表示不重試) reruns: 0 # 本輪測試最大允許失敗數(超出則立即結束測試) maxfail: 20 # 接口調用間隔時間(s) interval: 1 # 測試結果校驗方式說明(共5種方式): # no_check:不做任何校驗 # check_code:僅校驗接口返回碼code # check_json:校驗接口返回碼code,並進行json格式比較返回結果(默認方式) # entirely_check:校驗接口返回碼code,並進行完整比較返回結果 # regular_check:校驗接口返回碼code,並進行正則匹配返回結果
測試腳本基礎模板(test_template.py)
# -*- coding:utf-8 -*- # @Time : 2021/2/2 # @Author : Leo Zhang # @File : test_template.py # **************************** import os import allure import pytest from comm.utils.readYaml import read_yaml_data from comm.unit.initializePremise import init_premise from comm.unit.apiSend import send_request from comm.unit.checkResult import check_result case_yaml = os.path.realpath(__file__).replace('testcase', 'page').replace('py', 'yaml') case_path = os.path.dirname(case_yaml) case_dict = read_yaml_data(case_yaml) @allure.feature(case_dict["test_info"]["title"]) class TestTemplate: @pytest.mark.parametrize("case_data", case_dict["test_case"]) @allure.story("test_template") def test_template(self, case_data): # 初始化請求:執行前置接口+替換關聯變量 test_info, case_data = init_premise(case_dict["test_info"], case_data, case_path) # 發送當前接口 code, data = send_request(test_info, case_data) # 校驗接口返回 check_result(case_data, code, data)
接口配置示例(apiConfig.yml)
PyDemo: # 首次會根據接口數據包生成,可自行更改或添加新配置,所有配置將作為公共關聯值 host: 10.88.88.141:20037 headers: Content-Type: application/x-www-form-urlencoded;charset=UTF-8 cookies: headtoken: xu5YwIZFkVGczMn0H0rot2ps7zRIbvrTHNwMXx1sJXg=
測試用例說明(test.yaml)- 3.12更新
# 用例基本信息 test_info: # 測試用例標題,默認截取請求地址倒數第2個字段名,在報告中作為一級目錄顯示 title: register # 請求域名,默認讀取公共關聯值,可修改 host: ${host} # 請求協議 scheme: http # 請求類型 method: POST # 請求地址 address: /api/register/findParam # 參數媒體類型 mime_type: application/x-www-form-urlencoded # 請求頭,默認讀取公共關聯值,可修改 headers: ${headers} # 超時時長(s) timeout: 10 # 是否需要上傳文件 file: false # 是否需要獲取cookie cookies: false # 是否存在前置接口,如果存在,則填寫前置接口用例相對路徑,如:/register/test_getAdultCurbactList.yaml premise: false # 測試用例,默認僅生成一個,可手動添加多個 test_case: # 用例概要,默認截取請求地址倒數第1個字段名 - summary: findParam # 用例描述,在報告中作為二級目錄顯示 describe: test_findParam # 接口請求參數,當總字符數超過200,將轉為json文件單獨存儲 parameter: params: unitCode: '3202112002' first: 0 pym: '' pageSize: 10 page: 0 headtoken: ${headtoken} # 接口檢查結果 check_body: # 檢查類型,目前支持5種,可自行修改,默認check_json,即僅檢查實際與期望結果格式是否一致 check_type: check_json # 期望接口返回碼 expected_code: 200 # 期望接口返回消息體,當總字符數超過200,將轉為json文件單獨存儲 expected_result: success: true code: msg: 返回成功 data: - '1' - '1' callTime:
測試報告示例(allure)
核心代碼(3.12新增)
script.1、接口數據包解析:依據接口數據,截取對應字段,生成測試用例yaml以及參數文件json。

# -*- coding:utf-8 -*- # @Time : 2020/10/15 # @Author : Leo Zhang # @File : writeCaseYaml.py # **************************** import os import json import logging import urllib.parse from comm.utils.readYaml import write_yaml_file, read_yaml_data from comm.utils.readJson import write_json_file from config import API_CONFIG, PROJECT_NAME def write_case_yaml(har_path): """循環讀取接口數據文件 :param har_path: Charles導出文件路徑 :return: """ case_file_list = list() logging.info("讀取抓包文件主目錄: {}".format(har_path)) har_list = os.listdir(har_path) for each in har_list: ext_name = os.path.splitext(each)[1] if ext_name == '.chlsj': logging.info("讀取抓包文件: {}".format(each)) file_path = har_path+'/'+each with open(file_path, 'r', encoding='utf-8') as f: har_cts = json.loads(f.read()) har_ct = har_cts[0] # 獲取接口基本信息 host = har_ct["host"] port = har_ct["port"] method = har_ct["method"] path = har_ct["path"] headers = har_ct["request"]["header"]['headers'] title = path.split("/")[-1].replace('-', '') module = path.split("/")[-2].replace('-', '') # 創建模塊目錄 module_path = har_path.split('data')[0] + '/page/' + module try: os.makedirs(module_path) except: pass # 定義api通過配置 api_config = dict() simp_header = dict() for header in headers: # 去除基礎請求頭 base_header = ['Host', 'Content-Length', 'User-Agent', 'Origin', 'Referer', 'Connection', 'Accept', 'Accept-Encoding', 'Accept-Language'] if header['name'] not in base_header: simp_header[header['name']] = header['value'] api_config['host'] = host+':'+str(port) # 判斷是否存在自定義消息頭 if simp_header: api_config['headers'] = simp_header else: api_config['headers'] = None api_config['cookies'] = None # 檢查是否已存在項目配置信息,沒有則寫入 rconfig = read_yaml_data(API_CONFIG) if rconfig: if PROJECT_NAME not in rconfig: rconfig[PROJECT_NAME] = api_config write_yaml_file(API_CONFIG, rconfig) else: nconfig = dict() nconfig[PROJECT_NAME] = api_config write_yaml_file(API_CONFIG, nconfig) # 定義測試信息 test_info = dict() test_info["title"] = module test_info["host"] = '${host}' test_info["scheme"] = har_ct["scheme"] test_info["method"] = method test_info["address"] = path test_info["mime_type"] = har_ct["request"]["mimeType"] test_info["headers"] = '${headers}' test_info["timeout"] = 10 test_info["file"] = False test_info["cookies"] = False test_info["premise"] = False # 解析請求報文 parameter = dict() try: if method in 'POST': parameter_list = urllib.parse.unquote(har_ct["request"]["body"]["text"]) elif method in 'PUT': parameter_list = har_ct["request"]["body"]["text"] elif method in 'DELETE': parameter_list = urllib.parse.unquote(har_ct["request"]["body"]["text"]) else: parameter_list = har_ct["query"] if "&" in parameter_list: for key in parameter_list.split("&"): val = key.split("=") parameter[val[0]] = val[1] else: parameter = json.loads(parameter_list) except Exception as e: logging.error("未找到parameter: %s" % e) raise e # 定義用例信息 test_case_list = list() test_case = dict() test_case["summary"] = title test_case["describe"] = 'test_'+title # 定義請求入參信息,且當參數字符總長度大於200時單獨寫入json文件 if len(str(parameter)) > 200: param_name = title+'_request.json' if param_name not in os.listdir(module_path): # 定義請求json param_dict = dict() param_dict["summary"] = title param_dict["body"] = parameter param_file = module_path+'/'+param_name logging.info("生成請求文件: {}".format(param_file)) write_json_file(param_file, [param_dict]) test_case["parameter"] = param_name else: test_case["parameter"] = parameter # 定義請求返回信息 response_code = har_ct["response"]["status"] response_body = har_ct["response"]["body"]["text"] check = dict() check["check_type"] = 'check_json' check["expected_code"] = response_code expected_request = json.loads(response_body) # 當返回參數字符總長度大於200時單獨寫入json文件 if len(str(expected_request)) > 200: result_name = title+'_response.json' if result_name not in os.listdir(module_path): # 定義響應json result_dict = dict() result_dict["summary"] = title result_dict["body"] = expected_request result_file = module_path + '/' + result_name logging.info("生成響應文件: {}".format(result_file)) write_json_file(result_file, [result_dict]) check["expected_result"] = result_name else: check["expected_result"] = expected_request test_case["check"] = check test_case_list.append(test_case) # 合並測試信息、用例信息 case_list = dict() case_list["test_info"] = test_info case_list["test_case"] = test_case_list # 寫入測試用例(存在則忽略) case_name = 'test_'+title+'.yaml' case_file = module_path+'/'+case_name if not os.path.exists(case_file): logging.info("生成用例文件: {}".format(case_file)) write_yaml_file(case_file, case_list) case_file_list.append(case_file) return case_file_list if __name__ == '__main__': real_path = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/') print('測試用例列表: ', write_case_yaml(real_path+'/data'))
script.2、測試腳本生成:依據測試用例,復制模板並修改相關字段。

# -*- coding:utf-8 -*- # @Time : 2020/10/15 # @Author : Leo Zhang # @File : writeCase.py # ************************ import os from config import ROOT_DIR from comm.script.writeCaseYml import write_case_yaml, read_yaml_data temp_file = ROOT_DIR+'config/test_template.py' def write_case(case_path, auto_yaml=True): """ :param case_path: 用例路徑,當auto_yaml為True時,需要傳入data目錄,否則傳入掃描目錄 :param auto_yaml: 是否自動生成yaml文件 :return: """ # 判斷是否自動生成yaml用例 if auto_yaml: yaml_list = write_case_yaml(case_path) else: yaml_list = list() file_list = os.listdir(case_path) for file in file_list: if '.yaml' in file: yaml_path = case_path+'/'+file yaml_list.append(yaml_path) # 遍歷測試用例列表 for yaml_file in yaml_list: test_data = read_yaml_data(yaml_file) test_script = yaml_file.replace('page', 'testcase').replace('.yaml', '.py') # case_name = os.path.basename(test_script).replace('.py', '') case_path = os.path.dirname(test_script) # 判斷文件路徑是否存在 if not os.path.exists(case_path): os.makedirs(case_path) # 替換模板內容 file_data = '' with open(temp_file, "r", encoding="utf-8") as f: for line in f: if 'TestTemplate' in line: title = test_data['test_info']['title'] line = line.replace('Template', title.title()) if 'test_template' in line: if '@allure.story' in line: describe = test_data['test_case'][0]['describe'] line = line.replace('test_template', describe) else: summary = test_data['test_case'][0]['summary'] line = line.replace('template', summary) file_data += line # 寫入新腳本 with open(test_script, "w", encoding="utf-8") as f: f.write(file_data) if __name__ == '__main__': real_path = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/') write_case(real_path + '/data', auto_yaml=True) # write_case(real_path+'/page/oauth', auto_yaml=False)
script.3、運行日志收集:初始化日志模塊,保存運行日志。

# -*- coding:utf-8 -*- # @Time : 2020/9/22 # @Author : Leo Zhang # @File : writeLogs.py # ************************ import logging import time import sys import os class MyLogs: def __init__(self, log_path): # 定義日志默認路徑和日志名稱 if not os.path.exists(log_path): os.makedirs(log_path) runtime = time.strftime('%Y-%m-%d', time.localtime(time.time())) logfile = os.path.join(log_path, runtime+'.log') logfile_err = os.path.join(log_path, runtime+'_error.log') # 第一步,初始化日志對象並設置日志等級 logger = logging.getLogger() logger.setLevel(logging.DEBUG) logger.handlers = [] # 第二步,創建一個handler,用於寫入debug日志文件 fh = logging.FileHandler(logfile, mode='a+') fh.setLevel(logging.DEBUG) # 第三步,創建一個handler,用於寫入error日志文件 fh_err = logging.FileHandler(logfile_err, mode='a+') fh_err.setLevel(logging.ERROR) # 第四步,再創建一個handler,用於輸出info日志到控制台 sh = logging.StreamHandler(sys.stdout) sh.setLevel(logging.INFO) # 第五步,定義handler的輸出格式 formatter = logging.Formatter("%(asctime)s - %(filename)s - %(levelname)s: %(message)s") fh.setFormatter(formatter) fh_err.setFormatter(formatter) sh.setFormatter(formatter) # 第六步,將logger添加到handler里面 logger.addHandler(fh) logger.addHandler(fh_err) logger.addHandler(sh)
unit.1、請求協議方法封裝:包括post、get、put、delete,以及cookie保存。

# -*- coding:utf-8 -*- # @Time : 2021/2/2 # @Author : Leo Zhang # @File : apiMethod.py # ************************* import os import json import random import logging import requests import simplejson from requests_toolbelt import MultipartEncoder from comm.utils.readYaml import write_yaml_file, read_yaml_data from config import API_CONFIG, PROJECT_NAME def post(headers, address, mime_type, timeout=10, data=None, files=None, cookies=None): """ post請求 :param headers: 請求頭 :param address: 請求地址 :param mime_type: 請求參數格式(form_data,raw) :param timeout: 超時時間 :param data: 請求參數 :param files: 文件路徑 :param cookies: :return: """ # 判斷請求參數類型 if 'form_data' in mime_type: for i in files: value = files[i] if '/' in value: file_parm = i files[file_parm] = (os.path.basename(value), open(value, 'rb')) enc = MultipartEncoder( fields=files, boundary='--------------' + str(random.randint(1e28, 1e29-1)) ) headers['Content-Type'] = enc.content_type response = requests.post(url=address, data=enc, headers=headers, timeout=timeout, cookies=cookies) elif 'data' in mime_type: response = requests.post(url=address, data=data, headers=headers, timeout=timeout, files=files, cookies=cookies) else: response = requests.post(url=address, json=data, headers=headers, timeout=timeout, files=files, cookies=cookies) try: if response.status_code != 200: return response.status_code, response.text else: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, None except simplejson.errors.JSONDecodeError: return response.status_code, None except Exception as e: logging.exception('ERROR') logging.error(e) raise def get(headers, address, data, timeout=8, cookies=None): """ get請求 :param headers: 請求頭 :param address: 請求地址 :param data: 請求參數 :param timeout: 超時時間 :param cookies: :return: """ response = requests.get(url=address, params=data, headers=headers, timeout=timeout, cookies=cookies) if response.status_code == 301: response = requests.get(url=response.headers["location"]) try: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, None except simplejson.errors.JSONDecodeError: return response.status_code, None except Exception as e: logging.exception('ERROR') logging.error(e) raise def put(headers, address, mime_type, timeout=8, data=None, files=None, cookies=None): """ put請求 :param headers: 請求頭 :param address: 請求地址 :param mime_type: 請求參數格式(form_data,raw) :param timeout: 超時時間 :param data: 請求參數 :param files: 文件路徑 :param cookies: :return: """ if mime_type == 'raw': data = json.dumps(data) elif mime_type == 'application/json': data = json.dumps(data) response = requests.put(url=address, data=data, headers=headers, timeout=timeout, files=files, cookies=cookies) try: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, None except simplejson.errors.JSONDecodeError: return response.status_code, None except Exception as e: logging.exception('ERROR') logging.error(e) raise def delete(headers, address, data, timeout=8, cookies=None): """ delete請求 :param headers: 請求頭 :param address: 請求地址 :param data: 請求參數 :param timeout: 超時時間 :param cookies: :return: """ response = requests.delete(url=address, params=data, headers=headers, timeout=timeout, cookies=cookies) try: return response.status_code, response.json() except json.decoder.JSONDecodeError: return response.status_code, None except simplejson.errors.JSONDecodeError: return response.status_code, None except Exception as e: logging.exception('ERROR') logging.error(e) raise def save_cookie(headers, address, mime_type, timeout=8, data=None, files=None, cookies=None): """ 保存cookie信息 :param headers: 請求頭 :param address: 請求地址 :param mime_type: 請求參數格式(form_data,raw) :param timeout: 超時時間 :param data: 請求參數 :param files: 文件路徑 :param cookies: :return: """ if 'data' in mime_type: response = requests.post(url=address, data=data, headers=headers, timeout=timeout, files=files, cookies=cookies) else: response = requests.post(url=address, json=data, headers=headers, timeout=timeout, files=files, cookies=cookies) try: cookies = response.cookies.get_dict() # 讀取api配置並寫入最新的cookie結果 aconfig = read_yaml_data(API_CONFIG) aconfig[PROJECT_NAME]['cookies'] = cookies write_yaml_file(API_CONFIG, aconfig) logging.debug("cookies已保存,結果為:{}".format(cookies)) except json.decoder.JSONDecodeError: return response.status_code, None except simplejson.errors.JSONDecodeError: return response.status_code, None except Exception as e: logging.exception('ERROR') logging.error(e) raise
unit.2、接口發送方法封裝:讀取測試用例,拼接請求信息,發送請求,返回結果。

# -*- coding:utf-8 -*- # @Time : 2021/2/2 # @Author : Leo Zhang # @File : apiSend.py # *********************** import logging import allure import time from config import INTERVAL from comm.unit import apiMethod def send_request(test_info, case_data): """ 封裝請求 :param test_info: 測試信息 :param case_data: 用例數據 :return: """ try: # 獲取用例基本信息 host = test_info["host"] scheme = test_info["scheme"] method = test_info["method"].upper() address = test_info["address"] mime_type = test_info["mime_type"] headers = test_info["headers"] cookies = test_info["cookies"] file = test_info["file"] timeout = test_info["timeout"] summary = case_data["summary"] parameter = case_data["parameter"] except Exception as e: raise KeyError('獲取用例基本信息失敗:{}'.format(e)) request_url = scheme + "://" + host + address logging.info("=" * 150) logging.info("請求接口:%s" % str(summary)) logging.info("請求地址:%s" % request_url) logging.info("請求頭: %s" % str(headers)) logging.info("請求參數: %s" % str(parameter)) # 判斷是否保存cookies if summary == 'save_cookies': with allure.step("保存cookies信息"): allure.attach(name="請求接口", body=str(summary)) allure.attach(name="請求地址", body=request_url) allure.attach(name="請求頭", body=str(headers)) allure.attach(name="請求參數", body=str(parameter)) apiMethod.save_cookie(headers=headers, address=request_url, mime_type=mime_type, data=parameter, cookies=cookies, timeout=timeout) # 判斷接口請求類型 if method == 'POST': logging.info("請求方法: POST") # 判斷是否上傳文件 if file: with allure.step("POST上傳文件"): allure.attach(name="請求接口", body=str(summary)) allure.attach(name="請求地址", body=request_url) allure.attach(name="請求頭", body=str(headers)) allure.attach(name="請求參數", body=str(parameter)) result = apiMethod.post(headers=headers, address=request_url, mime_type=mime_type, files=parameter, cookies=cookies, timeout=timeout) else: with allure.step("POST請求接口"): allure.attach(name="請求接口", body=str(summary)) allure.attach(name="請求地址", body=request_url) allure.attach(name="請求頭", body=str(headers)) allure.attach(name="請求參數", body=str(parameter)) result = apiMethod.post(headers=headers, address=request_url, mime_type=mime_type, data=parameter, cookies=cookies, timeout=timeout) elif method == 'GET': logging.info("請求方法: GET") with allure.step("GET請求接口"): allure.attach(name="請求接口", body=str(summary)) allure.attach(name="請求地址", body=request_url) allure.attach(name="請求頭", body=str(headers)) allure.attach(name="請求參數", body=str(parameter)) result = apiMethod.get(headers=headers, address=request_url, data=parameter, cookies=cookies, timeout=timeout) elif method == 'PUT': logging.info("請求方法: PUT") # 判斷是否上傳文件 if file: with allure.step("PUT上傳文件"): allure.attach(name="請求接口", body=str(summary)) allure.attach(name="請求地址", body=request_url) allure.attach(name="請求頭", body=str(headers)) allure.attach(name="請求參數", body=str(parameter)) result = apiMethod.put(headers=headers, address=request_url, mime_type=mime_type, files=parameter, cookies=cookies, timeout=timeout) else: with allure.step("PUT請求接口"): allure.attach(name="請求接口", body=str(summary)) allure.attach(name="請求地址", body=request_url) allure.attach(name="請求頭", body=str(headers)) allure.attach(name="請求參數", body=str(parameter)) result = apiMethod.put(headers=headers, address=request_url, mime_type=mime_type, data=parameter, cookies=cookies, timeout=timeout) elif method == 'DELETE': logging.info("請求方法: DELETE") with allure.step("DELETE請求接口"): allure.attach(name="請求接口", body=str(summary)) allure.attach(name="請求地址", body=request_url) allure.attach(name="請求頭", body=str(headers)) allure.attach(name="請求參數", body=str(parameter)) result = apiMethod.delete(headers=headers, address=request_url, data=parameter, cookies=cookies, timeout=timeout) else: result = {"code": None, "data": None} logging.info("請求接口結果:\n %s" % str(result)) time.sleep(INTERVAL) return result
unit.3、接口初始化處理:讀取公共關聯配置,獲取當前關聯值,執行前置接口,替換關聯值,返回用例信息。

# -*- coding:utf-8 -*- # @Time : 2021/2/3 # @Author : Leo Zhang # @File : initializePremise.py # ************************** import logging import time import json from json import JSONDecodeError from config import PAGE_DIR, PROJECT_NAME, API_CONFIG from comm.unit import apiSend, readRelevance, replaceRelevance from comm.utils import readYaml def read_json(summary, json_obj, case_path): """ 校驗內容讀取 :param summary: 用例名稱 :param json_obj: json文件或數據對象 :param case_path: case路徑 :return: """ if isinstance(json_obj, dict): return json_obj else: try: # 讀取json文件指定用例數據 with open(case_path+'/'+json_obj, "r", encoding="utf-8") as js: data_list = json.load(js) for data in data_list: if data['summary'] == summary: return data['body'] except FileNotFoundError: raise Exception("用例關聯文件不存在\n文件路徑: %s" % json_obj) except JSONDecodeError: raise Exception("用例關聯的文件有誤\n文件路徑: %s" % json_obj) def init_premise(test_info, case_data, case_path): """用例前提條件執行,提取關鍵值 :param test_info: 測試信息 :param case_data: 用例數據 :param case_path: 用例路徑 :return: """ # 獲取項目公共關聯值 aconfig = readYaml.read_yaml_data(API_CONFIG) __relevance = aconfig[PROJECT_NAME] # 處理測試信息 test_info = replaceRelevance.replace(test_info, __relevance) logging.debug("測試信息處理結果:{}".format(test_info)) # 處理Cookies if test_info['cookies']: cookies = aconfig[PROJECT_NAME]['cookies'] logging.debug("請求Cookies處理結果:{}".format(cookies)) # 判斷是否存在前置接口 pre_case_path = test_info["premise"] if pre_case_path: # 獲取前置接口用例 logging.info("獲取前置接口測試用例:{}".format(pre_case_path)) pre_case_path = PAGE_DIR + pre_case_path pre_case_dict = readYaml.read_yaml_data(pre_case_path) pre_test_info = pre_case_dict['test_info'] pre_case_data = pre_case_dict['test_case'][0] # 判斷前置接口是否也存在前置接口 if pre_test_info["premise"]: init_premise(pre_test_info, pre_case_data, pre_case_path) for i in range(3): # 處理前置接口測試信息 pre_test_info = replaceRelevance.replace(pre_test_info, __relevance) logging.debug("測試信息處理結果:{}".format(pre_test_info)) # 處理前置接口Cookies if pre_test_info['cookies']: cookies = aconfig[PROJECT_NAME]['cookies'] logging.debug("請求Cookies處理結果:{}".format(cookies)) # 處理前置接口入參:獲取入參-替換關聯值-發送請求 pre_parameter = read_json(pre_case_data['summary'], pre_case_data['parameter'], pre_case_path) pre_parameter = replaceRelevance.replace(pre_parameter, __relevance) pre_case_data['parameter'] = pre_parameter logging.debug("請求參數處理結果:{}".format(pre_parameter)) logging.info("執行前置接口測試用例:{}".format(pre_test_info)) code, data = apiSend.send_request(pre_test_info, pre_case_data) # 檢查接口是否調用成功 if data: # 處理當前接口入參:獲取入參-獲取關聯值-替換關聯值 parameter = read_json(case_data['summary'], case_data['parameter'], case_path) __relevance = readRelevance.get_relevance(data, parameter, __relevance) parameter = replaceRelevance.replace(parameter, __relevance) case_data['parameter'] = parameter logging.debug("請求參數處理結果:{}".format(parameter)) # 獲取當前接口期望結果:獲取期望結果-獲取關聯值-替換關聯值 expected_rs = read_json(case_data['summary'], case_data['check']['expected_result'], case_path) parameter['data'] = data __relevance = readRelevance.get_relevance(parameter, expected_rs, __relevance) expected_rs = replaceRelevance.replace(expected_rs, __relevance) case_data['check']['expected_result'] = expected_rs logging.debug("期望返回處理結果:{}".format(case_data)) break else: time.sleep(1) logging.error("前置接口請求失敗!等待1秒后重試!") else: logging.info("前置接口請求失敗!嘗試三次失敗!") raise Exception("獲取前置接口關聯數據失敗!") else: # 處理當前接口入參:獲取入參-獲取關聯值-替換關聯值 parameter = read_json(case_data['summary'], case_data['parameter'], case_path) parameter = replaceRelevance.replace(parameter, __relevance) case_data['parameter'] = parameter logging.debug("請求參數處理結果:{}".format(parameter)) # 獲取當前接口期望結果:獲取期望結果-獲取關聯值-替換關聯值 expected_rs = read_json(case_data['summary'], case_data['check']['expected_result'], case_path) __relevance = readRelevance.get_relevance(parameter, expected_rs, __relevance) expected_rs = replaceRelevance.replace(expected_rs, __relevance) case_data['check']['expected_result'] = expected_rs logging.debug("期望返回處理結果:{}".format(case_data)) return test_info, case_data
unit.4、獲取關聯值配置:通過正則匹配${}檢索關聯值,然后獲取消息體中的對應值。

# -*- coding:utf-8 -*- # @Time : 2021/1/8 # @Author : Leo Zhang # @File : readRelevance.py # **************************** import logging import re __relevance = "" def get_value(data, value): """獲取數據中的值 :param data: :param value: :return: """ global __relevance if isinstance(data, dict): if value in data: __relevance = data[value] else: for key in data: __relevance = get_value(data[key], value) elif isinstance(data, list): for key in data: if isinstance(key, dict): __relevance = get_value(key, value) break return __relevance def get_relevance(data, relevance_list, relevance=None): """獲取關聯鍵值對 :param data: :param relevance_list: :param relevance: :return: """ # 獲取關聯鍵列表 relevance_list = re.findall(r"\${(.*?)}", str(relevance_list)) relevance_list = list(set(relevance_list)) logging.debug("獲取關聯鍵列表:\n%s" % relevance_list) # 判斷關聯鍵和源數據是否有值 if (not data) or (not relevance_list): return relevance # 判斷是否存在其他關聯鍵對象 if not relevance: relevance = dict() # 遍歷關聯鍵 for each in relevance_list: if each in relevance: pass # # 考慮到一個關聯鍵,多個值 # if isinstance(relevance[each], list): # a = relevance[each] # a.append(relevance_value) # relevance[each] = a # else: # a = relevance[each] # b = list() # b.append(a) # b.append(relevance_value) # relevance[each] = b else: # 從結果中提取關聯鍵的值 relevance[each] = get_value(data, each) logging.debug("提取關聯鍵對象:\n%s" % relevance) return relevance
unit.5、替換關聯值:首先替換關聯值,然后替換函數助手生成數據,最后替換表達式結果。

# -*- coding:utf-8 -*- # @Time : 2020/12/09 # @Author : Leo Zhang # @File : replaceRelevance.py # **************************** import re from comm.utils.randomly import * pattern_var = r"\${(.*?)}" pattern_eval = r"\$Eval\((.*?)\)" pattern_str = r'\$RandStr\(([0-9]*?)\)' pattern_int = r'\$RandInt\(([0-9]*,[0-9]*?)\)' pattern_choice = r"\$RandChoice\((.*?)\)" pattern_float = r'\$RandFloat\(([0-9]*,[0-9]*,[0-9]*)\)' pattern_phone = r'\$GenPhone\(\)' pattern_guid = r'\$GenGuid\(\)' pattern_wxid = r'\$GenWxid\(\)' pattern_noid = r'\$GenNoid\((.*?)\)' pattern_date = r'\$GenDate\((.*?)\)' pattern_datetime = r'\$GenDatetime\((.*?)\)' def replace_pattern(pattern, value): """替換正則表達式 :param pattern: 匹配字符 :param value: 匹配值 :return: """ patterns = pattern.split('(.*?)') return ''.join([patterns[0], value, patterns[-1]]) def replace_relevance(param, relevance=None): """替換變量關聯值 :param param: 參數對象 :param relevance: 關聯對象 :return: """ result = re.findall(pattern_var, str(param)) if (not result) or (not relevance): pass else: for each in result: try: # 關聯參數多值替換 # relevance_index = 0 # if isinstance(relevance[each], list): # try: # param = re.sub(pattern, relevance[each][relevance_index], param, count=1) # relevance_index += 1 # except IndexError: # relevance_index = 0 # param = re.sub(pattern, relevance[each][relevance_index], param, count=1) # relevance_index += 1 value = relevance[each] pattern = re.compile(r'\${' + each + '}') try: param = re.sub(pattern, value, param) except TypeError: param = value except KeyError: raise KeyError('替換變量{0}失敗,未發現變量對應關聯值!\n關聯列表:{1}'.format(param, relevance)) # pass return param def replace_eval(param): """替換eval表達式結果 :param param: 參數對象 :return: """ result = re.findall(pattern_eval, str(param)) if not result: pass else: for each in result: try: if 'import' in each: raise Exception('存在非法標識import') else: value = str(eval(each)) param = re.sub(pattern_eval, value, param) except KeyError as e: raise Exception('獲取值[ % ]失敗!\n%'.format(param, e)) except SyntaxError: pass return param def replace_random(param): """替換隨機方法參數值 :param param: :return: """ int_list = re.findall(pattern_int, str(param)) str_list = re.findall(pattern_str, str(param)) choice_list = re.findall(pattern_choice, str(param)) guid_list = re.findall(pattern_guid, str(param)) noid_list = re.findall(pattern_noid, str(param)) phone_list = re.findall(pattern_phone, str(param)) wxid_list = re.findall(pattern_wxid, str(param)) date_list = re.findall(pattern_date, str(param)) datetime_list = re.findall(pattern_datetime, str(param)) if len(str_list): for each in str_list: # pattern = re.compile(r'\$RandStr\(' + each + r'\)') # param = re.sub(pattern, str(random_str(each)), param, count=1) param = re.sub(pattern_str, str(random_str(each)), param, count=1) if len(int_list): for each in int_list: param = re.sub(pattern_int, str(random_int(each)), param, count=1) if len(choice_list): for each in choice_list: param = re.sub(pattern_choice, str(random_choice(each)), param, count=1) if len(date_list): for each in date_list: param = re.sub(pattern_date, str(generate_date(each)), param, count=1) if len(datetime_list): for each in datetime_list: param = re.sub(pattern_datetime, str(generate_datetime(each)), param, count=1) if len(noid_list): for each in noid_list: param = re.sub(pattern_noid, str(generate_noid(each)), param, count=1) if len(phone_list): for i in phone_list: param = re.sub(pattern_phone, str(generate_phone()), param, count=1) if len(guid_list): for i in guid_list: param = re.sub(pattern_guid, generate_guid(), param, count=1) if len(wxid_list): for i in wxid_list: param = re.sub(pattern_wxid, generate_wxid(), param, count=1) return param def replace(param, relevance=None): """替換參數對應關聯數據 :param param: 參數對象 :param relevance: 關聯對象 :return: """ if not param: pass elif isinstance(param, dict): for key, value in param.items(): if isinstance(value, dict): param[key] = replace(value, relevance) elif isinstance(value, list): for index, sub_value in enumerate(value): param[key][index] = replace(sub_value, relevance) else: value = replace_relevance(value, relevance) value = replace_random(value) value = replace_eval(value) param[key] = value elif isinstance(param, list): for index, value in enumerate(param): param[index] = replace(value, relevance) else: param = replace_relevance(param, relevance) param = replace_random(param) param = replace_eval(param) return param if __name__ == '__main__': print('替換變量並計算表達式:', replace('$Eval(${unitCode}*1000+1)', {'unitCode': 9876543210})) print('生成1-9之間的隨機數:', replace('$RandInt(1,9)')) print('生成10位隨機字符:', replace('$RandStr(10)')) print('從列表中隨機選擇:', replace('$RandChoice(a,b,c,d)')) print('生成一個偽手機號:', replace('$GenPhone()')) print('生成一個guid:', replace('$GenGuid()')) print('生成一個偽微信ID:', replace('$GenWxid()')) print('生成一個偽身份證:', replace('$GenNoid()')) print('生成一個18歲偽身份證:', replace("$GenNoid(y-18)")) print('生成下個月今天的日期:', replace("$GenDate(m+1)")) print('生成昨天此時的時間:', replace("$GenDatetime(d-1)"))
unit.6、接口結果校驗:包括不檢查、僅檢查接口狀態碼、對比實際與期望結果格式、完全對比校驗、正則方式校驗。

# -*- coding:utf-8 -*- # @Time : 2021/2/2 # @Author : Leo Zhang # @File : checkResult.py # *************************** import re import allure import operator def check_json(src_data, dst_data): """ 校驗的json :param src_data: 檢驗內容 :param dst_data: 接口返回的數據 :return: """ if isinstance(src_data, dict): for key in src_data: if key not in dst_data: raise Exception("JSON格式校驗,關鍵字 %s 不在返回結果 %s 中!" % (key, dst_data)) else: this_key = key if isinstance(src_data[this_key], dict) and isinstance(dst_data[this_key], dict): check_json(src_data[this_key], dst_data[this_key]) elif not isinstance(src_data[this_key], type(dst_data[this_key])): raise Exception("JSON格式校驗,關鍵字 %s 返回結果 %s 與期望結果 %s 類型不符" % (this_key, src_data[this_key], dst_data[this_key])) else: pass else: raise Exception("JSON校驗內容非dict格式:{}".format(src_data)) def check_result(case_data, code, data): """ 校驗測試結果 :param case_data: 用例數據 :param code: HTTP狀態 :param data: 返回的接口json數據 :return: """ try: # 獲取用例檢查信息 check_type = case_data['check']['check_type'] expected_code = case_data['check']['expected_code'] expected_result = case_data['check']['expected_result'] except Exception as e: raise KeyError('獲取用例檢查信息失敗:{}'.format(e)) if check_type == 'no_check': with allure.step("不校驗結果"): pass elif check_type == 'check_code': with allure.step("HTTP狀態碼校驗"): allure.attach(name="實際code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='實際data', body=str(data)) if int(code) != expected_code: raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code)) elif check_type == 'check_json': with allure.step("JSON格式校驗結果"): allure.attach(name="實際code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='實際data', body=str(data)) allure.attach(name='期望data', body=str(expected_result)) if int(code) == expected_code: if not data: data = "{}" check_json(expected_result, data) else: raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code)) elif check_type == 'entirely_check': with allure.step("完全校驗結果"): allure.attach(name="實際code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='實際data', body=str(data)) allure.attach(name='期望data', body=str(expected_result)) if int(code) == expected_code: result = operator.eq(expected_result, data) if not result: raise Exception("完全校驗失敗! %s ! = %s" % (expected_result, data)) else: raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code)) elif check_type == 'regular_check': if int(code) == expected_code: try: result = "" if isinstance(expected_result, list): for i in expected_result: result = re.findall(i.replace("\"", "\""), str(data)) allure.attach('校驗完成結果\n', str(result)) else: result = re.findall(expected_result.replace("\"", "\'"), str(data)) with allure.step("正則匹配校驗結果"): allure.attach(name="實際code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='實際data', body=str(data)) allure.attach(name='期望data', body=str(expected_result).replace("\'", "\"")) allure.attach(name=expected_result.replace("\"", "\'") + '校驗完成結果', body=str(result).replace("\'", "\"")) if not result: raise Exception("正則未校驗到內容! %s" % expected_result) except KeyError: raise Exception("正則校驗執行失敗! %s\n正則表達式為空時" % expected_result) else: raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code)) else: raise Exception("無該校驗方式%s" % check_type)
utils.1、函數助手:調用不同方法來生成相關測試數據,比如生成指定長度隨機字符、生成指定日期時間、生成唯一標識guid等。

# -*- coding:utf-8 -*- # @Time : 2020/12/10 # @Author : Leo Zhang # @File : randomly.py # ************************* import string import random import datetime from dateutil.relativedelta import relativedelta def random_str(str_len): """從a-zA-Z0-9生成制定數量的隨機字符 :param str_len: 字符串長度 :return: """ try: str_len = int(str_len) except ValueError: raise Exception("調用隨機字符失敗,[ %s ]長度參數有誤!" % str_len) strings = ''.join(random.sample(string.hexdigits, +str_len)) return strings def random_int(scope): """獲取隨機整型數據 :param scope: 數據范圍 :return: """ try: start_num, end_num = scope.split(",") start_num = int(start_num) end_num = int(end_num) except ValueError: raise Exception("調用隨機整數失敗,[ %s ]范圍參數有誤!" % str(scope)) if start_num <= end_num: number = random.randint(start_num, end_num) else: number = random.randint(end_num, start_num) return number def random_float(data): """獲取隨機浮點數據 :param data: 數組 :return: """ try: start_num, end_num, accuracy = data.split(",") start_num = int(start_num) end_num = int(end_num) accuracy = int(accuracy) except ValueError: raise Exception("調用隨機浮點數失敗,[ %s ]范圍參數或精度有誤!" % data) if start_num <= end_num: number = random.uniform(start_num, end_num) else: number = random.uniform(end_num, start_num) number = round(number, accuracy) return number def random_choice(data): """獲取數組隨機值 :param data: 數組 :return: """ _list = data.split(",") each = random.choice(_list) return each def get_date_mark(now, mark, num): if 'y' == mark: return now + relativedelta(years=num) elif 'm' == mark: return now + relativedelta(months=num) elif 'd' == mark: return now + relativedelta(days=num) elif 'h' == mark: return now + relativedelta(hours=num) elif 'M' == mark: return now + relativedelta(minutes=num) elif 's' == mark: return now + relativedelta(seconds=num) else: raise Exception("日期字段標識[ %s ]錯誤, 請使用[年y,月m,日d,時h,分M,秒s]標識!" % mark) def generate_date(expr=''): """生成日期對象(不含時分秒) :param expr: 日期表達式,如"d-1"代表日期減1 :return: """ today = datetime.date.today() if expr: try: mark = expr[:1] num = int(expr[1:]) except (TypeError, NameError): raise Exception("調用生成日期失敗,日期表達式[ %s ]有誤!" % expr) return get_date_mark(today, mark, num) else: return today def generate_datetime(expr=''): """生成日期時間對象(含時分秒毫秒) :param expr: 日期表達式,如"d-1"代表日期減1 :return: """ now = datetime.datetime.now() if expr: try: mark = expr[:1] num = int(expr[1:]) except (TypeError, NameError): raise Exception("調用生成日期失敗,日期表達式[ %s ]有誤!" % expr) return get_date_mark(now, mark, num) else: return now def generate_timestamp(expr=''): """生成時間戳(13位) :param expr: 日期表達式,如"d-1"代表日期減1 :return: """ datetime_obj = generate_datetime(expr) return int(datetime.datetime.timestamp(datetime_obj)) * 1000 def generate_guid(): """基於MAC地址+時間戳+隨機數來生成GUID :param: :return: """ import uuid return str(uuid.uuid1()).upper() def generate_wxid(): """基於AUTO標識+26位英文字母大小寫+數字生成偽微信ID :param: :return: """ return 'AUTO' + ''.join(random.sample(string.ascii_letters + string.digits, 24)) def generate_noid(expr=''): """基於6位隨機數字+出生日期+4位隨機數生成偽身份證 :param expr: 日期表達式,如"d-1"代表日期減1 :return: """ birthday = generate_date(expr) birthday = str(birthday).replace('-', '') return int(str(random.randint(100000, 999999)) + birthday + str(random.randint(1000, 9999))) def generate_phone(): """基於三大運營商號段+隨機數生成偽手機號 :param: :return: """ ctcc = [133,153,173,177,180,181,189,191,193,199] cucc = [130,131,132,155,156,166,175,176,185,186,166] cmcc = [134,135,136,137,138,139,147,150,151,152,157,158,159,172,178,182,183,184,187,188,198] begin = 10 ** 7 end = 10 ** 8 - 1 prefix = random.choice(ctcc+cucc+cmcc) return str(prefix) + str(random.randint(begin, end)) if __name__ == '__main__': # 簡單隨機數據 print(random_str(16)) print(random_int("100,200")) print(random_float("200,100,5")) print(random_choice("aaa,bbb,ccc")) # 生成日期數據 print(generate_date()) print(generate_datetime()) print(generate_date('m+1')) print(generate_datetime('d+1')) print(generate_timestamp('s+100')) print(generate_noid('y-18')) # 生成常用數據 print(generate_guid()) print(generate_wxid()) print(generate_noid()) print(generate_phone())
實戰演示
1、首先環境准備:Python + Allure (這里不做詳細說明,請參考我Pytest分類博文)
接着下載項目:https://github.com/Leozhanggg/ApiTesting (方便的話給個星,不要白嫖呀,哈哈。。。)
然后加載依賴:pip install -r requirements.txt
(或者使用Pycharm打開,會自動彈出提示安裝)
2、使用Charles工具抓取接口數據包,並且導出選擇JSON Session File (.chlsj) 格式 (工具自己百度下載吧)
3、新建一個項目MyTest目錄和一個data目錄,把抓取的接口數據包放置進來,然后修改runConfig.yml項目名為MyTest
4、直接開始運行,然后你就會發現項目目錄多了很多文件,測試已經完成。。。沒錯,就是這么簡單,你還可以查看allure報告。
談談我自己
以往我使用過多種基於Python的自動化測試框架,特別是robotframework,簡單易上手,對於培養普通測試工程師比較迅速,但是優點同時也是缺點,由於RF自身局限性,會讓簡單的語法變得復雜化,
如果你不做分層處理,可能會出現一條簡單的測試用例編需要寫上百行,后期維護更是非常麻煩,
我記得有一次檢查測試工程師的自動化測試用例時,發現竟然有兩百多行,對於RF這種表格語法兩百多行你知道閱讀是多么的痛苦嘛。。。
當然這也是源於我們的項目性質,由於大數據業務,接口只是一小部分,而數據的校驗才是大頭,
並且涉及到多類數據庫,比如redis、mysql、es、hbase、solr等,而且有的接口會同時保存到多個表然后同步到多個數據庫,校驗點數不過來。。。
就這樣前前后后我帶領着幾個測試工程師改了幾版,雖然最后大大的減少了測試代碼,但是依然還是很多,
並且運行時長很難解決。所以從去年開始使用pytest測試框架,當然這也是我首次接觸pytest,第一個項目也改了幾版,但是由於純pytest編寫,
所有的東西都在一塊,改起來也比較簡單,最終的效果當然是質的提升,首先時代碼方面可以輕松的做分層處理,不會受到RF之類的框架限制,
而在執行時長方面可以采用pytest自身的多線程模塊,大大減少執行時長,同時大大提高了框架的擴展性。
而本框架源於https://github.com/wangxiaoxi3/API_service項目,加上自己實際項目實施經驗重構而來,保留了核心功能,增加了自己對接口自動化測試的理解。
特別對於關聯值處理方面,不在需要手動標記,而采用自動檢索方式,另外關於前置接口處理,也不在需要手動編寫,只需要指定前置接口相對路徑即可,並且多個用例可以嵌套。
另外在請求地址和消息頭上,不在需要手動配置,將在接口數據解析中自動篩選消息頭和請求地址,然后寫入接口公共配置中,除非有變動,否則無需做任何配置。
但是目前開發的第一版並沒有加入數據庫校驗,僅為了單純接口的自動化測試,后期將考慮加入數據庫校驗模塊。以下為本人實際項目數據庫校驗示例:
※ 如果有任何疑問可以留言,當然如果覺得寫得不錯可以收藏、推薦一下,另外github幫忙給個星!!!
作者:Leozhanggg
出處:https://www.cnblogs.com/leozhanggg/p/14373878.html
源碼:https://github.com/Leozhanggg/ApiTesting
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。