Websocket的長連接測試


import websocket
try:
    import thread
except ImportError:
    import _thread as thread
import time

# 服務器推送消息時,message可以接收,然后進行下一步操作
def on_message(ws, message):
    print(message)

# 程序報錯時,會觸發on_error事件
def on_error(ws, error):
    print(error)

# 主動斷開連接時,會調用該函數
def on_close(ws):
    print("### closed ###")

# 連接建立后,會調用on_open函數
def on_open(ws):
    def run(*args):
        for i in range(3):
            time.sleep(1)
            ws.send("Hello %d" % i)
        time.sleep(1)
        ws.close()
        print("thread terminating...")
    thread.start_new_thread(run, ())


if __name__ == "__main__":
    #需要安裝websocket-client庫,不然這里會出錯
    websocket.enableTrace(True)
   #建立連接 ws
= websocket.WebSocketApp("wss://echo.websocket.org/", on_message = on_message, on_error = on_error, on_close = on_close) ws.on_open = on_open ws.run_forever()

該代碼為測試代碼,可以根據自己項目的需求進行對應的修改

總結:

1.需要安裝websocket-client庫

2.如果是wss,跳過ssl的驗證方法

ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

3.最好不要用強制殺線程的方式斷開連接,這樣有可能導致資源釋放不完全,連接不能正常斷開

可以通過輸入特定的字符,或者檢測目錄下是否有特定文件,比如exit.txt等方式主動斷開連接

也可以通過Timer設置固定多少時間后,主動斷開連接

4.可以通過多線程的方式,啟動多個線程,來測試多少個長連接可以同時在線。服務器廣播時,能否正常接收消息

5.單機連接數量有限,可以通過多台測試機一起跑,來達到目標連接數

6.ws.run_forever(ping_interval=60,ping_timeout=5) 

ping_interval:自動發送“ping”命令,每個指定的時間(秒),如果設置為0,則不會自動發送。

ping_timeout:如果沒有收到pong消息,則為超時(秒)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM