WebSocket 压测可以使用 JMeter,也可以直接使用 Python 代码进行压测。
import json
import logging
import threading
import time

import websocket
from threadpool import ThreadPool, makeRequests

SERVER_URL = "ws://192.168.1.208:7600/box/reader-"
# Number of simultaneous connections opened by the load test.
CONNECTION_COUNT = 5000
# Seconds between two payloads sent on the same connection.
SEND_INTERVAL_SECONDS = 5


def init_logging():
    """Configure basic root logging for the load-test run."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(threadName)s %(message)s",
    )


def on_message(ws, message):
    """Print every message received from the server."""
    print(message)


def on_error(ws, error):
    """Print any error reported for the connection."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the connection was closed.

    Recent websocket-client releases pass the close status code and message
    to this callback; the defaults keep the single-argument call working.
    """
    print("### closed ###")


def on_open(ws):
    """Start a background thread that sends the payload periodically."""

    def send_thread():
        send_info = {
            "store_id": "0001",
            "msg_type": "pay_info",
            "device_id": "device_001",
            "pay_code": "ffe2sdfdf-8sdfew-sdfsdf-ijlkj",
            "goods": [
                "C004-SD343-234",
                "D004-GE343-990",
                "G004-X343-452",
            ],
        }
        # The payload never changes, so serialize it once.
        payload = json.dumps(send_info)
        while True:
            time.sleep(SEND_INTERVAL_SECONDS)
            try:
                ws.send(payload)
            except (websocket.WebSocketException, OSError) as exc:
                # The connection is gone; stop instead of raising forever.
                print(exc)
                break

    sender = threading.Thread(target=send_thread)
    # Do not let leftover sender threads keep the process alive.
    sender.daemon = True
    sender.start()


def on_start(a):
    """Open connection number ``a`` and block until it is closed.

    The start is delayed by ``a % 20`` seconds so that the connections are
    not all opened at the same instant.
    """
    time.sleep(a % 20)
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        SERVER_URL + str(a),
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()


if __name__ == "__main__":
    init_logging()
    pool = ThreadPool(CONNECTION_COUNT)
    for request in makeRequests(on_start, list(range(CONNECTION_COUNT))):
        pool.putRequest(request)
    pool.wait()

# 5000 simultaneous connections, each sending data once every 5 seconds.
# To test a plain socket client instead, change the function that opens the
# connection to use a socket connection.
# Reference: https://blog.csdn.net/ennismar/article/details/78190887
压测参考2:https://blog.csdn.net/m0_37581001/article/details/83624240
# -*- coding:utf-8 -*-
# __author__ == 'chenmingle'
import json
import multiprocessing
import threading
import time
import uuid

import websocket
from threadpool import ThreadPool, makeRequests

# Replace with your own WebSocket address.
WS_URL = "xxxx"
# Number of worker processes.
processes = 4
# Number of threads per process (the open-file limit may cap this at 1024;
# parameters such as fs.file can be adjusted to raise it).
thread_num = 700
# Per-process counter bumped each time a connection opens.
index = 1
# on_open runs on many threads at once, so the counter update is locked.
_index_lock = threading.Lock()


def on_message(ws, message):
    """Ignore incoming messages."""
    # print(message)
    pass


def on_error(ws, error):
    """Print any error reported for the connection."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Ignore connection close events.

    Recent websocket-client releases pass the close status code and message
    to this callback; the defaults keep the single-argument call working.
    """
    # print("### closed ###")
    pass


def on_open(ws):
    """Count the opened connection and start its keep-alive sender thread."""
    global index
    with _index_lock:
        index += 1

    def send_thread():
        # Set your own WebSocket payload here.
        # Send data every 10 seconds so that the connection is not dropped.
        while True:
            try:
                ws.send(u'hello服务器')
            except (websocket.WebSocketException, OSError) as exc:
                # The connection is gone; stop instead of raising forever.
                print(exc)
                break
            time.sleep(10)

    sender = threading.Thread(target=send_thread)
    # Do not let leftover sender threads keep the process alive.
    sender.daemon = True
    sender.start()


def on_start(num):
    """Open connection number ``num`` and block until it is closed."""
    time.sleep(5)
    # websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        WS_URL + str(num),
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()


def thread_web_socket():
    """Run ``thread_num`` connections in a thread pool and wait for them."""
    # Thread pool.
    pool_list = ThreadPool(thread_num)
    # One work request per thread/connection.
    for request in makeRequests(on_start, list(range(thread_num))):
        pool_list.putRequest(request)
    pool_list.wait()


if __name__ == "__main__":
    # Process pool.
    pool = multiprocessing.Pool(processes=processes)
    # Start one thread-pool runner per process. ``range`` replaces the
    # Python-2-only ``xrange`` so the script runs on Python 2 and 3.
    results = [pool.apply_async(thread_web_socket) for _ in range(processes)]
    pool.close()
    pool.join()
    # Fetch the results so an exception raised in a worker is not lost.
    for result in results:
        result.get()
可以参考:https://blog.51cto.com/legehappy/2144645?source=dra
前提需要安装相关依赖库
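pip install websocket
pip install threadpool
pip install websocket-client
pip install multiprocessing

注意:脚本中的 `import websocket` 实际使用的是 websocket-client 包;PyPI 上的 `websocket` 是另一个提供同名模块的包,同时安装可能导致 `websocket.WebSocketApp` 不可用,一般无需安装。`multiprocessing` 属于 Python 标准库,通常也无需通过 pip 安装。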
脚本如下,仅供参考,也可以参考其 GitHub:
#!/usr/bin/python
# -*- coding:utf-8 -*-
# __author__ == 'chenmingle'
import json
import multiprocessing
import threading
import time

import websocket
from threadpool import ThreadPool, makeRequests

# Replace with your own WebSocket address.
WS_URL = "wss://ws.test.com/"
# Number of worker processes.
processes = 5
# Number of threads per process (the open-file limit may cap this at 1024;
# parameters such as fs.file can be adjusted to raise it).
thread_num = 1000


def on_message(ws, message):
    """Print every message received from the server."""
    print(message)


def on_error(ws, error):
    """Print any error reported for the connection."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the connection was closed.

    Recent websocket-client releases pass the close status code and message
    to this callback; the defaults keep the single-argument call working.
    """
    print("### closed ###")


def on_open(ws):
    """Start a background thread that sends the payload periodically."""

    def send_thread():
        # Set your own WebSocket payload here.
        send_info = {
            "cmd": "refresh",
            "data": {"room_id": "58", "wx_user_id": 56431},
        }
        # The payload never changes, so serialize it once.
        payload = json.dumps(send_info)
        # Send data every 10 seconds so that the connection is not dropped.
        while True:
            time.sleep(10)
            try:
                ws.send(payload)
            except (websocket.WebSocketException, OSError) as exc:
                # The connection is gone; stop instead of raising forever.
                print(exc)
                break

    sender = threading.Thread(target=send_thread)
    # Do not let leftover sender threads keep the process alive.
    sender.daemon = True
    sender.start()


def on_start(num):
    """Open connection number ``num`` and block until it is closed.

    The start is delayed by ``num % 20`` seconds so that the connections are
    not all opened at the same instant.
    """
    time.sleep(num % 20)
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        WS_URL + str(num),
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()


def thread_web_socket():
    """Run ``thread_num`` connections in a thread pool and wait for them."""
    # Thread pool.
    thread_pool = ThreadPool(thread_num)
    # One work request per thread/connection.
    for request in makeRequests(on_start, list(range(thread_num))):
        thread_pool.putRequest(request)
    thread_pool.wait()


if __name__ == "__main__":
    # Process pool.
    pool = multiprocessing.Pool(processes=processes)
    # Start one thread-pool runner per process. ``range`` replaces the
    # Python-2-only ``xrange`` so the script runs on Python 2 and 3.
    results = [pool.apply_async(thread_web_socket) for _ in range(processes)]
    pool.close()
    pool.join()
    # Fetch the results so an exception raised in a worker is not lost.
    for result in results:
        result.get()
待续