import websocket try: import thread except ImportError: import _thread as thread import time # 服務器推送消息時,message可以接收,然后進行下一步操作 def on_message(ws, message): print(message) # 程序報錯時,會觸發on_error事件 def on_error(ws, error): print(error) # 主動斷開連接時,會調用該函數 def on_close(ws): print("### closed ###") # 連接建立后,會調用on_open函數 def on_open(ws): def run(*args): for i in range(3): time.sleep(1) ws.send("Hello %d" % i) time.sleep(1) ws.close() print("thread terminating...") thread.start_new_thread(run, ()) if __name__ == "__main__": #需要安裝websocket-client庫,不然這里會出錯 websocket.enableTrace(True)
#建立連接 ws = websocket.WebSocketApp("wss://echo.websocket.org/", on_message = on_message, on_error = on_error, on_close = on_close) ws.on_open = on_open ws.run_forever()
該代碼為測試代碼,可以根據自己項目的需求進行對應的修改
總結:
1.需要安裝websocket-client庫
2.如果是wss,跳過ssl的驗證方法
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
3.最好不要用強制殺線程的方式斷開連接,這樣有可能導致資源釋放不完全,連接不能正常斷開
可以通過輸入特定的字符,或者檢測目錄下是否有特定文件,比如exit.txt等方式主動斷開連接
也可以通過Timer設置固定多少時間后,主動斷開連接
4.可以通過多線程的方式,啟動多個線程,來測試多少個長連接可以同時在線。服務器廣播時,能否正常接收消息
5.單機連接數量有限,可以通過多台測試機一起跑,來達到目標連接數
6.ws.run_forever(ping_interval=60,ping_timeout=5)
ping_interval:自動發送“ping”命令,每個指定的時間(秒),如果設置為0,則不會自動發送。
ping_timeout:如果沒有收到pong消息,則為超時(秒)。