websocket接口自动化的封装


import time,logging
import websocket

class WebSocketCli:
    """Small wrapper around a ``websocket-client`` connection for API tests.

    One instance is meant for a single request/response exchange:
    ``recv`` closes the underlying connection before it returns.
    """

    def __init__(self, url="ws://testws.111.com/", timeout=30):
        """Open the WebSocket connection.

        Args:
            url: Endpoint to connect to. Defaults to the test server.
            timeout: Socket timeout in seconds, passed to
                ``websocket.create_connection``.
        """
        self.ws = websocket.create_connection(url, timeout=timeout)

    def send(self, message):
        """Send ``message`` over the open connection."""
        self.ws.send(message)

    def recv(self, line=1, include=None):
        """Collect received messages, then close the connection.

        Reading stops when ``line`` messages have been collected, when a
        message reporting an error arrives, or when receiving fails
        (timeout or any other exception, which is logged, not raised).

        Args:
            line: Number of messages to collect before stopping.
            include: Optional substring. When given, only messages that
                contain it are collected; other messages are skipped.

        Returns:
            The list of collected messages (possibly empty on failure).
        """
        result = []
        try:
            while True:
                recvdata = self.ws.recv()
                # A message that mentions "error" without the literal
                # '"error": null' is treated as an error reply: keep it
                # and stop reading, regardless of the ``include`` filter.
                if "error" in recvdata and '"error": null' not in recvdata:
                    result.append(recvdata)
                    break
                # Apply the substring filter only when one was requested.
                if include and include not in recvdata:
                    continue
                result.append(recvdata)
                if len(result) >= line:
                    break
        except websocket.WebSocketTimeoutException:
            logging.error("超时啦")
        except Exception:
            # Keep the best-effort behaviour (log, do not raise), but record
            # the traceback and let KeyboardInterrupt/SystemExit propagate.
            logging.exception("出现了其他错误")
        finally:
            self.ws.close()
        logging.info(result)
        return result

 

websocket自动化的使用:

import time,json,logging
from common.websocketcli import WebSocketCli
from verify_response.verify_base import verify_data


def test_ping():
    """Send a ``server.ping`` request and verify the reply payload."""
    request = '{"id":1,"method":"server.ping","params":[]}'
    expected = {"error": None, "result": "pong", "id": 1}

    client = WebSocketCli()
    client.send(request)
    # Ask for a single message and decode its JSON body for comparison.
    first_message = client.recv(1)[0]
    verify_data(json.loads(first_message), expected)

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM