python-socketio 多進程部署


實現原理

當使用多進程的時候。多個socketio服務器通過消息隊列來溝通之間的客戶端sid。若發現該sid在自己的連接中。就由該進程處理發送給其下面連接的客戶端

詳細的可以看這里

socket.io要實現多進程以及廣播,房間等功能,勢必需要接入一個redis之類的消息隊列,進而socket.io的emit會調用對應隊列管理器pubsub_manager的emit方法,比如用redis做消息隊列則最終調用 redis_manager中的_publish()

方法通過redis的訂閱發布功能將消息推送到flask_socketio頻道。另一方面,每個進程在初始化時都訂閱了 flask_socketio頻道,而且都有一個協程(或線程)在監聽頻道中是否有消息,一旦有消息,就會調用pubsub_manager._handle_emit()方法對本機對應的socket發送對應的消息,最終是通過socket.io服務器的_emit_internal()方法實現對本機中room為sid的所有socket發送消息的,如果room為None,則就是廣播,即對所有連接到本機的所有客戶端推送消息。

作者:ssjhust
鏈接:https://juejin.cn/post/6844903632819716110
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

服務端實現

設定client_manger即可
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
#!/usr/bin/env python
import asyncio

import uvicorn

import socketio
# 添加一行clientManger
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
app = socketio.ASGIApp(sio, static_files={
    '/': 'app.html',
})

@sio.on('my_event')
async def test_message(sid, message):
    await sio.emit('my_response', {'data': message['data']}, room=sid)

@sio.on('connect')
async def test_connect(sid, environ):
    print(sid)
    await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid)


@sio.on('disconnect')
def test_disconnect(sid):
    print('Client disconnected')


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5000)

實驗效果

部署兩個app 分別監聽5000和50001 表示負載均衡后面的兩個服務器
然后用a客戶端連接5000。用b客戶端向a發消息。如果a接收成功證明服務器之間進行了通訊轉發

app1.py

#!/usr/bin/env python
import asyncio

import uvicorn

import socketio
# 添加一行clientManger
mgr = socketio.AsyncRedisManager('redis://127.0.0.1:6379/2')
sio = socketio.AsyncServer(async_mode='asgi',client_manager=mgr)
app = socketio.ASGIApp(sio, static_files={
    '/': 'app.html',
})
background_task_started = False


@sio.on('my_event')
async def test_message(sid, message):
    print(message)

@sio.on('disconnect request')
async def disconnect_request(sid):
    await sio.disconnect(sid)


@sio.on('connect')
async def test_connect(sid, environ):
    print(sid)
    await sio.emit('my_response', {'data': 'Connected', 'count': sid}, room=sid)


@sio.on('disconnect')
def test_disconnect(sid):
    print('Client disconnected')


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5000)

app2.py
只需要修改端口號

if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5001)

接收消息的客戶端1 連接app1服務器

import socketio

sio = socketio.Client()

@sio.event
def connect():
    print('connection established')

@sio.on("my_event")
def my_message(data):
    print('message received with ', data)

@sio.event
def disconnect():
    print('disconnected from server')

sio.connect('http://localhost:5000')
sio.wait()

發送消息的客戶端 連接app2

import socketio
import json
sio = socketio.Client()

@sio.event
def connect():
    print('connection established')

@sio.on("my_event")
def my_message(data):
    print('message received with ', data)

@sio.event
def disconnect():
    print('disconnected from server')

sio.connect('http://localhost:5001')
data = json.dumps({"message":"拜托了,另一個我","sid":"d84b2e8cbcde479f813627b6ab8e4a47"},ensure_ascii=False)
sio.emit("my_event",data)
sio.wait()

注意發送者需要提供sid 才能誇進程發送成功


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM