WebSocket 壓測可以使用 JMeter,也可以直接使用 Python 代碼進行壓測。
import json
import threading
import time

import websocket
from threadpool import ThreadPool, makeRequests

# Base URL; the connection number is appended to build each client's endpoint.
SERVER_URL = "ws://192.168.1.208:7600/box/reader-"
# Number of simultaneous connections (one pool thread per connection).
CONNECTION_COUNT = 5000
# Seconds between two messages on the same connection.
SEND_INTERVAL = 5


def on_message(ws, message):
    """Print every message received from the server."""
    print(message)


def on_error(ws, error):
    """Print any error reported by the websocket client."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the connection was closed.

    The two optional arguments are accepted because newer websocket-client
    releases pass the close status code and message to this callback, while
    older releases pass only ``ws``.
    """
    print("### closed ###")


def on_open(ws):
    """Start a background thread that sends a payment message periodically."""

    def send_thread():
        send_info = {
            "store_id": "0001",
            "msg_type": "pay_info",
            "device_id": "device_001",
            "pay_code": "ffe2sdfdf-8sdfew-sdfsdf-ijlkj",
            "goods": [
                "C004-SD343-234",
                "D004-GE343-990",
                "G004-X343-452",
            ],
        }
        # The payload never changes, so serialize it once outside the loop.
        payload = json.dumps(send_info)
        while True:
            time.sleep(SEND_INTERVAL)
            try:
                ws.send(payload)
            except (websocket.WebSocketException, OSError):
                # The connection is gone; stop instead of dying with a traceback.
                break

    sender = threading.Thread(target=send_thread)
    # Daemon thread: a sleeping sender must not keep the process alive.
    sender.daemon = True
    sender.start()


def on_start(a):
    """Open connection number ``a`` and block until it is closed.

    The start is delayed by ``a % 20`` seconds so that the connections do not
    all hit the server at the same instant.
    """
    time.sleep(a % 20)
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        SERVER_URL + str(a),
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()


if __name__ == "__main__":
    # NOTE: the original script called init_logging() here, but no such
    # function is defined anywhere in it, so the call has been removed.
    pool = ThreadPool(CONNECTION_COUNT)
    for request in makeRequests(on_start, list(range(CONNECTION_COUNT))):
        pool.putRequest(request)
    pool.wait()

# 5000 simultaneous connections, each sending data once every 5 seconds.
# To use this as a plain socket client test, change the function that starts
# the connection so that it opens a socket connection instead.
# Reference: https://blog.csdn.net/ennismar/article/details/78190887
壓測參考2:https://blog.csdn.net/m0_37581001/article/details/83624240
# -*- coding:utf-8 -*-
# __author__ == 'chenmingle'
import json
import multiprocessing
import threading
import time
import uuid

import websocket
from threadpool import ThreadPool, makeRequests

# Change this to your own websocket address.
WS_URL = "xxxx"
# Number of worker processes.
processes = 4
# Number of threads per process (there may be a limit of 1024 per file;
# parameters such as fs.file can be adjusted to raise it).
thread_num = 700
# Per-process counter bumped each time a connection opens.
index = 1
# Guards ``index``: on_open runs concurrently on many pool threads.
_index_lock = threading.Lock()


def on_message(ws, message):
    """Ignore incoming messages (printing them would flood the console)."""
    pass


def on_error(ws, error):
    """Print any error reported by the websocket client."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Do nothing when the connection closes.

    The optional arguments are accepted because newer websocket-client
    releases pass the close status code and message to this callback, while
    older releases pass only ``ws``.
    """
    pass


def on_open(ws):
    """Count the new connection and start its keep-alive sender thread."""
    global index
    with _index_lock:
        index = index + 1

    def send_thread():
        # Set your websocket payload here.
        # Send data every 10 seconds so the connection is not dropped.
        while True:
            try:
                ws.send(u'hello服務器')
            except (websocket.WebSocketException, OSError):
                # The connection is gone; stop instead of dying with a traceback.
                break
            time.sleep(10)

    sender = threading.Thread(target=send_thread)
    # Daemon thread: a sleeping sender must not keep the process alive.
    sender.daemon = True
    sender.start()


def on_start(num):
    """Open connection number ``num`` after 5 seconds and block until it closes."""
    time.sleep(5)
    # websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        WS_URL + str(num),
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()


def thread_web_socket():
    """Run ``thread_num`` websocket connections in a thread pool and wait for them."""
    # Thread pool: one thread per connection.
    pool_list = ThreadPool(thread_num)
    for request in makeRequests(on_start, list(range(thread_num))):
        pool_list.putRequest(request)
    pool_list.wait()


if __name__ == "__main__":
    # Process pool: each process runs its own thread pool of connections.
    pool = multiprocessing.Pool(processes=processes)
    # ``range`` instead of the Python 2-only ``xrange`` so the script runs on
    # both Python 2 and Python 3.
    results = [pool.apply_async(thread_web_socket) for _ in range(processes)]
    pool.close()
    pool.join()
    # apply_async swallows worker exceptions unless the result is fetched;
    # report them so a failed worker (e.g. thread limit reached) is visible.
    for result in results:
        try:
            result.get()
        except Exception as exc:
            print(exc)
可以參考:https://blog.51cto.com/legehappy/2144645?source=dra
前提需要安裝相關依賴庫
# One command per line: fused onto a single line, pip would treat the words
# "pip" and "install" as package names.
# websocket-client provides the `websocket` module the scripts import;
# threadpool provides ThreadPool / makeRequests.
pip install websocket-client
pip install threadpool
# Not installed on purpose:
#   - `websocket` on PyPI is a different library whose `websocket` module can
#     shadow the one from websocket-client (WebSocketApp would be missing).
#   - `multiprocessing` is part of the Python standard library.
腳本如下(僅供參考),也可以參考其 GitHub:
#!/usr/bin/python
# -*- coding:utf-8 -*-
# __author__ == 'chenmingle'
import json
import multiprocessing
import threading
import time

import websocket
from threadpool import ThreadPool, makeRequests

# Change this to your own websocket address.
WS_URL = "wss://ws.test.com/"
# Number of worker processes.
processes = 5
# Number of threads per process (there may be a limit of 1024 per file;
# parameters such as fs.file can be adjusted to raise it).
thread_num = 1000


def on_message(ws, message):
    """Print every message received from the server."""
    print(message)


def on_error(ws, error):
    """Print any error reported by the websocket client."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the connection was closed.

    The optional arguments are accepted because newer websocket-client
    releases pass the close status code and message to this callback, while
    older releases pass only ``ws``.
    """
    print("### closed ###")


def on_open(ws):
    """Start a background thread that sends a refresh command periodically."""

    def send_thread():
        # Set your websocket payload here.
        send_info = {"cmd": "refresh", "data": {"room_id": "58", "wx_user_id": 56431}}
        # The payload never changes, so serialize it once outside the loop.
        payload = json.dumps(send_info)
        # Send data every 10 seconds so the connection is not dropped.
        while True:
            time.sleep(10)
            try:
                ws.send(payload)
            except (websocket.WebSocketException, OSError):
                # The connection is gone; stop instead of dying with a traceback.
                break

    sender = threading.Thread(target=send_thread)
    # Daemon thread: a sleeping sender must not keep the process alive.
    sender.daemon = True
    sender.start()


def on_start(num):
    """Open connection number ``num`` and block until it is closed.

    The start is delayed by ``num % 20`` seconds so that the connections do
    not all hit the server at the same instant.
    """
    time.sleep(num % 20)
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        WS_URL + str(num),
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()


def thread_web_socket():
    """Run ``thread_num`` websocket connections in a thread pool and wait for them."""
    # Thread pool: one thread per connection.
    pool = ThreadPool(thread_num)
    for request in makeRequests(on_start, list(range(thread_num))):
        pool.putRequest(request)
    pool.wait()


if __name__ == "__main__":
    # Process pool: each process runs its own thread pool of connections.
    pool = multiprocessing.Pool(processes=processes)
    # ``range`` instead of the Python 2-only ``xrange`` so the script runs on
    # both Python 2 and Python 3.
    results = [pool.apply_async(thread_web_socket) for _ in range(processes)]
    pool.close()
    pool.join()
    # apply_async swallows worker exceptions unless the result is fetched;
    # report them so a failed worker (e.g. thread limit reached) is visible.
    for result in results:
        try:
            result.get()
        except Exception as exc:
            print(exc)
待續