需求:
在Flask應用中,如果用戶A關注用戶B后需要向用戶B推送消息,通過消息隊列告知IM服務為用戶進行推送
下面通過一張業務場景圖給大家來說明:
所以,要實現此過程,必須經過兩個步驟:
1:flask服務向消息隊列中添加消息
2:socketIO 獲取消息,推送消息,而在這一步我們只需要給他指定消息隊列即可,然后將用戶添加到名為用戶id的room房間中,方便按照user_id進行推送。
在Socket.IO 框架中可以選擇使用以下兩種方式作為消息中間件:
-
使用Redis
mgr = socketio.RedisManager('redis://') sio = socketio.Server(client_manager=mgr)
-
使用RabbitMQ
pip install kombu mgr = socketio.KombuManager('amqp://') sio = socketio.Server(client_manager=mgr)
實現
因為要給指定的用戶推送消息,所以需要用到用戶的身份,用戶在客戶端攜帶JWT連接SocketIO服務器,我們在服務器端對jwt token進行驗證,對於驗證出用戶身份(user_id)的客戶端,將其添加到名為用戶id的room房間中,方便按照user_id進行推送。
socketio服務端編寫(接上一篇博文繼續編寫https://www.cnblogs.com/Live-up-to-your-youth/p/15058865.html)
在/im/main.py中補充添加搜尋包的路徑,方便使用utils中的jwt_utils模塊(項目中使用)
import eventlet eventlet.monkey_patch() import eventlet.wsgi import sys import os # 補充搜索包路徑 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.join(BASE_DIR, 'common')) if len(sys.argv) < 2: print('Usage: python main.py [port]') exit(1) port = int(sys.argv[1]) from server import app import chat import notify SERVER_ADDRESS = ('', port) sock = eventlet.listen(SERVER_ADDRESS) eventlet.wsgi.server(sock, app)
在im/server.py文件中補充消息隊列rabbitmq的配置信息和jwt使用的秘鑰
import socketio
RABBITMQ = 'amqp://python:rabbitmqpwd@localhost:5672/ao'
JWT_SECRET = 'TPmi4aLWRbyVq8zu9v82dWYW17/z+UvRnYTt4P6fAXA'
mgr = socketio.KombuManager(RABBITMQ)
sio = socketio.Server(async_mode='eventlet', client_manager=mgr)
app = socketio.Middleware(sio)
在/im目錄中新建notify.py
from server import sio, JWT_SECRET from werkzeug.wrappers import Request from utils.jwt_util import verify_jwt def check_jwt_token(environ): """ 檢驗jwt token :param environ: :return: """ request = Request(environ) token = request.args.get('token') if token: payload = verify_jwt(token, JWT_SECRET) if payload: user_id = payload.get('user_id') return user_id return None @sio.on('connect') def on_connect(sid, environ): """ 與客戶端建立連接后執行 """ # 檢驗連接客戶端的jwt token user_id = check_jwt_token(environ) print('user_id={}'.format(user_id)) # 若檢驗出user_id,將此客戶端添加到user_id的room中 if user_id: sio.enter_room(sid, str(user_id)) @sio.on('disconnect') def on_disconnect(sid): """ 與客戶端斷開連接時執行 """ # 客戶端離線時將客戶端從所有房間中移除 rooms = sio.rooms(sid) for room in rooms: sio.leave_room(sid, room)
flask web服務端編寫
在創建app的__init__.py
中 添加sio對象的創建
import socketio def create_app(config, enable_config_file=False): """ 創建應用 :param config: 配置信息對象 :param enable_config_file: 是否允許運行環境中的配置文件覆蓋已加載的配置信息 :return: 應用 """ ... # socket.io app.sio = socketio.KombuManager(app.config['RABBITMQ'], write_only=True)
在 用戶關注接口視圖中添加發送事件消息
class 類名(Resource): """ 關注用戶 """ method_decorators = { 'post': [login_required], 'get': [login_required], } def post(self): """ 關注用戶 """ # 關注用戶的數據庫保存 ... # 發送關注通知 current_app.sio.emit('following notify', data=_data, room=str(target)) return {'target': target}, 201