在某些時候,需要建立websocket路由,來建立長鏈接,來實時傳輸數據,就比如一些聊天應用,就有實時音視頻,需要實時傳出狀態
在sanic框架中支持兩種websocket路由方式,有一種是再app中建立,另一種實在藍圖中,但是沒有提供類的路由寫法,都是通過函數的寫法
1.app
from sanic import Sanic app = Sanic(__name__) @app.websocket('/feed') async def feed(request, ws): try: while True: data = 'hello!' print('Sending: ' + data) await ws.send(data) data = await ws.recv() print('Received: ' + data) except Exception as e: print(e) #app.add_websocket_route(feed, "/feed") if __name__ == '__main__': app.run(host='0.0.0.0', port=8009)
2.blueprint
接口:ws://ip號:8001/api/student/v2/feed

StudentVideoCallBP = Blueprint(__name__, url_prefix='/api/student/v2/') @StudentVideoCallBP.websocket('/feed') async def feed(ws): try: while True: data = 'hello!' print('Sending: ' + data) await ws.send(data) data = await ws.recv() print('Received: ' + data) except Exception as e: print(e) # StudentVideoCallBP.add_websocket_route(feed, "/feed")
3.略微復雜的一個小demo,寫的是一個客戶端和服務端實時通信,接口ws://10.130.145.200:8001/api/student/v2/wxlink/5ad85e49705deb4d3b98c111

async def wxLink(request, ws, studentMid): student = await Student.find_one(studentMid, as_raw=True) #循環監聽是否有客戶端鏈接, while True: #try客戶端鏈接狀態 try: #try判斷是數據是否有這個學生id try: if not student: #告訴客戶端id錯誤,可能是個非法鏈接,然后主動斷開非法鏈接 await ws.send(osjson.dumps( {"studentMid": "", "videoRoomId": "", 'userSig': "", 'privateMapKey': "", 'sdkAppId': ""})) break Rucode = student['Rucode'] key = studentMid + '_' + Rucode videoStatus = await ws.recv() if videoStatus == '0' and str(ws.state) == 'State.OPEN': data = await VideoData.get(key) if data: value = osjson.loads(data) roomId = value['videoRoomId'] privateMapKey = value['privateMapKey'] sdkAppId = str(value['sdkAppId']) userSig = str(value['user_userSig']) status = value['status'] IsAvailable = value['IsAvailable'] if status == 0 and str(ws.state) == 'State.OPEN' and IsAvailable == 1: await ws.send(osjson.dumps({"studentMid": "ok", 'videoRoomId': roomId, 'userSig': userSig, 'privateMapKey': privateMapKey, 'sdkAppId': sdkAppId})) elif status == 0 and str(ws.state) == 'State.OPEN' and IsAvailable == 2: await ws.send(osjson.dumps( {"studentMid": "ok", 'videoRoomId': "", 'userSig': "", 'privateMapKey': "", 'sdkAppId': ""})) # 確認接收到roomId elif videoStatus == '1': data = await VideoData.get(key) if data: value = osjson.loads(data) value['status'] = 1 doc = osjson.dumps(value) await VideoData.set(key, doc) # 接通成功 elif videoStatus == '2': value = await VideoData.get(key) if value: await VideoData.delete(key) else: pass # 拒絕接通 elif videoStatus == '3': req = await VideoData.get(key) if req: classRoomUid = osjson.loads(req)['classRoomUid'] await VideoData.delete(key) # 推送消息 push_title = "I40001" message = '對方已拒絕' try: await pushmsg(student['Rucode'], classRoomUid, push_title, message) except Exception as e: print(e) elif videoStatus == '3': req = await VideoData.get(key) if req: await VideoData.delete(key) else: pass except TypeError: ws.close() #監聽如果客戶端斷開鏈接,則服務端退出循環,斷開鏈接 except ConnectionClosed: break
4.一般websocket需要加驗證,否則各種非法連接進來可咋整,加個token裝飾器就可以了
5.提供兩個在線測試工具
http://www.blue-zero.com/WebSocket/
http://coolaf.com/tool/chattest
注釋:
ws.state 查看ws鏈接狀態
ws.close 關閉鏈接