python做一個http接口測試框架


目錄結構

project

  |

  case#測試用例

    |

    suite#測試目錄

  |

  logs#測試日志

  |

  papi#測試類

  |

  result#測試結果

  |

  setting.py#配置文件

1、日志類,用於測試時日志記錄 

pyapilog.py
 1 # -*-coding:utf-8 -*-
 2 # !/usr/bin/python
 3 __author__ = 'dongjie'
 4 __data__ = '2015-05-20'
 5 
 6 import logging
 7 import datetime
 8 import os
 9 import setting
# Translate the numeric verbosity (1-6) read from the settings module into the
# matching stdlib ``logging`` constant; the position in the tuple is the key.
logLevel = dict(enumerate(
    (
        logging.NOTSET,
        logging.DEBUG,
        logging.INFO,
        logging.WARNING,
        logging.ERROR,
        logging.CRITICAL,
    ),
    1,
))
# Registry that pyapilog() records its configured logger in.
loggers = {}
# Path of a 'setting.ini' file inside the project root directory.
setFile = os.path.join(setting.root_dir, 'setting.ini')
20 
21 
def pyapilog(**kwargs):
    """Return the root logger, attaching its handlers on the first call.

    The level comes from ``setting.logLevel`` (mapped through ``logLevel``)
    and the log directory from ``setting.logFile``. The directory is created
    when missing, and records go to a file named after the current date
    (``YYYY-MM-DD.log``) inside it. A console handler is attached as well,
    but it only emits records of level ERROR and above.

    Args:
        **kwargs: Accepted for call compatibility; currently ignored.

    Returns:
        logging.Logger: The configured root logger.

    Raises:
        KeyError: If ``setting.logLevel`` is not a key of ``logLevel``.
        OSError: If the log directory cannot be created.
    """
    level = logLevel[setting.logLevel]
    log_dir = setting.logFile
    if not os.path.isdir(log_dir):
        try:
            # makedirs also creates missing parents, which os.mkdir cannot.
            os.makedirs(log_dir)
        except OSError:
            # Another process may have created it in the meantime; only a
            # directory that is still missing is a real failure.
            if not os.path.isdir(log_dir):
                raise
    log_file = os.path.join(
        log_dir, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')

    logger = logging.getLogger()
    logger.setLevel(level)
    # The root logger is shared, so handlers are attached only once; later
    # calls would otherwise duplicate every record.
    if not logger.handlers:
        formatter = logging.Formatter(
            '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

        # File handler: receives everything at the configured level.
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(level)
        file_handler.setFormatter(formatter)

        # Console handler: errors only.
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.ERROR)
        console_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        # NOTE(review): the registry key is the literal string 'name', not the
        # logger's own name; kept as-is in case other code reads it - confirm.
        loggers['name'] = logger
    return logger

2、http測試類

httprequest.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'

from pyapilog import pyapilog
import requests
import json
import urllib

class SendHttpRequest(object):
    """Send HTTP requests to one URL and log the outcome of each call.

    Every method logs the final URL and status code (INFO for 200, ERROR with
    the response body otherwise). When the request cannot be sent at all, the
    error is logged and the method returns ``None``.
    """

    def __init__(self, url):
        """Store the endpoint that all requests of this instance target."""
        self.url = url

    def post(self, value=None):
        """POST to the endpoint with ``value`` encoded into the query string.

        Args:
            value: Optional mapping of query parameters.

        Returns:
            The response body as text (for any status code), or ``None`` if
            the request could not be sent.
        """
        try:
            # requests builds the query string itself and accepts None, so a
            # call without parameters no longer fails while encoding.
            req = requests.post(self.url, params=value)
        except requests.RequestException as err:
            # There is no response object to inspect; report and bail out.
            pyapilog().error(err)
            return None
        if req.status_code == 200:
            pyapilog().info(u"發送post請求: %s  服務器返回:  %s" % (req.url, req.status_code))
        else:
            pyapilog().error(u"發送post請求: %s   服務器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return req.text

    def post_json(self, value):
        """POST ``value`` serialised as a JSON body.

        Args:
            value: Any object ``json.dumps`` can serialise.

        Returns:
            The response body as text when the status is 200; ``None`` for
            any other status or if the request could not be sent.
        """
        head = {'content-type': 'application/json'}
        try:
            req = requests.post(self.url, data=json.dumps(value), headers=head)
        except requests.RequestException as err:
            pyapilog().error(err)
            return None
        if req.status_code == 200:
            pyapilog().info(u"發送post請求: %s  服務器返回:  %s" % (req.url, req.status_code))
            return req.text
        pyapilog().error(u"發送post請求: %s   服務器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return None

    def get(self, value=None):
        """GET the endpoint with ``value`` as query parameters.

        Args:
            value: Optional mapping of query parameters.

        Returns:
            The response body as text (for any status code), or ``None`` if
            the request could not be sent.
        """
        try:
            req = requests.get(self.url, params=value)
        except requests.RequestException as err:
            pyapilog().error(err)
            return None
        if req.status_code == 200:
            pyapilog().info(u"發送get請求: %s   服務器返回:  %s" % (req.url, req.status_code))
        else:
            pyapilog().error(u"發送get請求: %s   服務器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
        return req.text

3、數據庫操作類

databasedriver.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-21'

import pymssql
import MySQLdb
import setting
from pyapilog import pyapilog


class sqldriver(object):
    """Run a single SQL statement against SQL Server or MySQL.

    A new connection is opened for every statement and closed again before
    the method returns. Failures are logged and reported as ``None``.
    """

    def __init__(self, host, port, user, password, database):
        """Store the connection parameters used by the ``exec_*`` methods."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database

    def exec_mssql(self, sql):
        """Execute ``sql`` on SQL Server and fetch every row.

        Args:
            sql: The statement text, passed to the cursor unchanged.

        Returns:
            The rows returned by ``fetchall()`` (a warning is logged when
            there are none), or ``None`` if connecting or executing failed.
        """
        conn = None
        try:
            conn = pymssql.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   password=self.password,
                                   database=self.database,
                                   charset="utf8")
            cur = conn.cursor()
            if not cur:
                pyapilog().error(u"數據庫連接不成功")
                return None
            pyapilog().info(u"執行SQL語句|%s|" % sql)
            cur.execute(sql)
            rows = cur.fetchall()
            if not rows:
                pyapilog().warning(u"沒有查詢到數據")
            return rows
        except Exception as e:
            pyapilog().error(e)
            return None
        finally:
            # Close on every path; the early returns above would otherwise
            # leave the connection open.
            if conn is not None:
                conn.close()

    def exec_mysql(self, sql):
        """Execute ``sql`` on MySQL.

        Args:
            sql: The statement text, passed to the cursor unchanged.

        Returns:
            Whatever ``cursor.execute`` returns (presumably the affected-row
            count rather than the rows - confirm against callers), or
            ``None`` if connecting or executing failed.
        """
        conn = None
        try:
            conn = MySQLdb.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   passwd=self.password,
                                   db=self.database)
            cur = conn.cursor()
            if not cur:
                return None
            pyapilog().info(u"執行SQL語句|%s|" % sql)
            return cur.execute(sql)
        except Exception as e:
            pyapilog().error(e)
            return None
        finally:
            if conn is not None:
                conn.close()


def execsql(sql):
    """Execute ``sql`` on the database described by ``setting.DATABASE``.

    Args:
        sql: The statement text.

    Returns:
        The result of the engine-specific ``sqldriver`` method, or ``None``
        when the configured engine is unsupported or execution failed.
    """
    config = setting.DATABASE
    engine = config.get("ENGINE")
    # Engine name -> sqldriver method. ORACLE appears in the error message
    # below but has no implementation here.
    runners = {"MYSQL": "exec_mysql", "MSSQL": "exec_mssql"}
    if engine not in runners:
        pyapilog().error(u"[%s]數據庫配置支持MYSQL、MSSQL、ORACLE" % engine)
        return None
    driver = sqldriver(
        host=config.get("HOST"),
        port=config.get("PORT"),
        user=config.get("USER"),
        password=config.get("PWD"),
        database=config.get("DATABASE"),
    )
    try:
        return getattr(driver, runners[engine])(sql)
    except Exception as e:
        pyapilog().error(e)
        return None

4、解析json字符串

dataparse.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-21'
import json
import xmltodict
from pyapilog import pyapilog

# Parse a JSON string and query it with slash-separated paths.
class jsonprase(object):
    """Hold a decoded JSON document and look up nodes by path.

    Attributes are set in ``__init__``: ``json_value`` is the decoded
    document, or ``None`` when the input could not be decoded.
    """

    def __init__(self, json_value):
        """Decode ``json_value``; log the error and store ``None`` on failure.

        Args:
            json_value: JSON text to decode.
        """
        try:
            self.json_value = json.loads(json_value)
        except (ValueError, TypeError) as e:
            pyapilog().error(e)
            # Keep the attribute defined so later lookups return None
            # instead of failing with AttributeError.
            self.json_value = None

    def find_json_node_by_xpath(self, xpath):
        """Walk the document along a ``/``-separated path of keys.

        A list met along the way is mapped element-wise, so the result for
        the remaining path is a list with one entry per element. Empty path
        segments are skipped, which makes ``"/"`` select the whole document.

        Args:
            xpath: Path such as ``"/Point/Lat"``.

        Returns:
            The node at that path, or ``None`` when the path does not exist.
        """
        elem = self.json_value
        for key in xpath.strip("/").split("/"):
            if not key:
                continue
            if isinstance(elem, dict):
                elem = elem.get(key)
            elif isinstance(elem, list):
                elem = [item.get(key) if isinstance(item, dict) else None
                        for item in elem]
            else:
                # None or a scalar has no children: the path does not exist.
                return None
        return elem

    def datalength(self, xpath="/"):
        """Return ``len()`` of the node at ``xpath``, or 0 if it is missing.

        Args:
            xpath: Path of the node to measure; defaults to the document root.
        """
        node = self.find_json_node_by_xpath(xpath)
        return 0 if node is None else len(node)

    @property
    def json_to_xml(self):
        """The document as pretty-printed XML under a ``root`` element.

        Returns ``None`` (after logging the error) when conversion fails.
        """
        try:
            return xmltodict.unparse({"root": self.json_value}, pretty=True)
        except Exception as e:
            pyapilog().error(e)
            return None

# Convert an XML string into JSON text.
class xmlprase(object):
    """Hold an XML string and expose it as JSON text."""

    def __init__(self, xml_value):
        """Store the raw XML text; nothing is parsed until it is needed."""
        self.xml_str = xml_value

    @property
    def xml_to_json(self):
        """The XML document serialised as a JSON string.

        The text is parsed with ``xmltodict`` (namespace processing enabled)
        and dumped with ``json.dumps``. When either step fails, the error is
        logged and ``None`` is returned.
        """
        try:
            xml_dic = xmltodict.parse(self.xml_str,
                                      encoding="utf-8",
                                      process_namespaces=True,
                                      )
            return json.dumps(xml_dic)
        except Exception as e:
            pyapilog().error(e)
            return None

5、還有配置文件差點忘記說了

# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'

'''
    配置系統相關的參數,提供全局的相關配置
'''
import os
import sys
# Directory containing this settings file. os.path.dirname is used instead of
# splitting on "/" so the result is also correct where the path separator is
# different or the file sits directly in the filesystem root.
root_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(root_dir)
# Log level: 1 = notset, 2 = debug, 3 = info, 4 = warning, 5 = error, 6 = critical
logLevel = 2
# Directory the log files are written to
logFile = os.path.join(root_dir, 'logs')

# Database configuration. The original note lists MYSQL, MSSQL and ORACLE as
# supported engines - TODO confirm ORACLE is actually implemented.
DATABASE = {
    "ENGINE": "MSSQL",
    "HOST": "",
    "PORT": 3433,
    "USER": "",
    "PWD": "",
    "DATABASE": ""
}

6、最后看看我們的測試用例吧,當然是數據驅動了

# -*-coding:utf-8 -*-
from ddt import ddt, data, unpack
import unittest
from papi.httprequest import SendHttpRequest
from papi.dataparse import jsonprase, xmlprase

@ddt
class TestSingleRequest(unittest.TestCase):
    """Data-driven checks of the single-request endpoint."""

    def setUp(self):
        self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"

    @data(
        (32351, 6),
        (9555, 4)
    )
    @unpack
    def test_Single_right(self, sid, count):
        """Valid sid/count pairs return a populated point and map section."""
        value = {"sid": sid, "count": count}
        # Named ``body`` so the imported ``data`` decorator is not shadowed.
        body = SendHttpRequest(self.url).get(value)
        json_data = jsonprase(body)
        point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
        point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
        is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
        size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
        # unittest assertions are used instead of bare ``assert`` so the
        # checks survive ``python -O`` and report the offending values.
        self.assertNotEqual(float(point_lat), 0)
        self.assertNotEqual(float(point_lng), 0)
        self.assertIsNotNone(
            json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl"))
        # The size is only checked when the flag compares equal to True.
        if is_exists_map == True:
            self.assertNotEqual(size, "")

    # Invalid requests against the same endpoint
    @data(
        ("abceeffffg", 6),
        (9555, "")
    )
    @unpack
    def test_Single_error(self, sid, count):
        """Malformed parameters yield the fixed 'invalid request' message."""
        value = {"sid": sid, "count": count}
        body = SendHttpRequest(self.url).get(value)
        self.assertEqual(body, u'{"Message":"請求無效。"}')

@ddt
class TourMaps(unittest.TestCase):
    """Data-driven checks of the TourMap XML endpoint."""

    def setUp(self):
        self.url = "http://xxxxxx/api/TourMap"

    @data(32351, 9555)
    def test_requests_online_xml(self, tourId):
        """The XML for a tour has coordinates and at least one child entry."""
        xml_url = self.url + "/%s" % tourId
        # Named ``body`` so the imported ``data`` decorator is not shadowed.
        body = SendHttpRequest(xml_url).get()
        json_st = xmlprase(body).xml_to_json
        json_data = jsonprase(json_st)
        lng = json_data.find_json_node_by_xpath("/root/data/@lng")
        lat = json_data.find_json_node_by_xpath("/root/data/@lat")
        # unittest assertions are used instead of bare ``assert`` so the
        # checks survive ``python -O`` and report the offending values.
        self.assertNotEqual(lng, "")
        self.assertNotEqual(lat, "")
        son_tour = json_data.find_json_node_by_xpath("/root/data/data")
        self.assertGreater(len(son_tour), 0)

# @ddt is required on the class: without it the @data/@unpack markers on the
# methods are never expanded into individual test cases.
@ddt
class TourData(unittest.TestCase):
    """Placeholder test cases for the tour-data endpoint (not yet written)."""

    def setUp(self):
        self.url = "http://xxxxxx/api/xxx"

    @data(
        (),
        (),
        (),
    )
    @unpack
    def test_tourList_Location_in_open(self):
        """Placeholder: the data rows above are still empty."""
        pass

    def test_tourList_Location_not_open(self):
        """Placeholder: no checks yet."""
        pass

    def test_tour_open_city(self):
        """Placeholder: no checks yet."""
        pass


if __name__ == "__main__":
    # loadTestsFromTestCase() accepts exactly one TestCase class, so each
    # class is loaded separately and the results are combined into one suite.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        loader.loadTestsFromTestCase(case)
        for case in (TourMaps, TestSingleRequest)
    )
    unittest.TextTestRunner(verbosity=2).run(suite)

關於 python ddt,可以參考 https://ddt.readthedocs.org/en/latest/example.html

測試結果生成,可以查看 python nose 相關文檔,生成 html 報告

本文主要提供了代碼,如果有相關疑問可聯系本人,郵箱d1988505j@163.com


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM