python---atp自動化測試框架


代碼結構:

bin---啟動文件

cases---存放測試用例

conf ---配置文件

core --- 核心代碼

logs --- 日志

report --- 存放測試結果

readme.txt --- 使用說明

requirements.txt-----保存所有安裝的第三方模塊,方便代碼移植到其他電腦

寫代碼思路:

1、讀excel取出所有用例

2、解析用例

      1) 解析入參: a =1,b=1,phone=<phone>  把入參變成字典 {'a'=1,'b':2,'phone':15326687945}
          url
         請求方式:post

      2) 發請求,獲取到返回結果 MyRequest

      3) 解析返回   a、解析檢查點   'k=1,age=18,name!=abc'     [['k','=','1'],['age','=','18'],['name','!=','abc']]
                   b、獲取實際結果做對比,檢查用例通過還是失敗

3、返回結果寫入到excel中

4、生成報告,發郵件

atp測試缺點:不能測試有業務流程(即接口間有依賴)的情況

readme.txt

寫用例的支持參數化,支持以下參數化:
<phone>   自動產生手機號
<id_card>  身份證號
<email>    郵箱
<cur_time>  當前時間戳

requirements.txt

xlrd
requests
jsonpath
xlutils
nnlog
yagmail

case_operation.py

import xlrd
from core.my_requests import MyRequest

'''
讀exce獲取用例
'''
def get_case(path):
    all_case = []  #定義一個二維數組,保存所有的測試用例
    book = xlrd.open_workbook(path)
    sheet = book.sheet_by_index(0)
    for i in range(1,sheet.nrows):    #第一行表頭不要
        row_data = sheet.row_values(i)[4:8]  #切片(顧頭不顧尾),取4-7列:請求url、請求方式、請求數據、預期結果
        all_case.append(row_data)
    return all_case   #[[url,get,data,check],[url,get,data,check]]

'''
發request請求
'''
def send_request(url,method,data,headers=None):
    req = MyRequest(url,data,headers=headers)   #實例化一個MyRequest實例
    if method.upper()=="POST":    #測試用例中的請求方式可能是大寫或小寫
        res = req.post()
    elif method.upper() =='GET':
        res = req.get()
    else:
        res = {"data":"暫時不支持該方法!"}
    return res['data']
View Code

my_requests.py    封裝請求模塊類  處理了異常,打印了日志

import requests
import nnlog
import os
from conf.setting import LOG_PATH
class MyRequest:
    log_file_name  = os.path.join(LOG_PATH,'MyRequest.log') #日志存放在cases文件夾下
    time_out = 10 #請求超時時間
    def __init__(self,url,data=None,headers=None,file=None):
        self.url = url
        self.data = data
        self.headers = headers
        self.file = file
    def post(self):
        try:
            req = requests.post(self.url,data=self.data,headers=self.headers,
                                files=self.file,timeout=self.time_out)
        except Exception as e:
            res = {"status":0,"data":e.args}  #0代表請求失敗
        else:
            try:
               res = {"status":1,"data":req.json()} #1代表返回的json
            except Exception as e:
                res = {"staus":2,"data":req.text} #2代表返回不是json
        log_str = 'url: %s 請求方式:post  data:%s ,返回數據:%s'%(self.url,self.data,res)
        self.write_log(log_str)
        return res

    def get(self):
        try:
            req = requests.get(self.url,params=self.data,headers=self.headers,timeout=self.time_out)
        except Exception as e:
            res = {"status":0,"data":e.args}  #0代表請求失敗
        else:
            try:
               res = {"status":1,"data":req.json()} #1代表返回的json

            except Exception as e:
                res = {"staus":2,"data":req.text} #2代表返回不是json
        log_str = 'url: %s get請求 data:%s ,返回數據:%s'%(self.url,self.data,res)
        self.write_log(log_str)
        return res

    @classmethod   #類方法
    def write_log(cls,content):
        log = nnlog.Logger(cls.log_file_name)
        log.debug(content)
View Code

parse_param.py

import random
import string
import time

