ApiTesting全鏈路接口自動化測試框架 - 初版(一)


簡介

此框架是基於Python+Pytest+Requests+Allure+Yaml+Json實現全鏈路接口自動化測試。

主要流程:解析接口數據包 ->生成接口基礎配置(yml) ->生成測試用例(yaml+json) ->生成測試腳本(.py) ->運行測試(pytest) ->生成測試報告(allure)

測試流程:初始化請求 ->處理接口基礎信息 ->讀取前置接口用例 ->發送前置接口 ->處理當前接口數據 ->發送當前接口  ->檢查接口返回

接口自動化測試無非分幾大塊:測試用例設計、測試腳本編寫、測試結果校驗、測試報告生成、測試配置管理。

其中常見有幾大難點:接口之間依賴關聯、測試數據與腳本分離、測試數據參數化處理、全量自動化耗時。

而這些本框架通通已為你解決,你無須編寫任何代碼,只需要你抓取接口數據包即可

關於接口依賴:你只要填寫前置接口相對路徑即可,如果存在數據依賴關系,此時你也僅需要填寫前置接口對應的參數值,本框架將自動為你調用和替換關聯數據。

關於測試數據:本框架采用yaml記錄接口基本信息,當請求參數和結果較大時,將單獨保存到json文件中,解決各類數據的錯綜復雜問題。

關於參數化:本框架采用常用工具使用的變量標識 ${var} ,通過正則表達式,自動檢索變量,自動為你替換變量,並且為你提供多種函數助手【$RandInt()$GenGuid()】為你解決測試數據生成問題。

關於用例執行:本框架利用pytest擴展庫,支持多線程模式、失敗用例重試、用例模糊匹配等。

目前主要支持四種運行模式:

> 0 -不開啟自動生成測試用例功能,將直接運行測試

> 1 -根據手工編寫用例,自動生成測試腳本,然后運行測試

> 2 -根據接口抓包數據,自動生成測試用例和測試腳本,然后運行測試

> 3 -根據接口抓包數據,自動生成測試用例和測試腳本,但不運行測試

注意:目前解析僅支持(.chlsj)格式,請使用Charles工具抓包導出JSON Session File

目前支持多種函數助手(以下僅為示例,之后將單獨說明):

print('替換變量並計算表達式:', replace('$Eval(${unitCode}*1000+1)', {'unitCode': 9876543210})) print('生成1-9之間的隨機數:', replace('$RandInt(1,9)')) print('生成10位隨機字符:', replace('$RandStr(10)')) print('從列表中隨機選擇:', replace('$RandChoice(a,b,c,d)')) print('生成一個偽手機號:', replace('$GenPhone()')) print('生成一個guid:', replace('$GenGuid()')) print('生成一個偽微信ID:', replace('$GenWxid()')) print('生成一個偽身份證:', replace('$GenNoid()')) print('生成一個18歲偽身份證:', replace("$GenNoid(y-18)")) print('生成下個月今天的日期:', replace("$GenDate(m+1)")) print('生成昨天此時的時間:', replace("$GenDatetime(d-1)"))
替換變量並計算表達式: 9876543210 生成1-9之間的隨機數: 9 生成10位隨機字符: CB8512d4E6 從列表中隨機選擇: d 生成一個偽手機號: 18890688629 生成一個guid: 78A6698C-6793-11EB-8221-005056C00008 生成一個偽微信ID: AUTO9K6MRzVGfsNB4ZkIuSdXravD 生成一個偽身份證: 999577202102052043 生成一個18歲偽身份證: 953700200302056259 生成下個月今天的日期: 2021-03-05 生成昨天此時的時間: 2021-02-04 17:21:04.696745

 


框架流程圖

 


項目結構

 


啟動服務(startup.py)

# -*- coding:utf-8 -*-
# @Time    : 2021/2/1
# @Author  : Leo Zhang
# @File    : startup.py
# ***********************
import os
import sys
import pytest
import logging


if __name__ == '__main__':
    from comm.script import writeLogs, writeCase
    from config import *

    # 開啟日志記錄(默認logs目錄)
    writeLogs.MyLogs(ROOT_DIR+'logs')

    # 判斷運行模式
    if RC['auto_switch'] == 3:
        logging.info("根據接口抓包數據,自動生成測試用例和測試腳本,但不運行測試!")
        writeCase.write_case(DATA_DIR, auto_yaml=True)
        sys.exit(0)

    elif RC['auto_switch'] == 2:
        logging.info("根據接口抓包數據,自動生成測試用例和測試腳本,然后運行測試!")
        writeCase.write_case(DATA_DIR, auto_yaml=True)

    elif RC['auto_switch'] == 1:
        # 如果掃描路徑為空在則取項目page目錄
        if not os.path.exists(RC['scan_dir']):
            RC['scan_dir'] = PAGE_DIR
        logging.info("根據手工編寫用例,自動生成測試腳本,然后運行測試!")
        writeCase.write_case(RC['scan_dir'], auto_yaml=False)

    else:
        logging.info("不開啟自動生成測試用例功能,將直接運行測試!")

    # 定義運行參數
    args_list = ['-vs', TEST_DIR,
                 '-n', str(RC['process']),
                 '--reruns', str(RC['reruns']),
                 '--maxfail', str(RC['maxfail']),
                 '--alluredir', REPORT_DIR+'/xml',
                 '--clean-alluredir']
    # 判斷是否開啟用例匹配
    if RC['pattern']:
        args_list += ['-k ' + str(RC['pattern'])]
    test_result = pytest.main(args_list)

    # 生成allure報告
    cmd = 'allure generate --clean %s -o %s ' % (REPORT_DIR+'/xml', REPORT_DIR+'/html')
    os.system(cmd)

運行配置說明(runConfig.yml)

# 運行項目名
project_name: PyDemo

# 運行模式:
auto_switch: 2
# 0 -不開啟自動生成測試用例功能,將直接運行測試
# 1 -根據手工編寫用例,自動生成測試腳本,然后運行測試
# 2 -根據接口抓包數據,自動生成測試用例和測試腳本,然后運行測試
# 3 -根據接口抓包數據,自動生成測試用例和測試腳本,但不運行測試
# 注意:目前解析僅支持(.chlsj)格式,請使用Charles工具抓包導出JSON Session File

# 掃描測試用例目錄(且僅當auto_switch=1時有用)
scan_dir:

# 使用模糊匹配測試用例(空則匹配所有)
pattern:

# 執行並發線程數(0表示不開啟)
process: 0

# 失敗重試次數(0表示不重試)
reruns: 0

# 本輪測試最大允許失敗數(超出則立即結束測試)
maxfail: 20

# 接口調用間隔時間(s)
interval: 1

# 測試結果校驗方式說明(共5種方式):
# no_check:不做任何校驗
# check_code:僅校驗接口返回碼code
# check_json:校驗接口返回碼code,並進行json格式比較返回結果(默認方式)
# entirely_check:校驗接口返回碼code,並進行完整比較返回結果
# regular_check:校驗接口返回碼code,並進行正則匹配返回結果

測試腳本基礎模板(test_template.py)

# -*- coding:utf-8 -*-
# @Time    : 2021/2/2
# @Author  : Leo Zhang
# @File    : test_template.py
# ****************************
import os
import allure
import pytest
from comm.utils.readYaml import read_yaml_data
from comm.unit.initializePremise import init_premise
from comm.unit.apiSend import send_request
from comm.unit.checkResult import check_result
case_yaml = os.path.realpath(__file__).replace('testcase', 'page').replace('py', 'yaml')
case_path = os.path.dirname(case_yaml)
case_dict = read_yaml_data(case_yaml)


@allure.feature(case_dict["test_info"]["title"])
class TestTemplate:

    @pytest.mark.parametrize("case_data", case_dict["test_case"])
    @allure.story("test_template")
    def test_template(self, case_data):
        # 初始化請求:執行前置接口+替換關聯變量
        test_info, case_data = init_premise(case_dict["test_info"], case_data, case_path)
        # 發送當前接口
        code, data = send_request(test_info, case_data)
        # 校驗接口返回
        check_result(case_data, code, data)

接口配置示例(apiConfig.yml) 

PyDemo:
  # 首次會根據接口數據包生成,可自行更改或添加新配置,所有配置將作為公共關聯值
  host: 10.88.88.141:20037
  headers:
    Content-Type: application/x-www-form-urlencoded;charset=UTF-8
  cookies:
  headtoken: xu5YwIZFkVGczMn0H0rot2ps7zRIbvrTHNwMXx1sJXg=

測試用例說明(test.yaml)- 3.12更新

# 用例基本信息
test_info:
  # 測試用例標題,默認截取請求地址倒數第2個字段名,在報告中作為一級目錄顯示
  title: register
  # 請求域名,默認讀取公共關聯值,可修改
  host: ${host}
  # 請求協議
  scheme: http
  # 請求類型
  method: POST
  # 請求地址
  address: /api/register/findParam
  # 參數媒體類型
  mime_type: application/x-www-form-urlencoded
  # 請求頭,默認讀取公共關聯值,可修改
  headers: ${headers}
  # 超時時長(s)
  timeout: 10
  # 是否需要上傳文件
  file: false
  # 是否需要獲取cookie
  cookies: false
  # 是否存在前置接口,如果存在,則填寫前置接口用例相對路徑,如:/register/test_getAdultCurbactList.yaml
  premise: false
  
# 測試用例,默認僅生成一個,可手動添加多個
test_case:
  # 用例概要,默認截取請求地址倒數第1個字段名
- summary: findParam
  # 用例描述,在報告中作為二級目錄顯示
  describe: test_findParam
  # 接口請求參數,當總字符數超過200,將轉為json文件單獨存儲
  parameter:
    params:
      unitCode: '3202112002'
      first: 0
      pym: ''
      pageSize: 10
      page: 0
    headtoken: ${headtoken}
  # 接口檢查結果
  check_body:
    # 檢查類型,目前支持5種,可自行修改,默認check_json,即僅檢查實際與期望結果格式是否一致
    check_type: check_json
    # 期望接口返回碼
    expected_code: 200
    # 期望接口返回消息體,當總字符數超過200,將轉為json文件單獨存儲
    expected_result:
      success: true
      code:
      msg: 返回成功
      data:
      - '1'
      - '1'
      callTime:

測試報告示例(allure)

 


 核心代碼(3.12新增)

script.1、接口數據包解析:依據接口數據,截取對應字段,生成測試用例yaml以及參數文件json。

# -*- coding:utf-8 -*-
# @Time    : 2020/10/15
# @Author  : Leo Zhang
# @File    : writeCaseYaml.py
# ****************************
import os
import json
import logging
import urllib.parse
from comm.utils.readYaml import write_yaml_file, read_yaml_data
from comm.utils.readJson import write_json_file
from config import API_CONFIG, PROJECT_NAME


def write_case_yaml(har_path):
    """循環讀取接口數據文件

    :param har_path: Charles導出文件路徑
    :return:
    """
    case_file_list = list()
    logging.info("讀取抓包文件主目錄: {}".format(har_path))
    har_list = os.listdir(har_path)
    for each in har_list:
        ext_name = os.path.splitext(each)[1]
        if ext_name == '.chlsj':

            logging.info("讀取抓包文件: {}".format(each))
            file_path = har_path+'/'+each
            with open(file_path, 'r', encoding='utf-8') as f:
                har_cts = json.loads(f.read())
                har_ct = har_cts[0]

                # 獲取接口基本信息
                host = har_ct["host"]
                port = har_ct["port"]
                method = har_ct["method"]
                path = har_ct["path"]
                headers = har_ct["request"]["header"]['headers']
                title = path.split("/")[-1].replace('-', '')
                module = path.split("/")[-2].replace('-', '')

                # 創建模塊目錄
                module_path = har_path.split('data')[0] + '/page/' + module
                try:
                    os.makedirs(module_path)
                except:
                    pass

                # 定義api通過配置
                api_config = dict()
                simp_header = dict()
                for header in headers:
                    # 去除基礎請求頭
                    base_header = ['Host',
                                   'Content-Length',
                                   'User-Agent',
                                   'Origin',
                                   'Referer',
                                   'Connection',
                                   'Accept',
                                   'Accept-Encoding',
                                   'Accept-Language']
                    if header['name'] not in base_header:
                        simp_header[header['name']] = header['value']
                api_config['host'] = host+':'+str(port)
                # 判斷是否存在自定義消息頭
                if simp_header:
                    api_config['headers'] = simp_header
                else:
                    api_config['headers'] = None
                api_config['cookies'] = None
                # 檢查是否已存在項目配置信息,沒有則寫入
                rconfig = read_yaml_data(API_CONFIG)
                if rconfig:
                    if PROJECT_NAME not in rconfig:
                        rconfig[PROJECT_NAME] = api_config
                        write_yaml_file(API_CONFIG, rconfig)
                else:
                    nconfig = dict()
                    nconfig[PROJECT_NAME] = api_config
                    write_yaml_file(API_CONFIG, nconfig)

                # 定義測試信息
                test_info = dict()
                test_info["title"] = module
                test_info["host"] = '${host}'
                test_info["scheme"] = har_ct["scheme"]
                test_info["method"] = method
                test_info["address"] = path
                test_info["mime_type"] = har_ct["request"]["mimeType"]
                test_info["headers"] = '${headers}'
                test_info["timeout"] = 10
                test_info["file"] = False
                test_info["cookies"] = False
                test_info["premise"] = False

                # 解析請求報文
                parameter = dict()
                try:
                    if method in 'POST':
                        parameter_list = urllib.parse.unquote(har_ct["request"]["body"]["text"])
                    elif method in 'PUT':
                        parameter_list = har_ct["request"]["body"]["text"]
                    elif method in 'DELETE':
                        parameter_list = urllib.parse.unquote(har_ct["request"]["body"]["text"])
                    else:
                        parameter_list = har_ct["query"]

                    if "&" in parameter_list:
                        for key in parameter_list.split("&"):
                            val = key.split("=")
                            parameter[val[0]] = val[1]
                    else:
                        parameter = json.loads(parameter_list)
                except Exception as e:
                    logging.error("未找到parameter: %s" % e)
                    raise e

                # 定義用例信息
                test_case_list = list()
                test_case = dict()
                test_case["summary"] = title
                test_case["describe"] = 'test_'+title

                # 定義請求入參信息,且當參數字符總長度大於200時單獨寫入json文件
                if len(str(parameter)) > 200:
                    param_name = title+'_request.json'
                    if param_name not in os.listdir(module_path):
                        # 定義請求json
                        param_dict = dict()
                        param_dict["summary"] = title
                        param_dict["body"] = parameter
                        param_file = module_path+'/'+param_name
                        logging.info("生成請求文件: {}".format(param_file))
                        write_json_file(param_file, [param_dict])
                    test_case["parameter"] = param_name
                else:
                    test_case["parameter"] = parameter

                # 定義請求返回信息
                response_code = har_ct["response"]["status"]
                response_body = har_ct["response"]["body"]["text"]
                check = dict()
                check["check_type"] = 'check_json'
                check["expected_code"] = response_code
                expected_request = json.loads(response_body)

                # 當返回參數字符總長度大於200時單獨寫入json文件
                if len(str(expected_request)) > 200:
                    result_name = title+'_response.json'
                    if result_name not in os.listdir(module_path):
                        # 定義響應json
                        result_dict = dict()
                        result_dict["summary"] = title
                        result_dict["body"] = expected_request
                        result_file = module_path + '/' + result_name
                        logging.info("生成響應文件: {}".format(result_file))
                        write_json_file(result_file, [result_dict])
                    check["expected_result"] = result_name
                else:
                    check["expected_result"] = expected_request
                test_case["check"] = check
                test_case_list.append(test_case)

                # 合並測試信息、用例信息
                case_list = dict()
                case_list["test_info"] = test_info
                case_list["test_case"] = test_case_list

                # 寫入測試用例(存在則忽略)
                case_name = 'test_'+title+'.yaml'
                case_file = module_path+'/'+case_name
                if not os.path.exists(case_file):
                    logging.info("生成用例文件: {}".format(case_file))
                    write_yaml_file(case_file, case_list)

                case_file_list.append(case_file)
    return case_file_list


