python测试http、websocket接口


背景:测试环境需要构造大量订单数据来验证资金分账的正确性

创建订单

# coding=utf-8

import requests
import time
import uuid
import hashlib
import hmac
import urllib3
# from config import read_yaml

urllib3.disable_warnings()


def shunfeng():
    # 时间戳
    millis = "%d" % (time.time() * 1000)

    # 订单号
    order_id = ''.join((str(uuid.uuid1()).split('-')))

    url = 'https://test'
    mid = 'test'
    secret = 'test'

    sec_data = {"data": mid,
                "data": 'HmacSHA256',
                "data": '1',
                "data": millis,
                "data": order_id,
                "orderType": "1",
                "data": "100",
                "data": "WechatPay",
                "data": "88888888",
                "data": "110.110.110.110",
                "data": "https://test.com",
                "data": "CNY",
                "data": order_id}

    sec_data_str = ''
    for k in sorted(sec_data):
        sec_data_str += k + '=' + sec_data[k] + '&'
    sec_data_str = sec_data_str[:-1]
    print('\nsing排序后参数:\n'+sec_data_str)

    signature_str = hmac.new(secret.encode('utf-8'), sec_data_str.encode('utf-8'), digestmod=hashlib.sha256).digest()
    sign = signature_str.hex().upper()
    print('\nsign:\n' + sign)

    body_data = {"data": mid,
                 "data": 'HmacSHA256',
                 "data": '1',
                 "data": millis,
                 "data": order_id,
                 "data": "1",
                 "data": "100",
                 "data": "WechatPay",
                 "data": "88888888",
                 "data": "110.110.110.110",
                 "data": "https://test.com",
                 "data": "CNY",
                 "data": order_id,
                 "data": sign}

    body_str = ''
    for i in body_data.items():
        body_str += i[0] + '=' + i[1] + '&'
    body_str = body_str[:-1]
    print('\nbody参数:\n' + body_str)

    head = {"Content-Type": "application/x-www-form-urlencoded"}

    req = requests.post(url=url, data=body_str, headers=head)
    print('\n请求url:\n' + req.url)
    print('\n下单返回结果:' + req.text)


if __name__ == '__main__':
    shunfeng()

 

接收订单推送

# coding=utf-8

import urllib3
import re
from websocket import create_connection

urllib3.disable_warnings()


def wss(uid):

    url = 'wss://test'
    ws = create_connection(url)

    ws.send('{"test":2,"tese":16,"test":1,"body":{"test":"test","test":"test","test":"1.0","test":"1.0"}}')

    while True:

        ws.send('{"test":5,"test":16,"test":2,"test":{"userId":' + '"' + uid + '"' + '}}')
        send_data = ws.recv()
        if 'test' in str(send_data):

            str1 = str(send_data).split('\\')

            str2 = str(str1).split(' '' ')

            str3 = ''.join(str(str2).split('\\'))

            str4 = re.sub("[\"\'[, ]", "", str3)

            str5 = re.findall(r'invoice:(.*?)=', str4)

            order_id = ''.join(str5) + '='
            # print(order_id)

            return order_id


if __name__ == '__main__':
    wss('uid')

 

登陆-抢单-完成

import urllib3
import requests
import logging
import time
from web_wss import wss

urllib3.disable_warnings()

time_log = time.strftime('%Y-%m-%d %H_%M_%S', time.localtime(time.time()))
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logging.basicConfig(filename= 'result.log', filemode= 'w+', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def login():
    list_uid = []
    list_head = []

    phone = phone

    login_header = {'User-Agent': 'test/1.0.0(build:test;ANDROID 9)',
                    'bndkey': 'test'}

    for line in phone:
        try:
            # requests.adapters.DEFAULT_RETRIES = 10
            # s = requests.session()
            # s.keep_alive = False

            sessionRequest = requests.session()

            # 登陆
            app_login = {'account': '86,' + line,
                         'accountKind': '1',
                         'passwd': 'test'}
            r_login = sessionRequest.post(url=url_login, data=app_login, headers=login_header, verify=False, timeout=10)
            # print(r_login.url)

            if '登录成功' in r_login.text:
                logging.info(u'登陆成功: ' + line + ' - ' + r_login.json()['data']['uid'])
            else:
                logging.info(u'登陆失败: ' + r_login.text)

            # 请求头
            uid = r_login.json()['data']['uid']
            u_token = dict(r_login.headers)['utoken']
            t_token = dict(r_login.headers)['ttoken']
            head = {'User-Agent': 'test/1.0.0(build:test;ANDROID 7.1.1)',
                    'bndkey': 'test',
                    'utoken': u_token,
                    'ttoken': t_token}

            r_start = sessionRequest.post(url=url_open_take, data={'main': '1'}, headers=head, verify=False, timeout=10)
            if 'true' in r_start.text:
                logging.info(u'接单开启成功: uid - ' + str(uid) + ' - ' + r_start.text)
            else:
                logging.info(u'接单开启失败: uid - ' + str(uid) + ' - ' + r_start.text)

            r_sell = requests.post(url=url_sell, data={'merchantDeposit': '1', 'playerDeposit': '1'}, headers=head, verify=False, timeout=10)
            if 'true' in r_sell.text:
                logging.info(u'卖币开启成功: uid - ' + str(uid) + ' - ' + r_sell.text)
            else:
                logging.info(u'卖币开启失败: uid - ' + str(uid) + ' - ' + r_sell.text)

            list_uid.append(uid)
            list_head.append(head)

            logging.info(u'uid: ' + str(uid) + ' - utoken: ' + str(u_token) + ' - ttoken: ' + str(t_token) + '\n')
            # logging.info(u'HEAD: ' + str(list_head) + '\n')

        except BaseException as msg:
            logging.info(u'---ERROR:' + str(msg) + '\n' + '\n' + '\n')

    return list_uid, list_head


def accomplishOrder():
    list_data = login()
    list_uid = list_data[0]
    list_head = list_data[1]

    for uid, head in zip(list_uid, list_head):
        try:
            sessionRequest = requests.session()

            # list_order_id = []
            # list_amount = []

            try:
                if wss(uid) is not None:
                    # 抢单
                    order_id_sec = wss(uid)
                    r_rob = sessionRequest.post(url=url_rob, data={'id': order_id_sec}, headers=head, verify=False, timeout=10)
                    logging.info(u'抢单: ' + r_rob.text)

                    # 查询进行中订单
                    r_get_order = sessionRequest.get(url=url_run_order, headers=head, verify=False, timeout=10)

                    if r_get_order.json()['data'] is not None:
                        order_data = r_get_order.json()
                        for i in range(len(r_get_order.json()['data'])):
                            order_id = order_data['data'][i]['id']
                            amount = order_data['data'][i]['amount']['reality']

                            # list_order_id.append(order_id)
                            # list_amount.append(amount)
                            # # logging.info('uid: ' + uid + ' - orderId: ' + order_id + '\n')
                            # return list_order_id, list_amount

                            # 完成订单
                            r_pay = sessionRequest.post(url=url_pay, data={'id': order_id, 'amount': amount}, headers=head, verify=False, timeout=10)
                            if 'true' in r_pay.text:
                                logging.info(u'完成订单: uid - ' + uid + '   orderId: ' + order_id + '\n')
                            else:
                                logging.info(u'订单未完成: uid - ' + uid + r_pay.text)
                    else:
                        logging.info(u'暂时没有进行中订单: ' + 'uid: ' + uid)

            except BaseException as msg:
                logging.info(u'---ERROR:' + str(msg) + '\n')
        except BaseException as msg:
            logging.info(u'--- ERROR:' + str(msg) + '\n')


if __name__ == '__main__':
    while True:
        accomplishOrder()


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM