Locust 默認支持 HTTP 協議(默認通過 HttpUser 類),我們也可以自行實現任意協議的 Client 對它 User 類進行繼承(HttpUser 也是繼承自 User)並增加所需要的方法,這樣也就實現了任意協議的壓測。
針對 WebSocket 協議的 Locust 壓測腳本實現無非就是三個步驟
- 編寫一個 WebSocket Client,也就是定義一個 Class,實現 WS連接初始化、事件訂閱、消息接收 所需要的方法
- 使用 WebSocket Client 繼承 User 類,產生 WebsocketUser
- 依據測試用例編寫壓測腳本,使用 WebsocketUser內預定義的方法 實現並發的連接、事件訂閱、消息接收
腳本實現參考
from locust import User, task, events, constant
import time
import websocket
import ssl
import json
import jsonpath
def eventType_success(eventType, recvText, total_time):
events.request_success.fire(request_type="[RECV]",
name=eventType,
response_time=total_time,
response_length=len(recvText))
class WebSocketClient(object):
_locust_environment = None
def __init__(self, host):
self.host = host
# 針對 WSS 關閉 SSL 校驗警報
self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
def connect(self, burl):
start_time = time.time()
try:
self.conn = self.ws.connect(url=burl)
except websocket.WebSocketConnectionClosedException as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(
request_type="[Connect]", name='Connection is already closed', response_time=total_time, exception=e)
except websocket.WebSocketTimeoutException as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(
request_type="[Connect]", name='TimeOut', response_time=total_time, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(
request_type="[Connect]", name='WebSocket', response_time=total_time, response_length=0)
return self.conn
def recv(self):
return self.ws.recv()
def send(self, msg):
self.ws.send(msg)
class WebsocketUser(User):
abstract = True
def __init__(self, *args, **kwargs):
super(WebsocketUser, self).__init__(*args, **kwargs)
self.client = WebSocketClient(self.host)
self.client._locust_environment = self.environment
class ApiUser(WebsocketUser):
host = "wss://ws.xxxxx.com/"
wait_time = constant(0)
@task(1)
def pft(self):
# wss 地址
self.url = 'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
self.data = {}
self.client.connect(self.url)
# 發送的訂閱請求
sendMsg = '{"appid":"futures","cover":0,"event":[\
{"type":"exchange_rate","toggle":1,"expireTime":86400},\
{"type":"accountInfo_USDT","toggle":1,"expireTime":86400},\
{"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
self.client.send(sendMsg)
while True:
# 消息接收計時
start_time = time.time()
recv = self.client.recv()
total_time = int((time.time() - start_time) * 1000)
# 為每個推送過來的事件進行歸類和獨立計算性能指標
try:
recv_j = json.loads(recv)
eventType_s = jsonpath.jsonpath(recv_j, expr='$.eventType')
eventType_success(eventType_s[0], recv, total_time)
except websocket.WebSocketConnectionClosedException as e:
events.request_failure.fire(request_type="[ERROR] WebSocketConnectionClosedException",
name='Connection is already closed.',
response_time=total_time,
exception=e)
except:
print(recv)
# 正常 OK 響應,或者其它心跳響應加入進來避免當作異常處理
if 'ok' in recv:
eventType_success('ok', 'ok', total_time)
class WebSocketClient
- 實現了 WebSocket 的所有行為方法,包括連接初始化、消息發送(訂閱)、消息接收
- 對連接過程中的異常進行捕獲統計,記錄異常響應的時間,便於后續測試分析
- 這段腳本基本拷貝就能用:)
class WebsocketUser
- 繼承 Locust 的 user 成為 WebsocketUser
class ApiUser
- 在這里加載 WebsocketUser,初始化的 user,發送訂閱請求、並在一個死循環內接收消息推送內容
- 對接收的消息內容(json格式)進行解析,最終可以在 WEB UI 看到各種推送事件的推送統計
- 對接收推送過程中的異常進行捕獲,記錄異常響應的時間,便於后續測試分析
也可以在死循環內加入心跳發送,但建議建議按照規則發送,避免過於頻繁,此處略
壓測過程