#這個類是用來解析請求參數的
class ParseParam:

    func_map = ['phone','email','id_card','cur_time']  #映射函數的

    def __init__(self,param):
        self.param = param
        self.parse()   #實例化時就做映射
    def phone(self):
        phone_starts = ['134','181','138','177','150','132','188','186','189','130','170','153','155']  #手機號開頭3位固定
        start = random.choice(phone_starts)
        end = str(random.randint(0,99999999))  #產生手機號后8位
        res = start+ end.zfill(8)
        return res
    def email(self):
        email_end=['163.com','qq.com','126.com','sina.com']
        end = random.choice(email_end)
        start_str='ATP_test_'
        email_start = ''.join(random.sample(string.ascii_letters+string.digits,6)) #從所有的大寫字母、小寫字母、數字中取6位 sample取到的是list,用join轉成字典
        return start_str+email_start+'@'+end
    def id_card(self):
        '''這個產生身份證號的'''
        return 410881199011212121
    def cur_time(self):
        return int(time.time())  #time.time()精確的毫秒 int()取整
    def order_id(self):
        '''從數據庫里面獲取'''
        pass
    def session_id(self):
        '''從redis里面獲取的'''
        pass
    def parse(self):
        for func in self.func_map:
            temp = str(getattr(self,func)()) #手機號
            self.param = self.param.replace('<%s>'%func,temp)
    def strToDict(self):
        #這個函數是把請求參數轉成字典的
        data ={}
        pl = self.param.split(',')
        for p in pl:
            temp = p.split('=')
            if len(temp)>1:   #用例中不一定key value都寫完整了,最好先判斷是否都存在,否則會報錯
                key,value = temp
                data[key] = value
        return data



if __name__ == '__main__':
    param = 'username=niuhanyang' \
            ',phone=<phone>,email=<email>' \
            ',id_card=<id_card>,start_time=' \
            '<cur_time>'
    p = ParseParam(param)
    data = p.strToDict()
    print(data)

    print(p.phone())
    res = getattr(p,'phone') #getattr()是一個內置函數,第一個參數是一個對象,第二個參數是一個字符串(方法的名字) 獲取一個對象里面的屬性(方法、變量)
    print(res())  #返回的res是一個函數名,加()就可以調用了

    import os,requests
    res = hasattr(requests,'get')#第一個參數-模塊名 第二個參數-字符串(方法的名字)  判斷某個模塊、類下面有沒有某個方法或者變量
    print(res)  #True
View Code

parse_response.py

import jsonpath

class ResponseParse:
    seqs = ['!=', '>=', '<=', '=', '<', '>', 'in', 'notin']
    #定義支持的運算符
    def __init__(self,response,check):   #response是請求返回實際值  check是用例中的期望值
        self.response = response
        self.check = check

    def format_check(self):
        #格式化檢查信息,分別列出key 運算符 實際結果
        #會返回 [['error_code','=','0'],['name','!=','xxx']]
        format_list = []
        check_list = self.check.split(',')
        for s in check_list:
            for seq in self.seqs:
                if seq in s:
                    if len(s.split(seq))>1:
                        key, value = s.split(seq)
                        temp = [key, seq, value]
                        format_list.append(temp)
                        break
        return format_list

    def get_real_value(self,key):
        #從字典里面獲取key對應的value
        res = jsonpath.jsonpath(self.response,'$..%s'%key) #$..%s這個是jsonpath這個模塊的用法
        if res:
            return res[0]
        return '找不到該key【%s】'%key

    def operation_check(self,real,seq,hope):
        #根據運算符判斷結果
        msg = "判斷信息:%s %s %s "%(real,seq,hope)
        real = str(real)#注意:為了保持類型一致  返回值從字典中取出可能是int類型,先轉成str  hope從字符串中取出來的,一定是字符串
        if seq == '=':
            status = real == hope
        elif seq == '!=':
            status = real != hope
        elif seq =='in':
            status = real in hope
        elif seq == 'notin':
            status = real not in hope
        else:
            status,msg = self.num_check(real,seq,hope)
        return status,msg

    def num_check(self,real,seq,hope):
        #判斷數值類型的  > <  >=  <=
        msg = "判斷信息:%s %s %s "%(real,seq,hope)
        try:
            real = float(real)
            hope = float(hope)
        except Exception as e:
            msg = "比較時出錯,大小比較只能是數字類型!" \
                  "%s %s %s"%(real,seq,hope)
            status = False
        else:
            if seq == '>':
                status = real > hope
            elif seq == '<':
                status = real < hope
            elif seq == '<=':
                status = real <= hope
            else:
                status = real >= hope
        return status,msg

    def check_res(self):
        #校驗所有的檢查點
        check_list = self.format_check()   #format_check()返回的是一個二維數組,循環取出每條出來判斷
        # [['error_code', '=', '0'], ['name', '!=', 'xxx']]
        all_msg=''
        for check in check_list:#循環所有的檢查點
            key,seq,hope = check
            real = self.get_real_value(key)  #在response里找到key對應的值
            status,msg = self.operation_check(real,seq,hope)
            all_msg = all_msg+msg+'\n' #累加提示信息
            if status:
                pass
            else:
                return '失敗',all_msg
        return '通過',all_msg    #所有的check點都pass才返回pass,有一個失敗就返回fail
View Code

tool.py

import xlrd
from xlutils.copy import copy
import os
import datetime
from conf import setting
import yagmail

def make_today_dir():
    #創建當天的文件夾,返回絕對路徑
    today = str(datetime.date.today())
    #c:/xxx/xxx/atp/report/2018-11-24/測試用例.xls
    abs_path = os.path.join(setting.REPORT_PATH,today)
    #拼成當天的絕對路徑
    if os.path.exists(abs_path):
        pass
    else:
        os.mkdir(abs_path)
    return abs_path

def write_res(case_path,case_res):
    #c:/xxx/xxx/atp/cases/測試用例.xls
    #[ ['{"xdfsdf}','通過'],['{"xdfsdf}','失敗'] ]
    book = xlrd.open_workbook(case_path)
    new_book = copy(book)
    sheet = new_book.get_sheet(0)   #xlutils 里不能用 sheet_by_index()方法
    for row,res in enumerate(case_res,1):    #1 表示row要從1開始取
        response,status = res
        # 寫第8列和第9列
        sheet.write(row,8,response)
        sheet.write(row,9,status)

    cur_date_dir = make_today_dir()#創建當前文件夾,並且返回絕對路徑
    file_name = os.path.split(case_path)[-1] #只獲取到filename
    cur_time = datetime.datetime.today().strftime('%H%M%S') #獲取到當天時分秒
    new_file_name = cur_time+'_'+file_name #165530_測試用例.xls
    real_path = os.path.join(cur_date_dir,new_file_name)#拼路徑 測試報告放在report目錄下
    new_book.save(real_path)
    return real_path

def send_mail(content,file_path=None):
    #發郵件,傳入郵件正文和附件
    m = yagmail.SMTP(**setting.MAIL_INFO,)
    subject = '接口測試報告_%s'%str(datetime.datetime.today())
    m.send(subject=subject,to=setting.TO,contents=content,attachments=file_path)
View Code

setting.py

import os

#常量定義時大寫
BAE_PATH  = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #atp的目錄
LOG_PATH = os.path.join(BAE_PATH,'logs') #log目錄
CASE_PATH = os.path.join(BAE_PATH,'cases') #case目錄
REPORT_PATH = os.path.join(BAE_PATH,'report') #report目錄

#發郵件相關信息
MAIL_INFO = {                 #定義為字典,調用時用**MAIL_INFO即可
    'user':'15xxx3@qq.com',
    'password':'xxxmqrdgjcd',
    'host':'smtp.qq.com',  #163郵箱  smtp.163.com
    'smtp_ssl':True,    #發件箱是qq郵箱的話,為True
}

TO = ['xxxx@qq.com','4xxx7026@qq.com']


HOST = {
    'QA':'http://api.nnzhp.cn',  #測試環境
    'DEV':'http://dev.nnzhp.cn', #開發環境
    'PRE':'http://dev.nnzhp.cn'  #預生產環境
}

default_host = HOST.get('QA')  #默認用測試環境
View Code

start.py---單線程 

#首先將該project目錄加入到python環境變量
import os,sys
BAE_PATH  = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #atp的目錄
sys.path.insert(0,BAE_PATH)

from conf.setting import CASE_PATH,default_host
from core import case_operation,parse_param,parse_response
from core import tools
import glob
class RunCase:
    content = '''
    各位好!
        本次測試結果:總共運行%s條用例,通過%s條,失敗%s條。詳細信息見附件。
    '''
    def get_excel(self):
        all_excel_case_count = 0  #存放所有測試用例的次數
        all_excel_success_count = 0  #存放所有測試用例成功的次數
        report_path_list = []  #存放所有附件地址
        #s='/Users/nhy/test*.xls'
        for excel in glob.glob(os.path.join(CASE_PATH,'test*.xls')):   #glob模塊過濾--只要以test開頭,.xls結尾的文件
            cases = case_operation.get_case(excel)#調用讀取excel的函數
            results = self.send_requests(cases) #發送請求,並校驗結果  返回一個二維數組[[real_res,status],[real_res,status],.....]
            report_file_path = tools.write_res(excel,results)#寫入結果,返回的是測試結果的路徑
            report_path_list.append(report_file_path)
            all_count = len(cases) #本次循環的excel中總共多條用例
            all_excel_case_count += all_count
            all_excel_success_count += self.success_count
        all_excel_fail_count = all_excel_case_count - all_excel_success_count
        content = self.content % (all_excel_case_count, all_excel_success_count, all_excel_fail_count)
        tools.send_mail(content,report_path_list)

    def send_requests(self,cases):   #發送多個請求
           #[[url,get,data,check],[url,get,data,check]]
        self.success_count = 0  #類變量,保存成功用例條數
        results = []
        for case in cases:
            url,method,param,check = case #獲取到每條用例的參數
            p = parse_param.ParseParam(param) #解析請求參數  實例化一個ParseParam實例 ,實例化的過程中就將里面的<phone>,<email>等替換了
            data = p.strToDict()#請求參數轉成字典
            url = default_host + url   #用例中的url只有接口名,沒有IP,IP可以在setting文件中根據需要配置測試/開發/預生產環境
            response = case_operation.send_request(url,method,data)#發請求 send_request()發送單個請求,返回結果是一個string
            #下面這2行代碼是判斷用例執行是否通過的
            p2 = parse_response.ResponseParse(response,check)  #實例化一個ResponseParse實例
            status, msg = p2.check_res()#調用寫好的校驗結果方法,
            real_res = str(response)+'\n'+msg #是把校驗的信息和返回的json拼到一起
            results.append([real_res,status]) #這里面的小list是每一個用例運行的結果
            if status == '通過':
                self.success_count += 1 #統計成功的次數
        return results #返回運行的結果

    def main(self):
        print('開始測試'.center(50,'*'))
        self.get_excel()
        print('測試結束'.center(50,'*'))

if __name__ == '__main__':
    run = RunCase()
    run.main()
View Code

 start.py---多線程 

#首先將該project目錄加入到python環境變量
import os,sys
BAE_PATH  = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #atp的目錄
sys.path.insert(0,BAE_PATH)

from conf.setting import CASE_PATH,default_host
from core import case_operation,parse_param,parse_response
from core import tools
import glob
import threading

class RunCase:
    content = '''
    各位好!
        本次測試結果:總共運行%s條用例,通過%s條,失敗%s條。詳細信息見附件。
    '''
    def thread_run(self):
        self.all_excel_case_count = 0  #存放所有測試用例的次數
        self.all_excel_success_count = 0  #存放所有測試用例成功的次數
        report_path_list = []  #存放所有附件地址

        def run(excel):
            # excel='/Users/nhy/test*.xls'
            cases = case_operation.get_case(excel)  # 調用讀取excel的函數
            results = self.send_requests(cases)  # 發送請求,並校驗結果  返回一個二維數組[[real_res,status],[real_res,status],.....]
            report_file_path = tools.write_res(excel, results)  # 寫入結果,返回的是測試結果的路徑
            report_path_list.append(report_file_path)
            all_count = len(cases)  # 本次循環的excel中總共多條用例
            self.all_excel_case_count += all_count
            # self.all_excel_success_count += self.success_count

        for excel in glob.glob(os.path.join(CASE_PATH,'test*.xls')):   #glob模塊過濾--只要以test開頭,.xls結尾的文件
            t = threading.Thread(target=run,args=(excel,))
            t.start()

        while threading.active_count() != 1:   #主線程等待子線程都運行完
                pass

        all_excel_fail_count = self.all_excel_case_count - self.all_excel_success_count
        content = self.content % (self.all_excel_case_count, self.all_excel_success_count, all_excel_fail_count)
        tools.send_mail(content,report_path_list)

    def send_requests(self,cases):   #發送多個請求
           #[[url,get,data,check],[url,get,data,check]]
        success_count = 0  #類變量,保存成功用例條數
        results = []
        for case in cases:
            url,method,param,check = case #獲取到每條用例的參數
            p = parse_param.ParseParam(param) #解析請求參數  實例化一個ParseParam實例 ,實例化的過程中就將里面的<phone>,<email>等替換了
            data = p.strToDict()#請求參數轉成字典
            url = default_host + url   #用例中的url只有接口名,沒有IP,IP可以在setting文件中根據需要配置測試/開發/預生產環境
            response = case_operation.send_request(url,method,data)#發請求 send_request()發送單個請求,返回結果是一個string
            #下面這2行代碼是判斷用例執行是否通過的
            p2 = parse_response.ResponseParse(response,check)  #實例化一個ResponseParse實例
            status, msg = p2.check_res()#調用寫好的校驗結果方法,
            real_res = str(response)+'\n'+msg #是把校驗的信息和返回的json拼到一起
            results.append([real_res,status]) #這里面的小list是每一個用例運行的結果
            if status == '通過':
                success_count += 1 #統計成功的次數
        self.all_excel_success_count += success_count
        return results #返回運行的結果

    def main(self):
        print('開始測試'.center(50,'*'))
        self.get_excel()
        print('測試結束'.center(50,'*'))

if __name__ == '__main__':
    run = RunCase()
    run.thread_run()
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM