轉載:https://www.cnblogs.com/huny/p/14079320.html
3.使用一個demo測試網站:https://www.websocket.org/echo.html 進行演示。
import json from websocket import create_connection url = 'wss://echo.websocket.org'#websocket連接地址,地址為虛擬地址 #websocket.enableTrace(True) #打開跟蹤,查看日志 while True: ws = create_connection(url) #創建連接 data =input('輸入傳輸消息:') #測試數據 # new_data=json.dumps(data,ensure_ascii=False) #將data轉化為字符串 ws.send(data) #發送請求 print('收到回復消息:',ws.recv()) #打印服務器響應數據 if data == 'q': ws.close() #關閉連接 break
4.實戰
公司開發的websocket接口只是單純的建立連接進行監聽,並不支持發送和接受數據。
import unittest from websocket import create_connection import websocket class Test_websoket(unittest.TestCase): def test_websk(self): '''websocket 消息推送: /socketServer/${userId} ''' url = 'ws://10.10.100.224:9997/socketServer/528' # websocket連接地址 websocket.enableTrace(True) # 打開跟蹤,查看日志 ws = create_connection(url) # 創建連接 print(ws.getstatus()) # 打印連接狀態 self.assertEqual(101, ws.getstatus(), 'websocket連接錯誤') # 斷言連接狀態 # ws.settimeout(10) #設置超時時間 # print(ws.gettimeout()) #獲取超時時間 # ws.shutdown() # 關閉連接 if __name__ == '__main__': unittest.main()
注意這里返回的狀態是101,表示協議切換的意思,於是可以用來斷言是否連接成功。