Python接口自動化測試框架
在自動化的測試體系中,包含了UI自動化測試和接口自動化測試,UI自動化實現的前提是軟件版本進入穩定期,UI界面穩定、變動少,相比較之下接口自動化,接口受外界因素的影響較少,維護成本低,能夠在最短時間發現問題。
一、淺談接口測試
1、什么是接口測試:
API測試又稱為接口測試,主要用於檢測外部系統與系統之間以及內部各個子系統之間的交互點,是對系統接口功能進行測試的一種手段,也是集成測試的一部分,通過直接控制被測應用的接口(API)來確定是否在功能、可靠性、性能和安全方面達到預期的軟件測試活動。
2、如何測試接口:
- 檢查接口返回的數據是否與預期結果一致。
- 檢查接口的容錯性,假如傳遞數據的類型錯誤時是否可以處理。
- 接口參數的邊界值。例如,傳遞的參數足夠大或為負數時,接口是否可以正常處理。
- 接口的性能,http請求接口大多與后端代碼邏輯、執行的SQL語句性能、算法等相關。
- 接口的安全性,外部調用的接口尤為重要。
3、接口測試的意義、目的:
接口測試的核心意義、目的在於:以保證系統的正確和穩定為核心,以持續集成為手段,提高測試效率,提升用戶體驗,降低產品研發成本。
二、定義框架目錄分層
接口自動化框架的沒有統一標准,可以根據實際需要自定義,以滿足功能測試目標要求為目的。一個基本的接口自動化測試框架需要滿足的功能:測試用例管理、各種配置信息管理、數據庫操作、日志打印、報告輸出、郵件發送
""" API_Autotest/ |-- API_Case/ #測試用例 | | |-- PreviewRelease_01_RegisterLogin.py #預發布環境注冊登錄接口case | | |-- PreviewRelease_02_SubmitCredit.py #預發布環境主流程接口case | | |-- PreviewRelease_03_MobilePhonePWD.py #預發布環境.......接口case | | |-- Test_01_RegisterLogin.py #測試環境注冊登錄接口case | | |-- Test_02_SubmitCredit.py #測試環境主流程接口case | | |-- Test_03_LoanRepayment.py #測試環境......接口case | | | |-- data/ ###配置信息 | |-- conf_dict.py #賬號密碼等配置信息 | |-- custom_variable.py #向接口請求的請求的參數變量、keys、請求頭 | |-- export_url.py #接口url | |-- request_dict.py #向接口請求的請求參數 | |-- logic/ ###主要邏輯 | |-- export_logic.py #接口實現主邏輯 | |-- database.py #數據庫操作類 | |-- log_print.py #日志打印類 | |-- public_class.py #公用函數類 | |-- send_email.py #發送郵件類 | |-- log/ #日志 | |-- Case--201804182020_log #根據運行日期保存操作日志 | |-- Case--201804182022_log | |-- report/ #測試報告 | |-- report--201804181729.html #根據運行日期保存測試報告日志 | |-- report--201804181731.html | |-- readme.md #readme | |-- manage.py #接口case運行管理類 """
三、知識技能儲備
蘿卜青菜各有所愛,每個人心中的接口自動化測試框架也各不相同,想實現一個基礎功能完備的接口測試框架需要的Python知識如下:
1、Python基礎知識
列舉了一些需要掌握的基礎知識
- 基本語法、循環
- 列表
- 元組、字典
- 字符串、字符編碼與轉碼
- 文件操作
- 函數、無參/有參參數、全局變量/局部變量
- 函數非固定參數,返回值(return)
- 類的實例化過程、特征、共有屬性和私有屬性
- 類的繼承(新式類/經典類)、多態
- 類的裝飾器方法、特殊成員方法
- 更多基礎知識......
2、主要依賴的庫
下面介紹的庫都是數據庫操作、日志打印、報告輸出、郵件發送功能實現所依賴的庫:
a、數據庫操作
數據庫操作主要pymysql庫,下面為代碼示例:
import pymysql import datetime,time import os from data.conf_dict import ConfData from logic.log_print import LOG from logic.public_class import Public class ConnectDatabase(): """ 連接數據庫類 """ def __init__(self): self.Log = LOG() self.conf_data = ConfData() # 連接數據庫110 self.connection_110 = pymysql.connect(host=self.conf_data.get_conf_data("database_110", "host"), port=self.conf_data.get_conf_data("database_110", "port"), user=self.conf_data.get_conf_data("database_110", "user"), password=self.conf_data.get_conf_data("database_110", "password"), db=self.conf_data.get_conf_data("database_110", "db"), charset='utf8', # 以字典形式展示所查詢數據 cursorclass=pymysql.cursors.DictCursor) # 保存錯誤日志的文件名稱 self.log_name = Public.get_new_file(path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "log")) def select_sql(self, **kwargs): """ 根據傳入參數執行數據庫查詢操作 :param args: :param kwargs: database:(database_110)選擇數據庫、table:表名、condition:where條件 :return: """ database_name = kwargs.get("database") field_name = kwargs.get("field", "*") table_name = kwargs.get("table") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "SELECT %s FROM %s WHERE %s;" data = (field_name, table_name, where_condition) cursor.execute(sql % data) self.connection_110.commit() result = cursor.fetchone() return result except Exception as e: self.Log.log_warning("database", self.log_name, "select_error:%s" % e) def select_sql_all(self, *args, **kwargs): """ 根據傳入參數執行數據庫查詢操作 :param args: :param kwargs: database:(database_110)選擇數據庫、table:表名、condition:where條件 :return: """ database_name = kwargs.get("database") field_name = kwargs.get("field", "*") table_name = kwargs.get("table") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "SELECT %s FROM %s WHERE %s;" data = (field_name, table_name, where_condition) print((sql % data)) cursor.execute(sql % data) self.connection_110.commit() result = cursor.fetchall() return result except Exception as e: self.Log.log_warning("database", self.log_name, "select_error:%s" % e) def update_sql(self, *args, **kwargs): """ 根據傳入參數執行數據庫更新操作 :param args: :param kwargs: database:(database_110)選擇數據庫、table:表名、set:更新的字段和值、condition:where條件 :return: """ database_name = kwargs.get("database") table_name = kwargs.get("table") set_value = kwargs.get("set") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "UPDATE %s SET %s WHERE %s;" data = (table_name, set_value, where_condition) # print(sql%data) cursor.execute(sql % data) self.connection_110.commit() return cursor.rowcount cursor.close() except Exception as e: self.Log.log_warning("database", self.log_name, "update_error:%s" % e) self.connection_110.rollback() def delete_sql(self, *args, **kwargs): """ 根據傳入參數執行數據庫刪除操作 :param args: :param kwargs: database:(database_110)選擇數據庫、table:表名、condition:where條件 :return: """ database_name = kwargs.get("database") table_name = kwargs.get("table") where_condition = kwargs.get("condition") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: sql = "DELETE from %s where %s;" data = (table_name, where_condition) cursor.execute(sql % data) self.connection_110.commit() return cursor.rowcount except Exception as e: self.Log.log_warning("database", self.log_name, "delete_error:%s" % e) self.connection_110.rollback() def insert_sql(self, *args, **kwargs): """ 根據傳入參數執行數據庫插入操作 :param args: :param kwargs: database:(database_110)選擇數據庫、sql:需要插入的sql語句 :return: """ database_name = kwargs.get("database") insert_sql = kwargs.get("sql") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: cursor.execute(insert_sql) self.connection_110.commit() return cursor.rowcount except Exception as e: self.Log.log_warning("database", self.log_name, "insert_error:%s" % e) self.connection_110.rollback() def mysql_function(self, *args, **kwargs): """ 根據傳入參數執行數據庫函數操作 :param args: :param kwargs: database:(database_110)選擇數據庫、function_name:函數名稱,data_id:數據ID,phone:電話 :return: """ database_name = kwargs.get("database") function_name = kwargs.get("function_name") data_id = kwargs.get("data_id") phone = kwargs.get("phone") product_id = kwargs.get("product_id") if database_name == "database_110": try: with self.connection_110.cursor() as cursor: cursor.callproc(function_name,args=(data_id,phone,product_id,)) self.connection_110.commit() except Exception as e: self.Log.log_warning("database", self.log_name, "mysql_function:%s" % e) self.connection_110.rollback() if __name__ == "__main__": b = ConnectDatabase() data_id = Public.create_short_id()
PS:pymsql庫操作mysql數據庫增、刪、查、改、調用函數
b、日志打印
日志打印依賴logging庫,下面為代碼示例:
import logging import os import sys import datetime log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log") sys.path.append(log_path) class LOG(object): """ 日志打印類 """ def __init__(self): self.now_time = datetime.datetime.now().strftime("%Y%m%d%H%M") def log_info(self, *args): """ 根據傳入參數打印普通日志 :param arg[0]log的功能模塊名,arg[1] 保存log的文件名,arg[2]要打印的日志內容 :return 返回logger 對象 """ # 創建一個logger對象 logger = logging.getLogger(args[0]) logger.setLevel(logging.DEBUG) # 創建一個向屏幕輸入的handler對象 ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) # 創建一個像文件輸入的handler對象 log_file = os.path.join(log_path, "%s--%s_log" % (args[1], self.now_time)) fh = logging.FileHandler(log_file, mode="a+", encoding="utf-8") fh.setLevel(logging.DEBUG) # 設置log輸入格式 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) fh.setFormatter(formatter) # logger,添加handler對象 logger.addHandler(ch) logger.addHandler(fh) logger.info(args[2]) # 在記錄日志之后移除句柄, 不然會重復打印日志 logger.removeHandler(fh) logger.removeHandler(ch) return logger @staticmethod def log_warning(*args): """ 根據傳入參數錯誤日志 :param arg[0]log的功能模塊名,arg[1]文件名 arg[2] 需要打印的內容 :return 返回logger 對象 """ # 創建一個logger對象 logger = logging.getLogger(args[0]) logger.setLevel(logging.DEBUG) # 創建一個向屏幕輸入的handler對象 ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.DEBUG) # 創建一個像文件輸入的handler對象 log_file = os.path.join(log_path, args[1]) fh = logging.FileHandler(log_file, mode="a+", encoding="utf-8") fh.setLevel(logging.DEBUG) # 設置log輸入格式 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) fh.setFormatter(formatter) # logger,添加handler對象 logger.addHandler(ch) logger.addHandler(fh) logger.warning(args[2]) # 在記錄日志之后移除句柄, 不然會重復打印日志 logger.removeHandler(fh) logger.removeHandler(ch) print("aaa") return logger # @staticmethod def log_debug(self,message): """ 根據傳入參數錯誤日志 :param arg[0]log的功能模塊名,arg[1]文件名 arg[2] 需要打印的內容 :return 返回logger 對象 """ # 創建Logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # 創建Handler # 終端Handler consoleHandler = logging.StreamHandler(sys.stdout) consoleHandler.setLevel(logging.DEBUG) # 文件Handler fileHandler = logging.FileHandler('ing.log', mode='w', encoding='UTF-8') fileHandler.setLevel(logging.NOTSET) # Formatter formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') consoleHandler.setFormatter(formatter) fileHandler.setFormatter(formatter) # 添加到Logger中 logger.addHandler(consoleHandler) logger.addHandler(fileHandler) return logger.debug(message) # if __name__ == "__main__": # A=LOG() # A.log_debug('123')
PS:logging庫詳細介紹
c、報告輸入
報告輸出依賴庫主要為requests、unittest、下面為代碼示例:
import requests from data.export_url import ExportERL from data.request_dict import RequestData from logic.log_print import LOG from logic.database import ConnectDatabase from data.custom_variable import GlobalVariable class ExportLogic(object): """ ExportLogic 接口主邏輯類 """ def __init__(self): self.Request = RequestData() self.Url = ExportERL() self.ConnectDatabase = ConnectDatabase() self.Log = LOG() self.Custom_variable = GlobalVariable() @staticmethod def request_post(*args, **kwargs): """ 傳入請求參數,返回json請求結果 :param args: :param kwargs: json:請求數據, url:請求路徑, header:請求頭 :return: """ request_data = kwargs.get("json") request_url = kwargs.get("url") request_header = kwargs.get("header") response_json = requests.post(url=request_url, data=request_data, headers=request_header).json() return response_json @staticmethod def request_post_json(*args, **kwargs): """ 傳入請求參數,返回json請求結果 :param args: :param kwargs: json:請求數據, url:請求路徑, header:請求頭 :return: """ request_data = kwargs.get("json") request_url = kwargs.get("url") request_header = kwargs.get("header") response_json = requests.post(url=request_url, json=request_data, headers=request_header) return response_json @staticmethod def request_post_file(*args, **kwargs): """ 上傳接口獨有 :param args: :param kwargs: json:請求數據, url:請求路徑, header:請求頭, file:文件路徑 :return: """ request_data = kwargs.get("json") request_url = kwargs.get("url") request_header = kwargs.get("header") request_file = kwargs.get("files") response_json = requests.post(url=request_url, data=request_data, files=request_file, headers=request_header).json() return response_json if __name__ == "__main__": b = ExportLogic()
import unittest import os import HTMLTestReportCN import datetime from logic.export_logic import ExportLogic from data.custom_variable import GlobalVariable from data.export_url import ExportERL from logic.public_class import Public from data.request_dict import RequestData from logic.log_print import LOG from logic.database import ConnectDatabase from data.conf_dict import ConfData from logic.send_email import SendEmail import random,requests class LZExportCase(unittest.TestCase): """ 新浪有借有還接口測試用例 """ # 全距 PHONE = Public().create_phone() #提交五項資料的Header HEADER = GlobalVariable().get_header('lz_Header') #修改手機號的Header HEADER1 = GlobalVariable().get_header('lz_Header') BASEID = '' PERIODNUM = '' IMGID ='' Repayid = '' Periodnum_ = '' def setUp(self): self.Export_logic = ExportLogic() self.Export_url = ExportERL() self.Public = Public() self.Request_data = RequestData() self.Custom_variable = GlobalVariable() self.Log = LOG() self.ConnectDatabase = ConnectDatabase() self.Conf_data = ConfData() self.password = GlobalVariable().get_variable("password") self.phone = Public().create_phone() self.newphone = GlobalVariable().get_variable("YJ_newphone") # self.report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report") self.log_path = os.path.dirname(os.path.dirname(__file__)) self.path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static"), "logo.jpg") def tearDownC(self): pass # os.remove(os.path.join(self.report_path, "report.html")) # os.remove(os.path.join(os.path.join(self.path, "log"), "Export_log")) def test_085_FileLoad(self): """ Case--上傳文件(saveType正常) """ url = self.Export_url.get_export_url("LZ_test_url", "File") request_json = self.Request_data.get_request_data('File') file_ = open(self.path, 'rb') file = { "file": ('logo.jpg', file_, 'image/jpeg') } request_json["saveType"] = '1' request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post_file(url=url, json=request_json, files=file, header=self.HEADER) LZExportCase.IMGID = response_json['data']['id'] self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code']) file_.close() def test_086_HeadImg(self): """ Case--修改頭像(headimgId正常) """ url = self.Export_url.get_export_url("LZ_test_url", "HeadImg") request_json = self.Request_data.get_request_data('HeadImg') request_json["headimgId"] = LZExportCase.IMGID request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code']) def test_087_HeadImg(self): """ Case--修改頭像(headimgId錯誤) """ url = self.Export_url.get_export_url("LZ_test_url", "HeadImg") request_json = self.Request_data.get_request_data('HeadImg') request_json["headimgId"] = '3213adcx213vcv' request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['code'], response_json['code']) def test_088_HeadImg(self): """ Case--修改頭像(headimgId為空) """ url = self.Export_url.get_export_url("LZ_test_url", "HeadImg") request_json = self.Request_data.get_request_data('HeadImg') request_json["headimgId"] = '' request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s request data:%s" % (self._testMethodName, request_json)) response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER) self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"), "%s response data:%s" % (self._testMethodName, response_json)) self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['msg'], response_json['msg']) \ and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['code'], response_json['code']) if __name__ == '__main__': Send_mail = SendEmail() now_time = datetime.datetime.now().strftime("%Y%m%d%H%M") module_name = os.path.basename(__file__).split(".")[0] module = __import__(module_name) path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "report") logo_path = os.path.join(os.path.join(path, "static"),"logo.jpg") fp = open(os.path.join(path, "report--%s.html" % now_time), "wb") runner = HTMLTestReportCN.HTMLTestRunner(stream=fp) all_suite = unittest.defaultTestLoader.loadTestsFromModule(module) runner.run(all_suite) fp.close() SendEmail.send_email(Send_mail)
d、郵件發送
郵件發送依賴兩個python內置庫smtplib、email:
import os import smtplib import email.mime.multipart import email.mime.text from email import encoders import datetime from data.conf_dict import ConfData from logic.public_class import Public class SendEmail(object): def __init__(self): self.conf_data = ConfData() self.sender = self.conf_data.get_conf_data("email", "sender") self.receiver = self.conf_data.get_conf_data("email", "receiver") self.SMTP_server = self.conf_data.get_conf_data("email", "SMTP_server") self.username = self.conf_data.get_conf_data("email", "username") self.password = self.conf_data.get_conf_data("email", "password") self.content = self.conf_data.get_conf_data("email", "content") self.now_time = datetime.datetime.now().strftime("%Y%m%d%H%M") self.report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report") self.log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log") def create_msg(self,receiver): """ 此函數主要構建收發郵件聯系人、郵件正文、郵件附件、郵件標題 :return: """ # 構建郵件正文 msg = email.mime.multipart.MIMEMultipart() msg["from"] = self.sender msg["to"] = receiver msg['subject'] = "API自動化測試報告" txt = email.mime.text.MIMEText(self.content) msg.attach(txt) # 構建郵件附件之一:自動化測試報告 report_name = Public.get_new_file(path=self.report_path) report_path = os.path.join(self.report_path, report_name) report = email.mime.text.MIMEText(open(report_path, "rb") .read(), 'html', 'utf-8') report["Content-Type"] = 'application/octet-stream' report.add_header('Content-Disposition', 'attachment', filename=('gbk', '', report_name)) encoders.encode_base64(report) msg.attach(report) # 構建郵件附件之一:測試日志 log_name = Public.get_new_file(path=self.log_path) log_path = os.path.join(self.log_path, log_name) info_log = email.mime.text.MIMEText(open(log_path, 'rb').read(), 'base64', 'utf-8') info_log["Content-Type"] = 'application/octet-stream' info_log.add_header('Content-Disposition', 'attachment', filename=('gbk', '', log_name)) encoders.encode_base64(info_log) msg.attach(info_log) return msg def send_email(self,*args): """ 創建實例,發送郵件 :return: """ receiver = self.conf_data.get_conf_data("email", "%s_receiver" % args[0][0]) if args else self.receiver msg = self.create_msg(receiver) smtp = smtplib.SMTP_SSL(self.SMTP_server, 465) # 在Linux端使用ssL方式連接郵箱服務器 # smtp.connect(self.SMTP_server, 465) # 在windows端使用connect方式連接郵箱服務器 smtp.login(self.username, self.password) smtp.sendmail(self.sender, receiver.split(","), msg.as_string()) smtp.quit() if __name__ == "__main__": s = SendEmail() s.send_email()
PS:smtplib、email庫詳細介紹
