利用python對websocket進行並發壓測


簡述

產品經理鑒於運營反饋並對程序的websocket長連接保持懷疑的態度,讓我對websocket服務器進行壓力測試,我內心是拒絕的。

 

開發思路

查閱websocket的相關資料,查到python的websocket有兩個連接方式,長連接:WebSocketApp,短連接:create_connection。

使用 WebSocketApp 的話,我沒辦法獲取websocket服務器端返回的數據,這個我還在研究,這里使用 create_connection 來進行壓測。

連接:

1 ws = create_connection("websocket的服務器地址")

獲取連接狀態:

1 print("獲取連接狀態:", ws.connected)  # True為連接成功,False為連接失敗

發送消息至服務器並接收返回結果:

1 initialize_data = '{"code":2001, "msg": "","data": ""}'
2 ws.send(initialize_data)
3 result_initialize = ws.recv()        # 獲取返回結果
4 print("接收結果:", result_initialize)

因為create_connection為短連接,所以這一系列下來后連接會斷開,所以這個時候需要一個東西來維持連接,長連接叫心跳,那短連接這里我願成為:心臟起搏器(while True)

完整代碼:

 1 import threading
 2 from websocket import create_connection
 3 import time
 4 
 5 thread = []
 6 def socket():
 7     while True:
 8         ws = create_connection("websocket服務器地址")
 9         # 2、獲取連接狀態
10         print("獲取連接狀態:", ws.connected)
11         print("獲取服務器返回的連接結果", ws.recv())
12         if ws.connected == True:
13             initialize = '{"code":2001, "msg": "","data": {}}'
14             ws.send(initialize)
15             result_initialize = ws.recv()  # 獲取返回結果
16             print("接收結果:", result_initialize)
17 if __name__ == "__main__":
18     for i in range(25):
19         t = threading.Thread(target=socket, args=())
20         t.start()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM