flask 利用socketIO 實現在線消息推送


需求:

在Flask應用中,如果用戶A關注用戶B后需要向用戶B推送消息,通過消息隊列告知IM服務為用戶進行推送

下面通過一張業務場景圖給大家來說明:

 

 

 

所以,要實現此過程,必須經過兩個步驟:

1:flask服務向消息隊列中添加消息

2:socketIO 獲取消息,推送消息,而在這一步我們只需要給他指定消息隊列即可,然后將用戶添加到名為用戶id的room房間中,方便按照user_id進行推送。

在Socket.IO 框架中可以選擇使用以下兩種方式作為消息中間件:

  • 使用Redis

      mgr = socketio.RedisManager('redis://')
      sio = socketio.Server(client_manager=mgr)
  • 使用RabbitMQ

      pip install kombu
      mgr = socketio.KombuManager('amqp://')
      sio = socketio.Server(client_manager=mgr)

     

實現

因為要給指定的用戶推送消息,所以需要用到用戶的身份,用戶在客戶端攜帶JWT連接SocketIO服務器,我們在服務器端對jwt token進行驗證,對於驗證出用戶身份(user_id)的客戶端,將其添加到名為用戶id的room房間中,方便按照user_id進行推送。

socketio服務端編寫(接上一篇博文繼續編寫https://www.cnblogs.com/Live-up-to-your-youth/p/15058865.html)

在/im/main.py中補充添加搜尋包的路徑,方便使用utils中的jwt_utils模塊(項目中使用)

import eventlet
eventlet.monkey_patch()

import eventlet.wsgi
import sys
import os

# 補充搜索包路徑
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR, 'common'))

if len(sys.argv) < 2:
    print('Usage: python main.py [port]')
    exit(1)

port = int(sys.argv[1])

from server import app
import chat
import notify

SERVER_ADDRESS = ('', port)
sock = eventlet.listen(SERVER_ADDRESS)
eventlet.wsgi.server(sock, app)

 

在im/server.py文件中補充消息隊列rabbitmq的配置信息和jwt使用的秘鑰

import socketio

RABBITMQ = 'amqp://python:rabbitmqpwd@localhost:5672/ao'
JWT_SECRET = 'TPmi4aLWRbyVq8zu9v82dWYW17/z+UvRnYTt4P6fAXA'

mgr = socketio.KombuManager(RABBITMQ)

sio = socketio.Server(async_mode='eventlet', client_manager=mgr)
app = socketio.Middleware(sio)

在/im目錄中新建notify.py

from server import sio, JWT_SECRET
from werkzeug.wrappers import Request
from utils.jwt_util import verify_jwt

def check_jwt_token(environ):
    """
    檢驗jwt token
    :param environ:
    :return:
    """
    request = Request(environ)
    token = request.args.get('token')
    if token:
        payload = verify_jwt(token, JWT_SECRET)
        if payload:
            user_id = payload.get('user_id')
            return user_id
    return None

@sio.on('connect')
def on_connect(sid, environ):
    """
    與客戶端建立連接后執行
    """
    # 檢驗連接客戶端的jwt token
    user_id = check_jwt_token(environ)
    print('user_id={}'.format(user_id))
    # 若檢驗出user_id,將此客戶端添加到user_id的room中
    if user_id:
        sio.enter_room(sid, str(user_id))

@sio.on('disconnect')
def on_disconnect(sid):
    """
    與客戶端斷開連接時執行
    """
    # 客戶端離線時將客戶端從所有房間中移除
    rooms = sio.rooms(sid)
    for room in rooms:
        sio.leave_room(sid, room)

flask web服務端編寫

在創建app的__init__.py中 添加sio對象的創建

import socketio

def create_app(config, enable_config_file=False):
    """
    創建應用
    :param config: 配置信息對象
    :param enable_config_file: 是否允許運行環境中的配置文件覆蓋已加載的配置信息
    :return: 應用
    """
    ...

    # socket.io
    app.sio = socketio.KombuManager(app.config['RABBITMQ'], write_only=True)

在 用戶關注接口視圖中添加發送事件消息

class 類名(Resource):
    """
    關注用戶
    """
    method_decorators = {
        'post': [login_required],
        'get': [login_required],
    }

    def post(self):
        """
        關注用戶
        """
        # 關注用戶的數據庫保存
        ...

        # 發送關注通知

        current_app.sio.emit('following notify', data=_data, room=str(target))

        return {'target': target}, 201

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM