簡述
產品經理鑒於運營反饋並對程序的websocket長連接保持懷疑的態度,讓我對websocket服務器進行壓力測試,我內心是拒絕的。
開發思路
查閱websocket的相關資料,查到python的websocket有兩個連接方式,長連接:WebSocketApp,短連接:create_connection。
使用 WebSocketApp 的話,我沒辦法獲取websocket服務器端返回的數據,這個我還在研究,這里使用 create_connection 來進行壓測。
連接:
1 ws = create_connection("websocket的服務器地址")
獲取連接狀態:
1 print("獲取連接狀態:", ws.connected) # True為連接成功,False為連接失敗
發送消息至服務器並接收返回結果:
1 initialize_data = '{"code":2001, "msg": "","data": ""}' 2 ws.send(initialize_data) 3 result_initialize = ws.recv() # 獲取返回結果 4 print("接收結果:", result_initialize)
因為create_connection為短連接,所以這一系列下來后連接會斷開,所以這個時候需要一個東西來維持連接,長連接叫心跳,那短連接這里我願成為:心臟起搏器(while True)
完整代碼:
1 import threading 2 from websocket import create_connection 3 import time 4 5 thread = [] 6 def socket(): 7 while True: 8 ws = create_connection("websocket服務器地址") 9 # 2、獲取連接狀態 10 print("獲取連接狀態:", ws.connected) 11 print("獲取服務器返回的連接結果", ws.recv()) 12 if ws.connected == True: 13 initialize = '{"code":2001, "msg": "","data": {}}' 14 ws.send(initialize) 15 result_initialize = ws.recv() # 獲取返回結果 16 print("接收結果:", result_initialize) 17 if __name__ == "__main__": 18 for i in range(25): 19 t = threading.Thread(target=socket, args=()) 20 t.start()