if __name__ == '__main__':
    real_path = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
    print('測試用例列表: ', write_case_yaml(real_path+'/data'))
writeCaseYaml.py

script.2、測試腳本生成:依據測試用例,復制模板並修改相關字段。

# -*- coding:utf-8 -*-
# @Time    : 2020/10/15
# @Author  : Leo Zhang
# @File    : writeCase.py
# ************************
import os
from config import ROOT_DIR
from comm.script.writeCaseYml import write_case_yaml, read_yaml_data
temp_file = ROOT_DIR+'config/test_template.py'


def write_case(case_path, auto_yaml=True):
    """

    :param case_path: 用例路徑,當auto_yaml為True時,需要傳入data目錄,否則傳入掃描目錄
    :param auto_yaml: 是否自動生成yaml文件
    :return:
    """
    # 判斷是否自動生成yaml用例
    if auto_yaml:
        yaml_list = write_case_yaml(case_path)
    else:
        yaml_list = list()
        file_list = os.listdir(case_path)
        for file in file_list:
            if '.yaml' in file:
                yaml_path = case_path+'/'+file
                yaml_list.append(yaml_path)

    # 遍歷測試用例列表
    for yaml_file in yaml_list:
        test_data = read_yaml_data(yaml_file)
        test_script = yaml_file.replace('page', 'testcase').replace('.yaml', '.py')
        # case_name = os.path.basename(test_script).replace('.py', '')
        case_path = os.path.dirname(test_script)
        # 判斷文件路徑是否存在
        if not os.path.exists(case_path):
            os.makedirs(case_path)

        # 替換模板內容
        file_data = ''
        with open(temp_file, "r", encoding="utf-8") as f:
            for line in f:
                if 'TestTemplate' in line:
                    title = test_data['test_info']['title']
                    line = line.replace('Template', title.title())
                if 'test_template' in line:
                    if '@allure.story' in line:
                        describe = test_data['test_case'][0]['describe']
                        line = line.replace('test_template', describe)
                    else:
                        summary = test_data['test_case'][0]['summary']
                        line = line.replace('template', summary)
                file_data += line

        # 寫入新腳本
        with open(test_script, "w", encoding="utf-8") as f:
            f.write(file_data)


if __name__ == '__main__':
    real_path = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
    write_case(real_path + '/data', auto_yaml=True)
    # write_case(real_path+'/page/oauth', auto_yaml=False)
writeCase.py

script.3、運行日志收集:初始化日志模塊,保存運行日志。

# -*- coding:utf-8 -*-
# @Time    : 2020/9/22
# @Author  : Leo Zhang
# @File    : writeLogs.py
# ************************
import logging
import time
import sys
import os


class MyLogs:

    def __init__(self, log_path):
        # 定義日志默認路徑和日志名稱
        if not os.path.exists(log_path):
            os.makedirs(log_path)
        runtime = time.strftime('%Y-%m-%d', time.localtime(time.time()))
        logfile = os.path.join(log_path, runtime+'.log')
        logfile_err = os.path.join(log_path, runtime+'_error.log')

        # 第一步,初始化日志對象並設置日志等級
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        logger.handlers = []

        # 第二步,創建一個handler,用於寫入debug日志文件
        fh = logging.FileHandler(logfile, mode='a+')
        fh.setLevel(logging.DEBUG)

        # 第三步,創建一個handler,用於寫入error日志文件
        fh_err = logging.FileHandler(logfile_err, mode='a+')
        fh_err.setLevel(logging.ERROR)

        # 第四步,再創建一個handler,用於輸出info日志到控制台
        sh = logging.StreamHandler(sys.stdout)
        sh.setLevel(logging.INFO)

        # 第五步,定義handler的輸出格式
        formatter = logging.Formatter("%(asctime)s - %(filename)s - %(levelname)s: %(message)s")
        fh.setFormatter(formatter)
        fh_err.setFormatter(formatter)
        sh.setFormatter(formatter)

        # 第六步,將logger添加到handler里面
        logger.addHandler(fh)
        logger.addHandler(fh_err)
        logger.addHandler(sh)
writeLogs.py

unit.1、請求協議方法封裝:包括post、get、put、delete,以及cookie保存。

# -*- coding:utf-8 -*-
# @Time    : 2021/2/2
# @Author  : Leo Zhang
# @File    : apiMethod.py
# *************************
import os
import json
import random
import logging
import requests
import simplejson
from requests_toolbelt import MultipartEncoder
from comm.utils.readYaml import write_yaml_file, read_yaml_data
from config import API_CONFIG, PROJECT_NAME


def post(headers, address, mime_type, timeout=10, data=None, files=None, cookies=None):
    """
    post請求
    :param headers: 請求頭
    :param address: 請求地址
    :param mime_type: 請求參數格式(form_data,raw)
    :param timeout: 超時時間
    :param data: 請求參數
    :param files: 文件路徑
    :param cookies:
    :return:
    """
    # 判斷請求參數類型
    if 'form_data' in mime_type:
        for i in files:
            value = files[i]
            if '/' in value:
                file_parm = i
                files[file_parm] = (os.path.basename(value), open(value, 'rb'))
        enc = MultipartEncoder(
            fields=files,
            boundary='--------------' + str(random.randint(1e28, 1e29-1))
        )
        headers['Content-Type'] = enc.content_type
        response = requests.post(url=address,
                                 data=enc,
                                 headers=headers,
                                 timeout=timeout,
                                 cookies=cookies)
    elif 'data' in mime_type:
        response = requests.post(url=address,
                                 data=data,
                                 headers=headers,
                                 timeout=timeout,
                                 files=files,
                                 cookies=cookies)
    else:
        response = requests.post(url=address,
                                 json=data,
                                 headers=headers,
                                 timeout=timeout,
                                 files=files,
                                 cookies=cookies)
    try:
        if response.status_code != 200:
            return response.status_code, response.text
        else:
            return response.status_code, response.json()
    except json.decoder.JSONDecodeError:
        return response.status_code, None
    except simplejson.errors.JSONDecodeError:
        return response.status_code, None
    except Exception as e:
        logging.exception('ERROR')
        logging.error(e)
        raise


def get(headers, address, data, timeout=8, cookies=None):
    """
    get請求
    :param headers: 請求頭
    :param address: 請求地址
    :param data: 請求參數
    :param timeout: 超時時間
    :param cookies:
    :return:
    """
    response = requests.get(url=address,
                            params=data,
                            headers=headers,
                            timeout=timeout,
                            cookies=cookies)
    if response.status_code == 301:
        response = requests.get(url=response.headers["location"])
    try:
        return response.status_code, response.json()
    except json.decoder.JSONDecodeError:
        return response.status_code, None
    except simplejson.errors.JSONDecodeError:
        return response.status_code, None
    except Exception as e:
        logging.exception('ERROR')
        logging.error(e)
        raise


def put(headers, address, mime_type, timeout=8, data=None, files=None, cookies=None):
    """
    put請求
    :param headers: 請求頭
    :param address: 請求地址
    :param mime_type: 請求參數格式(form_data,raw)
    :param timeout: 超時時間
    :param data: 請求參數
    :param files: 文件路徑
    :param cookies:
    :return:
    """
    if mime_type == 'raw':
        data = json.dumps(data)
    elif mime_type == 'application/json':
        data = json.dumps(data)
    response = requests.put(url=address,
                            data=data,
                            headers=headers,
                            timeout=timeout,
                            files=files,
                            cookies=cookies)
    try:
        return response.status_code, response.json()
    except json.decoder.JSONDecodeError:
        return response.status_code, None
    except simplejson.errors.JSONDecodeError:
        return response.status_code, None
    except Exception as e:
        logging.exception('ERROR')
        logging.error(e)
        raise


def delete(headers, address, data, timeout=8, cookies=None):
    """
    delete請求
    :param headers: 請求頭
    :param address: 請求地址
    :param data: 請求參數
    :param timeout: 超時時間
    :param cookies:
    :return:
    """
    response = requests.delete(url=address,
                               params=data,
                               headers=headers,
                               timeout=timeout,
                               cookies=cookies)
    try:
        return response.status_code, response.json()
    except json.decoder.JSONDecodeError:
        return response.status_code, None
    except simplejson.errors.JSONDecodeError:
        return response.status_code, None
    except Exception as e:
        logging.exception('ERROR')
        logging.error(e)
        raise


def save_cookie(headers, address, mime_type, timeout=8, data=None, files=None, cookies=None):
    """
    保存cookie信息
    :param headers: 請求頭
    :param address: 請求地址
    :param mime_type: 請求參數格式(form_data,raw)
    :param timeout: 超時時間
    :param data: 請求參數
    :param files: 文件路徑
    :param cookies:
    :return:
    """
    if 'data' in mime_type:
        response = requests.post(url=address,
                                 data=data,
                                 headers=headers,
                                 timeout=timeout,
                                 files=files,
                                 cookies=cookies)
    else:
        response = requests.post(url=address,
                                 json=data,
                                 headers=headers,
                                 timeout=timeout,
                                 files=files,
                                 cookies=cookies)
    try:
        cookies = response.cookies.get_dict()
        # 讀取api配置並寫入最新的cookie結果
        aconfig = read_yaml_data(API_CONFIG)
        aconfig[PROJECT_NAME]['cookies'] = cookies
        write_yaml_file(API_CONFIG, aconfig)
        logging.debug("cookies已保存,結果為:{}".format(cookies))
    except json.decoder.JSONDecodeError:
        return response.status_code, None
    except simplejson.errors.JSONDecodeError:
        return response.status_code, None
    except Exception as e:
        logging.exception('ERROR')
        logging.error(e)
        raise
apiMethod.py

unit.2、接口發送方法封裝:讀取測試用例,拼接請求信息,發送請求,返回結果。

# -*- coding:utf-8 -*-
# @Time    : 2021/2/2
# @Author  : Leo Zhang
# @File    : apiSend.py
# ***********************
import logging
import allure
import time
from config import INTERVAL
from comm.unit import apiMethod


def send_request(test_info, case_data):
    """
    封裝請求
    :param test_info: 測試信息
    :param case_data: 用例數據
    :return:
    """
    try:
        # 獲取用例基本信息
        host = test_info["host"]
        scheme = test_info["scheme"]
        method = test_info["method"].upper()
        address = test_info["address"]
        mime_type = test_info["mime_type"]
        headers = test_info["headers"]
        cookies = test_info["cookies"]
        file = test_info["file"]
        timeout = test_info["timeout"]
        summary = case_data["summary"]
        parameter = case_data["parameter"]
    except Exception as e:
        raise KeyError('獲取用例基本信息失敗:{}'.format(e))

    request_url = scheme + "://" + host + address
    logging.info("=" * 150)
    logging.info("請求接口:%s" % str(summary))
    logging.info("請求地址:%s" % request_url)
    logging.info("請求頭: %s" % str(headers))
    logging.info("請求參數: %s" % str(parameter))

    # 判斷是否保存cookies
    if summary == 'save_cookies':
        with allure.step("保存cookies信息"):
            allure.attach(name="請求接口", body=str(summary))
            allure.attach(name="請求地址", body=request_url)
            allure.attach(name="請求頭", body=str(headers))
            allure.attach(name="請求參數", body=str(parameter))
            apiMethod.save_cookie(headers=headers,
                                  address=request_url,
                                  mime_type=mime_type,
                                  data=parameter,
                                  cookies=cookies,
                                  timeout=timeout)
    # 判斷接口請求類型
    if method == 'POST':
        logging.info("請求方法: POST")
        # 判斷是否上傳文件
        if file:
            with allure.step("POST上傳文件"):
                allure.attach(name="請求接口", body=str(summary))
                allure.attach(name="請求地址", body=request_url)
                allure.attach(name="請求頭", body=str(headers))
                allure.attach(name="請求參數", body=str(parameter))
            result = apiMethod.post(headers=headers,
                                    address=request_url,
                                    mime_type=mime_type,
                                    files=parameter,
                                    cookies=cookies,
                                    timeout=timeout)
        else:
            with allure.step("POST請求接口"):
                allure.attach(name="請求接口", body=str(summary))
                allure.attach(name="請求地址", body=request_url)
                allure.attach(name="請求頭", body=str(headers))
                allure.attach(name="請求參數", body=str(parameter))
            result = apiMethod.post(headers=headers,
                                    address=request_url,
                                    mime_type=mime_type,
                                    data=parameter,
                                    cookies=cookies,
                                    timeout=timeout)
    elif method == 'GET':
        logging.info("請求方法: GET")
        with allure.step("GET請求接口"):
            allure.attach(name="請求接口", body=str(summary))
            allure.attach(name="請求地址", body=request_url)
            allure.attach(name="請求頭", body=str(headers))
            allure.attach(name="請求參數", body=str(parameter))
        result = apiMethod.get(headers=headers,
                               address=request_url,
                               data=parameter,
                               cookies=cookies,
                               timeout=timeout)
    elif method == 'PUT':
        logging.info("請求方法: PUT")
        # 判斷是否上傳文件
        if file:
            with allure.step("PUT上傳文件"):
                allure.attach(name="請求接口", body=str(summary))
                allure.attach(name="請求地址", body=request_url)
                allure.attach(name="請求頭", body=str(headers))
                allure.attach(name="請求參數", body=str(parameter))
            result = apiMethod.put(headers=headers,
                                   address=request_url,
                                   mime_type=mime_type,
                                   files=parameter,
                                   cookies=cookies,
                                   timeout=timeout)
        else:
            with allure.step("PUT請求接口"):
                allure.attach(name="請求接口", body=str(summary))
                allure.attach(name="請求地址", body=request_url)
                allure.attach(name="請求頭", body=str(headers))
                allure.attach(name="請求參數", body=str(parameter))
            result = apiMethod.put(headers=headers,
                                   address=request_url,
                                   mime_type=mime_type,
                                   data=parameter,
                                   cookies=cookies,
                                   timeout=timeout)
    elif method == 'DELETE':
        logging.info("請求方法: DELETE")
        with allure.step("DELETE請求接口"):
            allure.attach(name="請求接口", body=str(summary))
            allure.attach(name="請求地址", body=request_url)
            allure.attach(name="請求頭", body=str(headers))
            allure.attach(name="請求參數", body=str(parameter))
        result = apiMethod.delete(headers=headers,
                                  address=request_url,
                                  data=parameter,
                                  cookies=cookies,
                                  timeout=timeout)
    else:
        result = {"code": None, "data": None}
    logging.info("請求接口結果:\n %s" % str(result))
    time.sleep(INTERVAL)
    return result
apiSend.py

unit.3、接口初始化處理:讀取公共關聯配置,獲取當前關聯值,執行前置接口,替換關聯值,返回用例信息。

# -*- coding:utf-8 -*-
# @Time    : 2021/2/3
# @Author  : Leo Zhang
# @File    : initializePremise.py
# **************************
import logging
import time
import json
from json import JSONDecodeError
from config import PAGE_DIR, PROJECT_NAME, API_CONFIG
from comm.unit import apiSend, readRelevance, replaceRelevance
from comm.utils import readYaml


def read_json(summary, json_obj, case_path):
    """
    校驗內容讀取
    :param summary: 用例名稱
    :param json_obj: json文件或數據對象
    :param case_path: case路徑
    :return:
    """
    if isinstance(json_obj, dict):
        return json_obj
    else:
        try:
            # 讀取json文件指定用例數據
            with open(case_path+'/'+json_obj, "r", encoding="utf-8") as js:
                data_list = json.load(js)
                for data in data_list:
                    if data['summary'] == summary:
                        return data['body']
        except FileNotFoundError:
            raise Exception("用例關聯文件不存在\n文件路徑: %s" % json_obj)
        except JSONDecodeError:
            raise Exception("用例關聯的文件有誤\n文件路徑: %s" % json_obj)


def init_premise(test_info, case_data, case_path):
    """用例前提條件執行,提取關鍵值

    :param test_info: 測試信息
    :param case_data: 用例數據
    :param case_path: 用例路徑
    :return:
    """
    # 獲取項目公共關聯值
    aconfig = readYaml.read_yaml_data(API_CONFIG)
    __relevance = aconfig[PROJECT_NAME]
    # 處理測試信息
    test_info = replaceRelevance.replace(test_info, __relevance)
    logging.debug("測試信息處理結果:{}".format(test_info))
    # 處理Cookies
    if test_info['cookies']:
        cookies = aconfig[PROJECT_NAME]['cookies']
        logging.debug("請求Cookies處理結果:{}".format(cookies))

    # 判斷是否存在前置接口
    pre_case_path = test_info["premise"]
    if pre_case_path:
        # 獲取前置接口用例
        logging.info("獲取前置接口測試用例:{}".format(pre_case_path))
        pre_case_path = PAGE_DIR + pre_case_path
        pre_case_dict = readYaml.read_yaml_data(pre_case_path)
        pre_test_info = pre_case_dict['test_info']
        pre_case_data = pre_case_dict['test_case'][0]
        # 判斷前置接口是否也存在前置接口
        if pre_test_info["premise"]:
            init_premise(pre_test_info, pre_case_data, pre_case_path)

        for i in range(3):
            # 處理前置接口測試信息
            pre_test_info = replaceRelevance.replace(pre_test_info, __relevance)
            logging.debug("測試信息處理結果:{}".format(pre_test_info))
            # 處理前置接口Cookies
            if pre_test_info['cookies']:
                cookies = aconfig[PROJECT_NAME]['cookies']
                logging.debug("請求Cookies處理結果:{}".format(cookies))
            # 處理前置接口入參:獲取入參-替換關聯值-發送請求
            pre_parameter = read_json(pre_case_data['summary'], pre_case_data['parameter'], pre_case_path)
            pre_parameter = replaceRelevance.replace(pre_parameter, __relevance)
            pre_case_data['parameter'] = pre_parameter
            logging.debug("請求參數處理結果:{}".format(pre_parameter))
            logging.info("執行前置接口測試用例:{}".format(pre_test_info))
            code, data = apiSend.send_request(pre_test_info, pre_case_data)

            # 檢查接口是否調用成功
            if data:
                # 處理當前接口入參:獲取入參-獲取關聯值-替換關聯值
                parameter = read_json(case_data['summary'], case_data['parameter'], case_path)
                __relevance = readRelevance.get_relevance(data, parameter, __relevance)
                parameter = replaceRelevance.replace(parameter, __relevance)
                case_data['parameter'] = parameter
                logging.debug("請求參數處理結果:{}".format(parameter))

                # 獲取當前接口期望結果:獲取期望結果-獲取關聯值-替換關聯值
                expected_rs = read_json(case_data['summary'], case_data['check']['expected_result'], case_path)
                parameter['data'] = data
                __relevance = readRelevance.get_relevance(parameter, expected_rs, __relevance)
                expected_rs = replaceRelevance.replace(expected_rs, __relevance)
                case_data['check']['expected_result'] = expected_rs
                logging.debug("期望返回處理結果:{}".format(case_data))
                break
            else:
                time.sleep(1)
                logging.error("前置接口請求失敗!等待1秒后重試!")
        else:
            logging.info("前置接口請求失敗!嘗試三次失敗!")
            raise Exception("獲取前置接口關聯數據失敗!")
    else:
        # 處理當前接口入參:獲取入參-獲取關聯值-替換關聯值
        parameter = read_json(case_data['summary'], case_data['parameter'], case_path)
        parameter = replaceRelevance.replace(parameter, __relevance)
        case_data['parameter'] = parameter
        logging.debug("請求參數處理結果:{}".format(parameter))

        # 獲取當前接口期望結果:獲取期望結果-獲取關聯值-替換關聯值
        expected_rs = read_json(case_data['summary'], case_data['check']['expected_result'], case_path)
        __relevance = readRelevance.get_relevance(parameter, expected_rs, __relevance)
        expected_rs = replaceRelevance.replace(expected_rs, __relevance)
        case_data['check']['expected_result'] = expected_rs
        logging.debug("期望返回處理結果:{}".format(case_data))

    return test_info, case_data
initializePremise.py

unit.4、獲取關聯值配置:通過正則匹配${}檢索關聯值,然后獲取消息體中的對應值。

# -*- coding:utf-8 -*-
# @Time    : 2021/1/8
# @Author  : Leo Zhang
# @File    : readRelevance.py
# ****************************
import logging
import re

__relevance = ""


def get_value(data, value):
    """獲取數據中的值

    :param data:
    :param value:
    :return:
    """
    global __relevance
    if isinstance(data, dict):
        if value in data:
            __relevance = data[value]
        else:
            for key in data:
                __relevance = get_value(data[key], value)
    elif isinstance(data, list):
        for key in data:
            if isinstance(key, dict):
                __relevance = get_value(key, value)
                break
    return __relevance


def get_relevance(data, relevance_list, relevance=None):
    """獲取關聯鍵值對

    :param data:
    :param relevance_list:
    :param relevance:
    :return:
    """
    # 獲取關聯鍵列表
    relevance_list = re.findall(r"\${(.*?)}", str(relevance_list))
    relevance_list = list(set(relevance_list))
    logging.debug("獲取關聯鍵列表:\n%s" % relevance_list)
    # 判斷關聯鍵和源數據是否有值
    if (not data) or (not relevance_list):
        return relevance

    # 判斷是否存在其他關聯鍵對象
    if not relevance:
        relevance = dict()
    # 遍歷關聯鍵
    for each in relevance_list:
        if each in relevance:
            pass
            # # 考慮到一個關聯鍵,多個值
            # if isinstance(relevance[each], list):
            #     a = relevance[each]
            #     a.append(relevance_value)
            #     relevance[each] = a
            # else:
            #     a = relevance[each]
            #     b = list()
            #     b.append(a)
            #     b.append(relevance_value)
            #     relevance[each] = b
        else:
            # 從結果中提取關聯鍵的值
            relevance[each] = get_value(data, each)
    logging.debug("提取關聯鍵對象:\n%s" % relevance)
    return relevance
readRelevance.py

unit.5、替換關聯值:首先替換關聯值,然后替換函數助手生成數據,最后替換表達式結果。

# -*- coding:utf-8 -*-
# @Time    : 2020/12/09
# @Author  : Leo Zhang
# @File    : replaceRelevance.py
# ****************************
import re
from comm.utils.randomly import *

pattern_var = r"\${(.*?)}"
pattern_eval = r"\$Eval\((.*?)\)"
pattern_str = r'\$RandStr\(([0-9]*?)\)'
pattern_int = r'\$RandInt\(([0-9]*,[0-9]*?)\)'
pattern_choice = r"\$RandChoice\((.*?)\)"
pattern_float = r'\$RandFloat\(([0-9]*,[0-9]*,[0-9]*)\)'
pattern_phone = r'\$GenPhone\(\)'
pattern_guid = r'\$GenGuid\(\)'
pattern_wxid = r'\$GenWxid\(\)'
pattern_noid = r'\$GenNoid\((.*?)\)'
pattern_date = r'\$GenDate\((.*?)\)'
pattern_datetime = r'\$GenDatetime\((.*?)\)'


def replace_pattern(pattern, value):
    """替換正則表達式

    :param pattern: 匹配字符
    :param value: 匹配值
    :return:
    """
    patterns = pattern.split('(.*?)')
    return ''.join([patterns[0], value, patterns[-1]])


def replace_relevance(param, relevance=None):
    """替換變量關聯值

    :param param: 參數對象
    :param relevance: 關聯對象
    :return:
    """
    result = re.findall(pattern_var, str(param))
    if (not result) or (not relevance):
        pass
    else:
        for each in result:
            try:
                # 關聯參數多值替換
                # relevance_index = 0
                # if isinstance(relevance[each], list):
                #     try:
                #         param = re.sub(pattern, relevance[each][relevance_index], param, count=1)
                #         relevance_index += 1
                #     except IndexError:
                #         relevance_index = 0
                #         param = re.sub(pattern, relevance[each][relevance_index], param, count=1)
                #         relevance_index += 1
                value = relevance[each]
                pattern = re.compile(r'\${' + each + '}')
                try:
                    param = re.sub(pattern, value, param)
                except TypeError:
                    param = value
            except KeyError:
                raise KeyError('替換變量{0}失敗,未發現變量對應關聯值!\n關聯列表:{1}'.format(param, relevance))
                # pass
    return param


def replace_eval(param):
    """替換eval表達式結果

    :param param: 參數對象
    :return:
    """
    result = re.findall(pattern_eval, str(param))
    if not result:
        pass
    else:
        for each in result:
            try:
                if 'import' in each:
                    raise Exception('存在非法標識import')
                else:
                    value = str(eval(each))
                    param = re.sub(pattern_eval, value, param)
            except KeyError as e:
                raise Exception('獲取值[ % ]失敗!\n%'.format(param, e))
            except SyntaxError:
                pass
    return param


def replace_random(param):
    """替換隨機方法參數值

    :param param:
    :return:
    """
    int_list = re.findall(pattern_int, str(param))
    str_list = re.findall(pattern_str, str(param))
    choice_list = re.findall(pattern_choice, str(param))
    guid_list = re.findall(pattern_guid, str(param))
    noid_list = re.findall(pattern_noid, str(param))
    phone_list = re.findall(pattern_phone, str(param))
    wxid_list = re.findall(pattern_wxid, str(param))
    date_list = re.findall(pattern_date, str(param))
    datetime_list = re.findall(pattern_datetime, str(param))

    if len(str_list):
        for each in str_list:
            # pattern = re.compile(r'\$RandStr\(' + each + r'\)')
            # param = re.sub(pattern, str(random_str(each)), param, count=1)
            param = re.sub(pattern_str, str(random_str(each)), param, count=1)

    if len(int_list):
        for each in int_list:
            param = re.sub(pattern_int, str(random_int(each)), param, count=1)

    if len(choice_list):
        for each in choice_list:
            param = re.sub(pattern_choice, str(random_choice(each)), param, count=1)

    if len(date_list):
        for each in date_list:
            param = re.sub(pattern_date, str(generate_date(each)), param, count=1)

    if len(datetime_list):
        for each in datetime_list:
            param = re.sub(pattern_datetime, str(generate_datetime(each)), param, count=1)

    if len(noid_list):
        for each in noid_list:
            param = re.sub(pattern_noid, str(generate_noid(each)), param, count=1)

    if len(phone_list):
        for i in phone_list:
            param = re.sub(pattern_phone, str(generate_phone()), param, count=1)

    if len(guid_list):
        for i in guid_list:
            param = re.sub(pattern_guid, generate_guid(), param, count=1)

    if len(wxid_list):
        for i in wxid_list:
            param = re.sub(pattern_wxid, generate_wxid(), param, count=1)

    return param


