背景:测试环境需要构造大量订单数据来验证资金分账的正确性
创建订单
# coding=utf-8
"""Create one order by POSTing an HMAC-SHA256-signed, form-encoded request."""
import hashlib
import hmac
import time
import uuid

import requests
import urllib3

# from config import read_yaml
urllib3.disable_warnings()


def _join_pairs(pairs):
    """Join ``(key, value)`` string pairs as ``key=value&key=value``.

    No URL-escaping is applied; values are concatenated verbatim.
    """
    return '&'.join(key + '=' + value for key, value in pairs)


def shunfeng(timeout=10):
    """Build a signed order request, POST it, and print the exchange.

    The signature is the upper-cased hex HMAC-SHA256 of the ``key=value``
    pairs sorted by key and joined with ``&``.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        None. The signed string, signature, body, URL and response text are
        printed to stdout.
    """
    # Millisecond timestamp.
    millis = str(int(time.time() * 1000))
    # Order number: the hex form of a UUID1 (no dashes).
    order_id = uuid.uuid1().hex
    url = 'https://test'
    mid = 'test'
    secret = 'test'

    # NOTE(review): every key except "orderType" reads "data" -- the real
    # field names look redacted. Python keeps only the LAST value for a
    # repeated key, so as written only two fields are signed. Restore the
    # real field names before relying on this script.
    sec_data = {
        "data": mid,
        "data": 'HmacSHA256',
        "data": '1',
        "data": millis,
        "data": order_id,
        "orderType": "1",
        "data": "100",
        "data": "WechatPay",
        "data": "88888888",
        "data": "110.110.110.110",
        "data": "https://test.com",
        "data": "CNY",
        "data": order_id,
    }
    # The string to sign lists the fields in sorted-key order.
    sec_data_str = _join_pairs((key, sec_data[key]) for key in sorted(sec_data))
    print('\nsing排序后参数:\n' + sec_data_str)

    sign = hmac.new(
        secret.encode('utf-8'),
        sec_data_str.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).hexdigest().upper()
    print('\nsign:\n' + sign)

    # NOTE(review): same redaction issue as above -- all keys are "data", so
    # only the last entry (the signature) survives in the request body.
    body_data = {
        "data": mid,
        "data": 'HmacSHA256',
        "data": '1',
        "data": millis,
        "data": order_id,
        "data": "1",
        "data": "100",
        "data": "WechatPay",
        "data": "88888888",
        "data": "110.110.110.110",
        "data": "https://test.com",
        "data": "CNY",
        "data": order_id,
        "data": sign,
    }
    # The body keeps insertion order (it is not sorted like the signed string).
    body_str = _join_pairs(body_data.items())
    print('\nbody参数:\n' + body_str)

    head = {"Content-Type": "application/x-www-form-urlencoded"}
    req = requests.post(url=url, data=body_str, headers=head, timeout=timeout)
    print('\n请求url:\n' + req.url)
    print('\n下单返回结果:' + req.text)


if __name__ == '__main__':
    shunfeng()
接收订单推送
# coding=utf-8
"""Wait on a websocket for a pushed order and return its invoice id."""
import re

import urllib3
from websocket import create_connection

urllib3.disable_warnings()


def _extract_order_id(message):
    """Pull the ``invoice:...=`` token out of a received frame.

    The frame is stringified, backslashes, quotes, ``[``, commas and spaces
    are stripped, and everything between ``invoice:`` and the next ``=`` is
    captured. All captures are concatenated and a single ``=`` is appended.
    """
    str1 = str(message).split('\\')
    # ' '' ' is two adjacent literals, i.e. a two-space separator.
    str2 = str(str1).split(' '' ')
    str3 = ''.join(str(str2).split('\\'))
    str4 = re.sub("[\"\'[, ]", "", str3)
    str5 = re.findall(r'invoice:(.*?)=', str4)
    return ''.join(str5) + '='


def wss(uid):
    """Poll the websocket until a matching frame arrives and return its id.

    Sends one initial message, then repeatedly sends a request carrying
    ``uid`` and reads one frame, until a frame whose text contains ``'test'``
    is received.

    Args:
        uid: User id string embedded in each polling request.

    Returns:
        The extracted order id string (always ends with ``=``).
    """
    url = 'wss://test'
    ws = create_connection(url)
    try:
        ws.send('{"test":2,"tese":16,"test":1,"body":{"test":"test","test":"test","test":"1.0","test":"1.0"}}')
        # The polling request does not change between iterations; build it once.
        poll_message = '{"test":5,"test":16,"test":2,"test":{"userId":' + '"' + uid + '"' + '}}'
        while True:
            ws.send(poll_message)
            send_data = ws.recv()
            if 'test' in str(send_data):
                return _extract_order_id(send_data)
    finally:
        # Always release the connection, including on errors; it was
        # previously left open on every call.
        ws.close()


if __name__ == '__main__':
    wss('uid')
登录-抢单-完成
"""Log accounts in, grab pushed orders and mark in-progress orders complete."""
import logging
import time

import requests
import urllib3

from web_wss import wss

urllib3.disable_warnings()
time_log = time.strftime('%Y-%m-%d %H_%M_%S', time.localtime(time.time()))
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logging.basicConfig(filename='result.log', filemode='w+', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# NOTE(review): `phone` (iterated as phone-number strings) and the endpoint
# constants `url_login`, `url_open_take`, `url_sell`, `url_rob`,
# `url_run_order` and `url_pay` are not defined in the visible source; they
# must exist at module level before the functions below are called.


def login():
    """Log in every account in ``phone`` and enable order-taking and selling.

    Returns:
        A ``(list_uid, list_head)`` tuple: the uid of each account that was
        set up without raising, and the matching request headers (carrying
        the ``utoken``/``ttoken`` values from the login response headers).
    """
    list_uid = []
    list_head = []
    login_header = {'User-Agent': 'test/1.0.0(build:test;ANDROID 9)', 'bndkey': 'test'}
    # Iterate the module-level `phone` directly. A local `phone = phone`
    # makes the name function-local and raises UnboundLocalError.
    for line in phone:
        try:
            with requests.session() as session:
                # Log in.
                app_login = {'account': '86,' + line, 'accountKind': '1', 'passwd': 'test'}
                r_login = session.post(url=url_login, data=app_login, headers=login_header,
                                       verify=False, timeout=10)
                if '登录成功' in r_login.text:
                    logging.info('登陆成功: %s - %s', line, r_login.json()['data']['uid'])
                else:
                    # NOTE(review): a failed login still falls through; the
                    # lookups below are then expected to raise and be logged.
                    logging.info('登陆失败: %s', r_login.text)

                # Build the authenticated request headers.
                uid = r_login.json()['data']['uid']
                u_token = r_login.headers['utoken']
                t_token = r_login.headers['ttoken']
                head = {'User-Agent': 'test/1.0.0(build:test;ANDROID 7.1.1)', 'bndkey': 'test',
                        'utoken': u_token, 'ttoken': t_token}

                r_start = session.post(url=url_open_take, data={'main': '1'}, headers=head,
                                       verify=False, timeout=10)
                if 'true' in r_start.text:
                    logging.info('接单开启成功: uid - %s - %s', uid, r_start.text)
                else:
                    logging.info('接单开启失败: uid - %s - %s', uid, r_start.text)

                # NOTE(review): this call bypasses the session (no session
                # cookies are sent); confirm whether that is intentional.
                r_sell = requests.post(url=url_sell,
                                       data={'merchantDeposit': '1', 'playerDeposit': '1'},
                                       headers=head, verify=False, timeout=10)
                if 'true' in r_sell.text:
                    logging.info('卖币开启成功: uid - %s - %s', uid, r_sell.text)
                else:
                    logging.info('卖币开启失败: uid - %s - %s', uid, r_sell.text)

                list_uid.append(uid)
                list_head.append(head)
                logging.info('uid: %s - utoken: %s - ttoken: %s\n', uid, u_token, t_token)
                logging.info('HEAD: %s\n', list_head)
        except Exception as msg:
            # Best effort per account: log and move on to the next number.
            # Exception (not BaseException) so Ctrl+C / SystemExit still stop the run.
            logging.info('---ERROR:%s\n\n\n', msg)
    return list_uid, list_head


def accomplishOrder():
    """For each logged-in account, grab one pushed order and complete orders.

    Per account: wait for an order id from ``wss``, POST it to the grab
    endpoint, fetch the in-progress orders, and POST each one's id and
    ``amount.reality`` to the pay endpoint. Failures are logged and the
    next account is processed.
    """
    list_uid, list_head = login()
    for uid, head in zip(list_uid, list_head):
        try:
            with requests.session() as session:
                # Call wss() once: each call opens its own websocket and may
                # yield a different order.
                order_id_sec = wss(uid)
                if order_id_sec is None:
                    continue

                # Grab the pushed order.
                r_rob = session.post(url=url_rob, data={'id': order_id_sec}, headers=head,
                                     verify=False, timeout=10)
                logging.info('抢单: %s', r_rob.text)

                # Query in-progress orders (parse the JSON body once).
                r_get_order = session.get(url=url_run_order, headers=head,
                                          verify=False, timeout=10)
                orders = r_get_order.json()['data']
                if orders is None:
                    logging.info('暂时没有进行中订单: uid: %s', uid)
                    continue

                # NOTE(review): the source indentation was lost; the pay
                # request is placed inside the loop so every listed order is
                # completed and an empty list is a no-op.
                for order in orders:
                    order_id = order['id']
                    amount = order['amount']['reality']
                    # Complete the order.
                    r_pay = session.post(url=url_pay, data={'id': order_id, 'amount': amount},
                                         headers=head, verify=False, timeout=10)
                    if 'true' in r_pay.text:
                        logging.info('完成订单: uid - %s orderId: %s\n', uid, order_id)
                    else:
                        logging.info('订单未完成: uid - %s%s', uid, r_pay.text)
        except Exception as msg:
            logging.info('---ERROR:%s\n', msg)


if __name__ == '__main__':
    while True:
        accomplishOrder()