針對 WebSocket 協議的 Locust 壓測腳本實現(基於 Locust 1.0 以上版本)


Locust 默認支持 HTTP 協議(默認通過 HttpUser 類),我們也可以自行實現任意協議的 Client 對它 User 類進行繼承(HttpUser 也是繼承自 User)並增加所需要的方法,這樣也就實現了任意協議的壓測。

針對 WebSocket 協議的 Locust 壓測腳本實現無非就是三個步驟

  1. 編寫一個 WebSocket Client,也就是定義一個 Class,實現 WS連接初始化、事件訂閱、消息接收 所需要的方法
  2. 使用 WebSocket Client 繼承 User 類,產生 WebsocketUser
  3. 依據測試用例編寫壓測腳本,使用 WebsocketUser內預定義的方法 實現並發的連接、事件訂閱、消息接收

腳本實現參考

from locust import User, task, events, constant
import time
import websocket
import ssl
import json
import jsonpath

def eventType_success(eventType, recvText, total_time):
    events.request_success.fire(request_type="[RECV]",
                                name=eventType,
                                response_time=total_time,
                                response_length=len(recvText))

class WebSocketClient(object):
    
    _locust_environment = None
    
    def __init__(self, host):
        self.host = host
        # 針對 WSS 關閉 SSL 校驗警報
        self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
        
    def connect(self, burl):
        start_time = time.time()
        try:
            self.conn = self.ws.connect(url=burl)
        except websocket.WebSocketConnectionClosedException as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(
                request_type="[Connect]", name='Connection is already closed', response_time=total_time, exception=e)
        except websocket.WebSocketTimeoutException as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request_failure.fire(
                request_type="[Connect]", name='TimeOut', response_time=total_time, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request_success.fire(
                request_type="[Connect]", name='WebSocket', response_time=total_time, response_length=0)
        return self.conn
        
    def recv(self):
        return self.ws.recv()
        
    def send(self, msg):
        self.ws.send(msg)
        
class WebsocketUser(User):
    abstract = True
    def __init__(self, *args, **kwargs):
        super(WebsocketUser, self).__init__(*args, **kwargs)
        self.client = WebSocketClient(self.host)
        self.client._locust_environment = self.environment
        
class ApiUser(WebsocketUser):
    host = "wss://ws.xxxxx.com/"
    wait_time = constant(0)
    
    @task(1)
    def pft(self):
        # wss 地址
        self.url = 'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
        self.data = {}
        self.client.connect(self.url)
        
        # 發送的訂閱請求
        sendMsg = '{"appid":"futures","cover":0,"event":[\
            {"type":"exchange_rate","toggle":1,"expireTime":86400},\
            {"type":"accountInfo_USDT","toggle":1,"expireTime":86400},\
            {"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
        self.client.send(sendMsg)
        
        while True:
            # 消息接收計時
            start_time = time.time()
            recv = self.client.recv()
            total_time = int((time.time() - start_time) * 1000)
            
            # 為每個推送過來的事件進行歸類和獨立計算性能指標
            try:
                recv_j = json.loads(recv)
                eventType_s = jsonpath.jsonpath(recv_j, expr='$.eventType')
                eventType_success(eventType_s[0], recv, total_time)
            except websocket.WebSocketConnectionClosedException as e:
                events.request_failure.fire(request_type="[ERROR] WebSocketConnectionClosedException",
                                            name='Connection is already closed.',
                                            response_time=total_time,
                                            exception=e)
            except:
                print(recv)
                # 正常 OK 響應,或者其它心跳響應加入進來避免當作異常處理
                if 'ok' in recv:
                    eventType_success('ok', 'ok', total_time)

class WebSocketClient

  1. 實現了 WebSocket 的所有行為方法,包括連接初始化、消息發送(訂閱)、消息接收
  2. 對連接過程中的異常進行捕獲統計,記錄異常響應的時間,便於后續測試分析
  3. 這段腳本基本拷貝就能用:)

class WebsocketUser

  1. 繼承 Locust 的 user 成為 WebsocketUser

class ApiUser

  1. 在這里加載 WebsocketUser,初始化的 user,發送訂閱請求、並在一個死循環內接收消息推送內容
  2. 對接收的消息內容(json格式)進行解析,最終可以在 WEB UI 看到各種推送事件的推送統計
  3. 對接收推送過程中的異常進行捕獲,記錄異常響應的時間,便於后續測試分析

也可以在死循環內加入心跳發送,但建議建議按照規則發送,避免過於頻繁,此處略

壓測過程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM