實現原理
當使用多進程的時候。多個socketio服務器通過消息隊列來溝通之間的客戶端sid。若發現該sid在自己的連接中。就由該進程處理發送給其下面連接的客戶端
詳細的可以看這里
socket.io要實現多進程以及廣播,房間等功能,勢必需要接入一個redis之類的消息隊列,進而socket.io的emit會調用對應隊列管理器pubsub_manager的emit方法,比如用redis做消息隊列則最終調用 redis_manager中的_publish()
方法通過redis的訂閱發布功能將消息推送到flask_socketio頻道。另一方面,每個進程在初始化時都訂閱了 flask_socketio頻道,而且都有一個協程(或線程)在監聽頻道中是否有消息,一旦有消息,就會調用pubsub_manager._handle_emit()方法對本機對應的socket發送對應的消息,最終是通過socket.io服務器的_emit_internal()方法實現對本機中room為sid的所有socket發送消息的,如果room為None,則就是廣播,即對所有連接到本機的所有客戶端推送消息。
作者:ssjhust
鏈接:https://juejin.cn/post/6844903632819716110
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
服務端實現
設定client_manger即可
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
#!/usr/bin/env python
import asyncio
import uvicorn
import socketio
# 添加一行clientManger
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
app = socketio.ASGIApp(sio, static_files={
'/': 'app.html',
})
@sio.on('my_event')
async def test_message(sid, message):
await sio.emit('my_response', {'data': message['data']}, room=sid)
@sio.on('connect')
async def test_connect(sid, environ):
print(sid)
await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid)
@sio.on('disconnect')
def test_disconnect(sid):
print('Client disconnected')
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=5000)
實驗效果
部署兩個app 分別監聽5000和50001 表示負載均衡后面的兩個服務器
然后用a客戶端連接5000。用b客戶端向a發消息。如果a接收成功證明服務器之間進行了通訊轉發
app1.py
#!/usr/bin/env python
import asyncio
import uvicorn
import socketio
# 添加一行clientManger
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
app = socketio.ASGIApp(sio, static_files={
'/': 'app.html',
})
background_task_started = False
@sio.on('my_event')
async def test_message(sid, message):
print(message)
@sio.on('disconnect request')
async def disconnect_request(sid):
await sio.disconnect(sid)
@sio.on('connect')
async def test_connect(sid, environ):
print(sid)
await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid)
@sio.on('disconnect')
def test_disconnect(sid):
print('Client disconnected')
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=5000)
app2.py
只需要修改端口號
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=5001)
接收消息的客戶端1 連接app1服務器
import socketio
sio = socketio.Client()
@sio.event
def connect():
print('connection established')
@sio.on("my_event")
def my_message(data):
print('message received with ', data)
@sio.event
def disconnect():
print('disconnected from server')
sio.connect('http://localhost:5000')
sio.wait()
發送消息的客戶端 連接app2
import socketio
import json
sio = socketio.Client()
@sio.event
def connect():
print('connection established')
@sio.on("my_event")
def my_message(data):
print('message received with ', data)
@sio.event
def disconnect():
print('disconnected from server')
sio.connect('http://localhost:5001')
data = json.dumps({"message":"拜托了,另一個我","sid":"d84b2e8cbcde479f813627b6ab8e4a47"},ensure_ascii=False)
sio.emit("my_event",data)
sio.wait()
注意發送者需要提供sid 才能誇進程發送成功