def replace(param, relevance=None):
    """替換參數對應關聯數據

    :param param: 參數對象
    :param relevance: 關聯對象
    :return:
    """
    if not param:
        pass
    elif isinstance(param, dict):
        for key, value in param.items():
            if isinstance(value, dict):
                param[key] = replace(value, relevance)
            elif isinstance(value, list):
                for index, sub_value in enumerate(value):
                    param[key][index] = replace(sub_value, relevance)
            else:
                value = replace_relevance(value, relevance)
                value = replace_random(value)
                value = replace_eval(value)
                param[key] = value

    elif isinstance(param, list):
        for index, value in enumerate(param):
            param[index] = replace(value, relevance)

    else:
        param = replace_relevance(param, relevance)
        param = replace_random(param)
        param = replace_eval(param)

    return param


if __name__ == '__main__':
    print('替換變量並計算表達式:', replace('$Eval(${unitCode}*1000+1)', {'unitCode': 9876543210}))
    print('生成1-9之間的隨機數:', replace('$RandInt(1,9)'))
    print('生成10位隨機字符:', replace('$RandStr(10)'))
    print('從列表中隨機選擇:', replace('$RandChoice(a,b,c,d)'))
    print('生成一個偽手機號:', replace('$GenPhone()'))
    print('生成一個guid:', replace('$GenGuid()'))
    print('生成一個偽微信ID:', replace('$GenWxid()'))
    print('生成一個偽身份證:', replace('$GenNoid()'))
    print('生成一個18歲偽身份證:', replace("$GenNoid(y-18)"))
    print('生成下個月今天的日期:', replace("$GenDate(m+1)"))
    print('生成昨天此時的時間:', replace("$GenDatetime(d-1)"))
replaceRelevance.py

unit.6、接口結果校驗:包括不檢查、僅檢查接口狀態碼、對比實際與期望結果格式、完全對比校驗、正則方式校驗。

# -*- coding:utf-8 -*-
# @Time    : 2021/2/2
# @Author  : Leo Zhang
# @File    : checkResult.py
# ***************************
import re
import allure
import operator


def check_json(src_data, dst_data):
    """
    校驗的json
    :param src_data: 檢驗內容
    :param dst_data: 接口返回的數據
    :return:
    """
    if isinstance(src_data, dict):
        for key in src_data:
            if key not in dst_data:
                raise Exception("JSON格式校驗,關鍵字 %s 不在返回結果 %s 中!" % (key, dst_data))
            else:
                this_key = key
                if isinstance(src_data[this_key], dict) and isinstance(dst_data[this_key], dict):
                    check_json(src_data[this_key], dst_data[this_key])
                elif not isinstance(src_data[this_key], type(dst_data[this_key])):
                    raise Exception("JSON格式校驗,關鍵字 %s 返回結果 %s 與期望結果 %s 類型不符"
                                    % (this_key, src_data[this_key], dst_data[this_key]))
                else:
                    pass
    else:
        raise Exception("JSON校驗內容非dict格式:{}".format(src_data))


def check_result(case_data, code, data):
    """
    校驗測試結果
    :param case_data: 用例數據
    :param code: HTTP狀態
    :param data: 返回的接口json數據
    :return:
    """
    try:
        # 獲取用例檢查信息
        check_type = case_data['check']['check_type']
        expected_code = case_data['check']['expected_code']
        expected_result = case_data['check']['expected_result']
    except Exception as e:
        raise KeyError('獲取用例檢查信息失敗:{}'.format(e))
    
    if check_type == 'no_check':
        with allure.step("不校驗結果"):
            pass

    elif check_type == 'check_code':
        with allure.step("HTTP狀態碼校驗"):
            allure.attach(name="實際code", body=str(code))
            allure.attach(name="期望code", body=str(expected_code))
            allure.attach(name='實際data', body=str(data))
        if int(code) != expected_code:
            raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code))

    elif check_type == 'check_json':
        with allure.step("JSON格式校驗結果"):
            allure.attach(name="實際code", body=str(code))
            allure.attach(name="期望code", body=str(expected_code))
            allure.attach(name='實際data', body=str(data))
            allure.attach(name='期望data', body=str(expected_result))
        if int(code) == expected_code:
            if not data:
                data = "{}"
            check_json(expected_result, data)
        else:
            raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code))

    elif check_type == 'entirely_check':
        with allure.step("完全校驗結果"):
            allure.attach(name="實際code", body=str(code))
            allure.attach(name="期望code", body=str(expected_code))
            allure.attach(name='實際data', body=str(data))
            allure.attach(name='期望data', body=str(expected_result))
        if int(code) == expected_code:
            result = operator.eq(expected_result, data)
            if not result:
                raise Exception("完全校驗失敗! %s ! = %s" % (expected_result, data))
        else:
            raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code))

    elif check_type == 'regular_check':
        if int(code) == expected_code:
            try:
                result = ""
                if isinstance(expected_result, list):
                    for i in expected_result:
                        result = re.findall(i.replace("\"", "\""), str(data))
                        allure.attach('校驗完成結果\n', str(result))
                else:
                    result = re.findall(expected_result.replace("\"", "\'"), str(data))
                    with allure.step("正則匹配校驗結果"):
                        allure.attach(name="實際code", body=str(code))
                        allure.attach(name="期望code", body=str(expected_code))
                        allure.attach(name='實際data', body=str(data))
                        allure.attach(name='期望data', body=str(expected_result).replace("\'", "\""))
                        allure.attach(name=expected_result.replace("\"", "\'") + '校驗完成結果',
                                      body=str(result).replace("\'", "\""))
                if not result:
                    raise Exception("正則未校驗到內容! %s" % expected_result)
            except KeyError:
                raise Exception("正則校驗執行失敗! %s\n正則表達式為空時" % expected_result)
        else:
            raise Exception("http狀態碼錯誤!\n %s != %s" % (code, expected_code))

    else:
        raise Exception("無該校驗方式%s" % check_type)
checkResult.py

utils.1、函數助手:調用不同方法來生成相關測試數據,比如生成指定長度隨機字符、生成指定日期時間、生成唯一標識guid等。

# -*- coding:utf-8 -*-
# @Time    : 2020/12/10
# @Author  : Leo Zhang
# @File    : randomly.py
# *************************
import string
import random
import datetime
from dateutil.relativedelta import relativedelta


def random_str(str_len):
    """從a-zA-Z0-9生成制定數量的隨機字符

    :param str_len: 字符串長度
    :return:
    """
    try:
        str_len = int(str_len)
    except ValueError:
        raise Exception("調用隨機字符失敗,[ %s ]長度參數有誤!" % str_len)
    strings = ''.join(random.sample(string.hexdigits, +str_len))
    return strings


def random_int(scope):
    """獲取隨機整型數據

    :param scope: 數據范圍
    :return:
    """
    try:
        start_num, end_num = scope.split(",")
        start_num = int(start_num)
        end_num = int(end_num)
    except ValueError:
        raise Exception("調用隨機整數失敗,[ %s ]范圍參數有誤!" % str(scope))
    if start_num <= end_num:
        number = random.randint(start_num, end_num)
    else:
        number = random.randint(end_num, start_num)
    return number


def random_float(data):
    """獲取隨機浮點數據

    :param data: 數組
    :return:
    """
    try:
        start_num, end_num, accuracy = data.split(",")
        start_num = int(start_num)
        end_num = int(end_num)
        accuracy = int(accuracy)
    except ValueError:
        raise Exception("調用隨機浮點數失敗,[ %s ]范圍參數或精度有誤!" % data)

    if start_num <= end_num:
        number = random.uniform(start_num, end_num)
    else:
        number = random.uniform(end_num, start_num)
    number = round(number, accuracy)
    return number


def random_choice(data):
    """獲取數組隨機值

    :param data: 數組
    :return:
    """
    _list = data.split(",")
    each = random.choice(_list)
    return each


def get_date_mark(now, mark, num):
    if 'y' == mark:
        return now + relativedelta(years=num)
    elif 'm' == mark:
        return now + relativedelta(months=num)
    elif 'd' == mark:
        return now + relativedelta(days=num)
    elif 'h' == mark:
        return now + relativedelta(hours=num)
    elif 'M' == mark:
        return now + relativedelta(minutes=num)
    elif 's' == mark:
        return now + relativedelta(seconds=num)
    else:
        raise Exception("日期字段標識[ %s ]錯誤, 請使用[年y,月m,日d,時h,分M,秒s]標識!" % mark)


def generate_date(expr=''):
    """生成日期對象(不含時分秒)

    :param expr: 日期表達式,如"d-1"代表日期減1
    :return:
    """
    today = datetime.date.today()
    if expr:
        try:
            mark = expr[:1]
            num = int(expr[1:])
        except (TypeError, NameError):
            raise Exception("調用生成日期失敗,日期表達式[ %s ]有誤!" % expr)
        return get_date_mark(today, mark, num)
    else:
        return today


def generate_datetime(expr=''):
    """生成日期時間對象(含時分秒毫秒)

    :param expr: 日期表達式,如"d-1"代表日期減1
    :return:
    """
    now = datetime.datetime.now()
    if expr:
        try:
            mark = expr[:1]
            num = int(expr[1:])
        except (TypeError, NameError):
            raise Exception("調用生成日期失敗,日期表達式[ %s ]有誤!" % expr)
        return get_date_mark(now, mark, num)
    else:
        return now


def generate_timestamp(expr=''):
    """生成時間戳(13位)

    :param expr: 日期表達式,如"d-1"代表日期減1
    :return:
    """
    datetime_obj = generate_datetime(expr)
    return int(datetime.datetime.timestamp(datetime_obj)) * 1000


def generate_guid():
    """基於MAC地址+時間戳+隨機數來生成GUID

    :param:
    :return:
    """
    import uuid
    return str(uuid.uuid1()).upper()


def generate_wxid():
    """基於AUTO標識+26位英文字母大小寫+數字生成偽微信ID

    :param:
    :return:
    """
    return 'AUTO' + ''.join(random.sample(string.ascii_letters + string.digits, 24))


def generate_noid(expr=''):
    """基於6位隨機數字+出生日期+4位隨機數生成偽身份證

    :param expr: 日期表達式,如"d-1"代表日期減1
    :return:
    """
    birthday = generate_date(expr)
    birthday = str(birthday).replace('-', '')
    return int(str(random.randint(100000, 999999)) + birthday + str(random.randint(1000, 9999)))


def generate_phone():
    """基於三大運營商號段+隨機數生成偽手機號

    :param:
    :return:
    """
    ctcc = [133,153,173,177,180,181,189,191,193,199]
    cucc = [130,131,132,155,156,166,175,176,185,186,166]
    cmcc = [134,135,136,137,138,139,147,150,151,152,157,158,159,172,178,182,183,184,187,188,198]
    begin = 10 ** 7
    end = 10 ** 8 - 1
    prefix = random.choice(ctcc+cucc+cmcc)
    return str(prefix) + str(random.randint(begin, end))


if __name__ == '__main__':
    # 簡單隨機數據
    print(random_str(16))
    print(random_int("100,200"))
    print(random_float("200,100,5"))
    print(random_choice("aaa,bbb,ccc"))

    # 生成日期數據
    print(generate_date())
    print(generate_datetime())
    print(generate_date('m+1'))
    print(generate_datetime('d+1'))
    print(generate_timestamp('s+100'))
    print(generate_noid('y-18'))

    # 生成常用數據
    print(generate_guid())
    print(generate_wxid())
    print(generate_noid())
    print(generate_phone())
randomly.py

 


實戰演示

1、首先環境准備:Python  + Allure (這里不做詳細說明,請參考我Pytest分類博文)

接着下載項目:https://github.com/Leozhanggg/ApiTesting (方便的話給個星,不要白嫖呀,哈哈。。。)

然后加載依賴:pip install -r requirements.txt   (或者使用Pycharm打開,會自動彈出提示安裝)

 

2、使用Charles工具抓取接口數據包,並且導出選擇JSON Session File (.chlsj) 格式 (工具自己百度下載吧)

 

3、新建一個項目MyTest目錄和一個data目錄,把抓取的接口數據包放置進來,然后修改runConfig.yml項目名為MyTest

 

4、直接開始運行,然后你就會發現項目目錄多了很多文件,測試已經完成。。。沒錯,就是這么簡單,你還可以查看allure報告。

 

 


談談我自己

以往我使用過多種基於Python的自動化測試框架,特別是robotframework,簡單易上手,對於培養普通測試工程師比較迅速,但是優點同時也是缺點,由於RF自身局限性,會讓簡單的語法變得復雜化,

如果你不做分層處理,可能會出現一條簡單的測試用例編需要寫上百行,后期維護更是非常麻煩,

我記得有一次檢查測試工程師的自動化測試用例時,發現竟然有兩百多行,對於RF這種表格語法兩百多行你知道閱讀是多么的痛苦嘛。。。

當然這也是源於我們的項目性質,由於大數據業務,接口只是一小部分,而數據的校驗才是大頭,

並且涉及到多類數據庫,比如redis、mysql、es、hbase、solr等,而且有的接口會同時保存到多個表然后同步到多個數據庫,校驗點數不過來。。。

就這樣前前后后我帶領着幾個測試工程師改了幾版,雖然最后大大的減少了測試代碼,但是依然還是很多,

並且運行時長很難解決。所以從去年開始使用pytest測試框架,當然這也是我首次接觸pytest,第一個項目也改了幾版,但是由於純pytest編寫,

所有的東西都在一塊,改起來也比較簡單,最終的效果當然是質的提升,首先時代碼方面可以輕松的做分層處理,不會受到RF之類的框架限制,

而在執行時長方面可以采用pytest自身的多線程模塊,大大減少執行時長,同時大大提高了框架的擴展性。

 

而本框架源於https://github.com/wangxiaoxi3/API_service項目,加上自己實際項目實施經驗重構而來,保留了核心功能,增加了自己對接口自動化測試的理解。

特別對於關聯值處理方面,不在需要手動標記,而采用自動檢索方式,另外關於前置接口處理,也不在需要手動編寫,只需要指定前置接口相對路徑即可,並且多個用例可以嵌套。

另外在請求地址和消息頭上,不在需要手動配置,將在接口數據解析中自動篩選消息頭和請求地址,然后寫入接口公共配置中,除非有變動,否則無需做任何配置。

但是目前開發的第一版並沒有加入數據庫校驗,僅為了單純接口的自動化測試,后期將考慮加入數據庫校驗模塊。以下為本人實際項目數據庫校驗示例:

 

※ 如果有任何疑問可以留言,當然如果覺得寫得不錯可以收藏、推薦一下,另外github幫忙給個星!!!

 

作者:Leozhanggg

出處:https://www.cnblogs.com/leozhanggg/p/14373878.html

源碼:https://github.com/Leozhanggg/ApiTesting

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM