flask-socketio 實現


Flask-SocketIO使Flask應用程序可以訪問客戶端和服務器之間的低延遲雙向通信。 客戶端應用程序可以使用Javascript,C ++,Java和Swift中的任何SocketIO官方客戶端庫,或任何兼容的客戶端來建立與服務器的永久連接。

1,安裝

pip install flask-socketio

2,依賴
Flask-SocketIO兼容Python 2.7和Python 3.3+。這個軟件包所依賴的異步服務可以從以下三種選擇中選擇:

eventlet是最好的高性能選項,支持長輪詢和WebSocket傳輸。
gevent支持多種不同的配置。 long-polling傳輸完全由gevent包支持,但與eventlet不同,gevent沒有原生的WebSocket支持。為了添加對WebSocket的支持,目前有兩種選擇。安裝gevent-websocket軟件包會將WebSocket支持添加到gevent中,或者可以使用隨WebSocket功能一起提供的uWSGI Web服務器。 gevent的使用也是一個性能選項,但比eventlet略低。
也可以使用基於Werkzeug的Flask開發服務器,但缺少其他兩個選項的性能,因此只能用於簡化開發流程。該選項僅支持長輪詢傳輸。
擴展程序根據安裝的內容自動檢測使用哪個異步框架。優先考慮eventlet,接着是gevent。對於gevent中的WebSocket支持,首選uWSGI,然后是gevent-websocket。如果既沒有安裝eventlet也沒有安裝gevent,則使用Flask開發服務器。

如果使用多個進程,則進程使用消息隊列服務來協調諸如廣播的操作。受支持的隊列是Redis,RabbitMQ和Kombu軟件包支持的任何其他消息隊列。

在客戶端,官方的Socket.IO Javascript客戶端庫可以用來建立到服務器的連接。還有用Swift,Java和C ++編寫的官方客戶端。只要他們實現Socket.IO協議,非官方客戶端也可以工作。

3,初始化
簡單的代碼例子:

from flask import Flask, render_template
from flask_socketio import SocketIO

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)

if __name__ == '__main__':
socketio.run(app)

初始化的init_app()樣式也被支持。 請注意Web服務器的啟動方式。 socketio.run()函數封裝了Web服務器的啟動,代替了app.run()標准的Flask開發服務器啟動。 當應用程序處於調試模式時,Werkzeug開發服務器仍在socketio.run()中使用和正確配置。 在生產模式下首選使用eventlet Web服務器,否則使用gevent Web服務器。 如果沒有安裝eventlet和gevent,則使用Werkzeug開發Web服務器。

還支持基於Flask 0.11中引入的點擊命令行界面。 該擴展提供了適用於啟動Socket.IO服務器的flask run命令的新版本。 用法示例:

$ FLASK_APP=my_app.py flask run

應用程序必須向加載Socket.IO庫的客戶端提供一個頁面,並建立一個連接:

<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.6/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
socket.emit('my event', {data: 'I\'m connected!'});
});
</script>

4,接收信息:
1)當使用SocketIO時,消息被雙方作為事件接收。 在客戶端使用Javascript回調。 使用Flask-SocketIO,服務器需要為這些事件注冊處理程序,類似於視圖函數處理路由的方式。

以下示例為未命名的事件創建一個服務器端事件處理程序:

@socketio.on('message')
def handle_message(message):
     print('received message: ' + message)

2)上面的例子使用字符串消息。 另一種未命名事件使用JSON數據:

@socketio.on('json')
def handle_json(json):
      print('received json: ' + str(json))

3)最靈活的事件類型使用自定義事件名稱。 這些事件的消息數據可以是字符串,字節,整數或JSON:

@socketio.on('my event')
def handle_my_custom_event(json):
      print('received json: ' + str(json))

4)自定義命名事件也可以支持多個參數:

@socketio.on('my event')
def handle_my_custom_event(arg1, arg2, arg3):
     print('received args: ' + arg1 + arg2 + arg3)

5)命名事件是最靈活的,因為它們不需要包含額外的元數據來描述消息類型。

Flask-SocketIO還支持SocketIO命名空間,它允許客戶端在同一個物理套接字上復用幾個獨立的連接:

@socketio.on('my event', namespace='/test')
def handle_my_custom_namespace_event(json):
      print('received json: ' + str(json))

6)當沒有指定名稱空間時,將使用名稱為“/”的默認全局名稱空間。

對於裝飾器語法不方便的情況,可以使用on_event方法:

def my_function_handler(data):
      pass

socketio.on_event('my event', my_function_handler, namespace='/test')

7)客戶可以要求確認回復,確認收到他們發送的消息。 從處理函數返回的任何值將作為回調函數中的參數傳遞給客戶端:

@socketio.on('my event')
def handle_my_custom_event(json):
     print('received json: ' + str(json))
     return 'one', 2

在上面的例子中,客戶端回調函數將有兩個參數“one”和2返回。如果一個處理函數沒有返回任何值,客戶端回調函數將被調用而不帶參數。

5,發送信息
如上一節中所示定義的SocketIO事件處理程序可以使用send()和emit()函數向連接的客戶端發送回復消息。

以下示例將收到的事件反饋回發送給它們的客戶端:

from flask_socketio import send, emit

@socketio.on('message')
def handle_message(message):
      send(message)

@socketio.on('json')
def handle_json(json):
      send(json, json=True)

@socketio.on('my event')
def handle_my_custom_event(json):
      emit('my response', json)

請注意send()和emit()分別用於未命名事件和已命名事件。

使用名稱空間時,默認情況下,send()和emit()使用傳入消息的名稱空間。 可以使用可選的命名空間參數來指定不同的命名空間:

@socketio.on('message')
def handle_message(message):
     send(message, namespace='/chat')

@socketio.on('my event')
def handle_my_custom_event(json):
      emit('my response', json, namespace='/chat')

要發送具有多個參數的事件,請發送一個元組:

@socketio.on('my event')
def handle_my_custom_event(json):
      emit('my response', ('foo', 'bar', json), namespace='/chat')

SocketIO支持確認消息被客戶端接收的確認回調:

def ack():
      print 'message was received!'

@socketio.on('my event')
def handle_my_custom_event(json):
      emit('my response', json, callback=ack)

當使用回調函數時,Javascript客戶端接收到一個回調函數來接收消息。 客戶端應用程序調用回調函數后,調用相應的服務器端回調。 如果用參數調用客戶端回調,則這些回調也作為參數提供給服務器端回調。

6,廣播
SocketIO的另一個非常有用的功能是消息的廣播。 Flask-SocketIO支持使用broadcast = True和optional(可選參數)來send()和emit():

@socketio.on('my event')
def handle_my_custom_event(data):
     emit('my response', data, broadcast=True)

在啟用廣播選項的情況下發送消息時,連接到命名空間的所有客戶端都會收到它,包括發件人。 當不使用名稱空間時,連接到全局名稱空間的客戶端將收到該消息。 請注意,廣播消息不會調用回調。

在所有示例中,直到這一點,服務器都響應客戶端發送的事件。 但對於某些應用程序,服務器需要成為消息的發起者。 將通知發送到服務器中發生的事件的客戶端可能會很有用,例如在后台線程中。 socketio.send()和socketio.emit()方法可用於向所有連接的客戶端廣播:

def some_function():
      socketio.emit('some event', {'data': 42})

請注意,socketio.send()和socketio.emit()與上下文感知的send()和emit()不同。 還要注意,在上面的用法中沒有客戶端上下文,所以假定broadcast = True,不需要指定。

8,聊天室
對於許多應用程序來說,有必要將用戶分成可以一起處理的子集。 最好的例子是有多個房間的聊天應用程序,用戶從房間或房間接收消息,而不是從其他房間的其他房間接收消息。 Flask-SocketIO通過join_room()和leave_room()函數來支持這個房間的概念:

from flask_socketio import join_room, leave_room

@socketio.on('join')
def on_join(data):
    username = data['username']
    room = data['room']
    join_room(room)
    send(username + ' has entered the room.', room=room)

@socketio.on('leave')
def on_leave(data):
    username = data['username']
    room = data['room']
    leave_room(room)
    send(username + ' has left the room.', room=room)

send()和emit()函數接受一個可選的房間參數,使得消息被發送到給定房間中的所有客戶端。

所有的客戶端在連接時都被分配一個空間,用連接的會話ID命名,可以從request.sid中獲得。 一個給定的客戶可以加入任何房間,可以給任何名字。 當一個客戶端斷開連接時,它將從它所在的所有房間中移除。上下文無關的socketio.send()和socketio.emit()函數也接受一個房間參數來廣播給房間中的所有客戶端。

由於所有的客戶端都被分配了一個個人房間,所以為了向一個客戶端發送消息,客戶端的會話ID可以被用作房間參數。


9,連接事件
Flask-SocketIO也調度連接和斷開事件。 以下示例顯示如何為其注冊處理程序:

@socketio.on('connect', namespace='/chat')
def test_connect():
    emit('my response', {'data': 'Connected'})

@socketio.on('disconnect', namespace='/chat')
def test_disconnect():
    print('Client disconnected')

連接事件處理程序可以選擇返回False來拒絕連接。 這樣就可以在這個時候驗證客戶端。

請注意,連接和斷開連接事件在每個使用的名稱空間上單獨發送。

10,基於類的命名空間
作為上述基於裝飾器的事件處理程序的替代方法,屬於名稱空間的事件處理程序可以把類的方法名映射到命名空間。 flask_socketio.Namespace作為基類提供,以創建基於類的命名空間:

from flask_socketio import Namespace, emit

class MyCustomNamespace(Namespace):
    def on_connect(self):
        pass

    def on_disconnect(self):
        pass

    def on_my_event(self, data):
        emit('my_response', data)

socketio.on_namespace(MyCustomNamespace('/test'))

當使用基於類的命名空間時,服務器收到的任何事件都會被分配到一個名為帶有on_前綴的事件名稱的方法。 例如,事件my_event將由名為on_my_event的方法處理。 如果收到一個沒有在命名空間類中定義的相應方法的事件,則該事件被忽略。 在基於類的命名空間中使用的所有事件名稱必須使用方法名稱中合法的字符。

為了方便在基於類的命名空間中定義的方法,命名空間實例包含了flask_socketio.SocketIO類中的幾個方法的版本,當沒有給出命名空間參數時,默認為適當的命名空間。

如果事件在基於類的名稱空間中有一個處理程序,並且還有基於裝飾器的函數處理程序,則只調用裝飾的函數處理程序。

11,處理錯誤
處理錯誤的例子:

@socketio.on_error()        # Handles the default namespace
def error_handler(e):
    pass

@socketio.on_error('/chat') # handles the '/chat' namespace
def error_handler_chat(e):
    pass

@socketio.on_error_default  # handles all namespaces without an explicit error handler
def default_error_handler(e):
    pass

錯誤處理函數將異常對象作為參數。

當前請求的消息和數據參數也可以使用request.event變量進行檢查,這對於事件處理程序之外的錯誤日志記錄和調試很有用:

from flask import request

@socketio.on("my error event")
def on_my_event(data):
    raise RuntimeError()

@socketio.on_error_default
def default_error_handler(e):
    print(request.event["message"]) # "my error event"
    print(request.event["args"])    # (data,)

12,訪問flask全局內容
SocketIO事件的處理程序與路由處理程序的處理程序不同,這引起了有關在一個SocketIO處理程序中可以和不可以完成什么的困惑。主要區別在於,為客戶端生成的所有SocketIO事件都發生在單個長時間運行請求的上下文中。

盡管有所不同,但Flask-SocketIO試圖通過使環境類似於普通的HTTP請求,來使得SocketIO事件處理程序更容易工作。以下列表描述了什么可行,什么不可行:

在調用事件處理程序之前推送應用程序上下文,使得處理程序可以使用current_app和g。
在調用處理程序之前,還會推送請求上下文,同時也使請求和會話可用。但是請注意,WebSocket事件沒有與它們關聯的單個請求,所以啟動連接的請求上下文被推送到連接生命期間分派的所有事件。
全局請求上下文使用sid成員進行了增強,該成員被設置為連接的唯一會話ID。此值用作添加客戶端的初始空間。
通過包含當前處理的名稱空間和事件參數的名稱空間和事件成員來增強請求上下文全局。事件成員是一個帶有消息和參數鍵的字典。
會話上下文全局的行為方式與常規請求不同。在建立SocketIO連接時用戶會話的副本可用於該連接上下文中調用的處理程序。如果一個SocketIO處理程序修改會話,修改的會話將被保留以供將來的SocketIO處理程序使用,但常規的HTTP路由處理程序將不會看到這些更改。實際上,當SocketIO處理程序修改會話時,將為這些處理程序專門創建會話的“分支”。這種限制的技術原因是為了保存用戶會話,需要將cookie發送到客戶端,並且需要HTTP請求和響應,這在SocketIO連接中不存在。在使用服務器端會話(如Flask-Session或Flask-KVSession擴展提供的會話)時,只要會話在SocketIO處理程序中未修改,則可以通過SocketIO處理程序查看HTTP路由處理程序中對會話所做的更改。
對於SocketIO事件處理程序,不會調用before_request和after_request掛鈎。
SocketIO處理程序可以使用自定義裝飾器,但是大多數Flask裝飾器將不適合用於SocketIO處理程序,因為在SocketIO連接期間沒有關於Response對象的概念。
13 授權
應用程序的共同需求是驗證用戶的身份。 基於Web表單和HTTP請求的傳統機制不能用於SocketIO連接,因為沒有地方發送HTTP請求和響應。 如果需要,應用程序可以實現一個定制的登錄表單,當用戶按下提交按鈕時,該表單將證書作為SocketIO消息發送到服務器。

但是,在大多數情況下,在SocketIO連接建立之前執行傳統的認證過程會更方便。 然后可以將用戶的標識記錄在用戶會話中或cookie中,稍后在建立SocketIO連接時,可以通過SocketIO事件處理程序訪問該信息。

14 ,用flask-login 作為例子講解flask-socketIO
Flask-SocketIO可以訪問由Flask-Login維護的登錄信息。 在執行常規Flask-Login身份驗證並調用login_user()函數以在用戶會話中記錄用戶之后,任何SocketIO連接都將有權訪問current_user上下文變量:

@socketio.on('connect')
def connect_handler():
    if current_user.is_authenticated:
        emit('my response',
             {'message': '{0} has joined'.format(current_user.name)},
             broadcast=True)
    else:
        return False  # not allowed here

請注意,login_required裝飾器不能與SocketIO事件處理程序一起使用,但可以創建斷開未經認證用戶的自定義裝飾器,如下所示:

import functools
from flask import request
from flask_login import current_user
from flask_socketio import disconnect

def authenticated_only(f):
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        if not current_user.is_authenticated:
            disconnect()
        else:
            return f(*args, **kwargs)
    return wrapped

@socketio.on('my event')
@authenticated_only
def handle_my_custom_event(data):
    emit('my response', {'message': '{0} has joined'.format(current_user.name)},
         broadcast=True)

15 發布
部署Flask-SocketIO服務器有多種選擇,從簡單到復雜。 在本節中,將介紹最常用的選項。
1)嵌入式服務
最簡單的部署策略是安裝eventlet或gevent,並通過調用socketio.run(app)來啟動Web服務器,如上例所示。 這將在eventlet或gevent web服務器上運行應用程序,以安裝的為准。

請注意,當安裝eventlet或gevent時,socketio.run(app)會運行一個生產准備服務器。 如果這兩個都沒有安裝,那么應用程序將在Flask的開發Web服務器上運行,這不適合於生產使用。

不幸的是,在uWSGI中使用gevent時,這個選項是不可用的。 有關此選項的信息,請參閱下面的uWSGI部分。
2)gunicorn web 服務
socketio.run(app)的替代方法是使用gunicorn作為web服務器,使用eventlet或gevent工作。 對於這個選項,除了gunicorn之外,還需要安裝eventlet或gevent。 通過gunicorn啟動eventlet服務器的命令行是:
gunicorn --worker-class eventlet -w 1 module:app
如果您更喜歡使用gevent,則啟動服務器的命令是:

gunicorn -k gevent -w 1 module:app

當gevent worker使用gunicorn和gevent-websocket提供的WebSocket支持時,必須更改啟動服務器的命令以選擇支持WebSocket協議的自定義gevent Web服務器。 修改后的命令是:

```gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -w 1 module:app```
在所有這些命令中,module是定義應用程序實例的Python模塊或包,app是應用程序實例本身。

Gunicorn版本18.0是Flask-SocketIO推薦的版本。 已知19.x版本在包含WebSocket的某些部署方案中具有不兼容性。

由於gunicorn使用有限的負載均衡算法,因此使用此Web服務器時不可能使用多個工作進程。 出於這個原因,上面的所有例子都包含-w 1選項。

3) uWSGI web服務
將uWSGI服務器與gevent結合使用時,Socket.IO服務器可以利用uWSGI的原生WebSocket支持。

有關uWSGI服務器的配置和使用的完整說明超出了本文檔的范圍。 uWSGI服務器是一個相當復雜的軟件包,提供了大量全面的選項。 必須使用WebSocket和SSL支持來編譯WebSocket傳輸才能使用。 作為介紹的一種方式,以下命令在端口5000上為示例應用程序app.py啟動一個uWSGI服務器:
```$ uwsgi --http :5000 --gevent 1000 --http-websockets --master --wsgi-file app.py --callable app```

16 ngnix 做反向代理
可以使用nginx作為將請求傳遞給應用程序的前端反向代理。 但是,只有nginx 1.4和更新版本支持WebSocket協議的代理。 以下是代理HTTP和WebSocket請求的基本nginx配置:

server {
    listen 80;
    server_name _;

    location / {
        include proxy_params;
        proxy_pass http://127.0.0.1:5000;
    }

    location /socket.io {
        include proxy_params;
        proxy_http_version 1.1;
        proxy_buffering off;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        proxy_pass http://127.0.0.1:5000/socket.io;
    }
}

 

下一個示例添加了對多個Socket.IO服務器的負載平衡的支持:

upstream socketio_nodes {
    ip_hash;

    server 127.0.0.1:5000;
    server 127.0.0.1:5001;
    server 127.0.0.1:5002;
    # to scale the app, just add more nodes here!
}

server {
    listen 80;
    server_name _;

    location / {
        include proxy_params;
        proxy_pass http://127.0.0.1:5000;
    }

    location /socket.io {
        include proxy_params;
        proxy_http_version 1.1;
        proxy_buffering off;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        proxy_pass http://socketio_nodes/socket.io;
    }
}

盡管上述示例可以作為初始配置工作,但請注意,nginx的生產安裝將需要更完整的配置,涵蓋其他部署方面,如提供靜態文件資產和SSL支持。

17,使用多進程
Flask-SocketIO支持從2.0版本開始的負載均衡器之后的多個工作者。 部署多個工作者使得使用Flask-SocketIO的應用程序能夠在多個進程和主機之間傳播客戶端連接,並以這種方式進行擴展以支持大量的並發客戶端。
有兩個要求使用多個Flask-SocketIO工作者:

負載均衡器必須配置為將來自給定客戶端的所有HTTP請求始終轉發給同一個工作者。這有時被稱為“粘性會話”。對於nginx,使用ip_hash指令來實現這一點。 Gunicorn不能用於多個工作者,因為它的負載平衡器算法不支持粘性會話。
由於每個服務器只擁有客戶端連接的一部分,因此服務器使用Redis或RabbitMQ等消息隊列來協調諸如廣播和房間之類的復雜操作。
使用消息隊列時,還需要安裝其他依賴項:

對於Redis,必須安裝軟件包redis(pip install redis)。
對於RabbitMQ,必須安裝包kombu(pip install kombu)。
對於Kombu支持的其他消息隊列,請參閱Kombu文檔以了解需要什么依賴關系。
如果使用eventlet或gevent,那么通常需要修補Python標准庫來強制消息隊列包使用協程友好的函數和類。
要啟動多個Flask-SocketIO服務器,必須首先確保您有消息隊列服務正在運行。要啟動Socket.IO服務器並將其連接到消息隊列,請將message_queue參數添加到SocketIO構造函數中:
```socketio = SocketIO(app, message_queue='redis://')```
message_queue參數的值是使用的隊列服務的連接URL。 對於在與服務器相同的主機上運行的redis隊列,可以使用“redis://”URL。 同樣,對於默認的RabbitMQ隊列,可以使用“amqp://”URL。 Kombu軟件包有一個文檔部分,描述所有支持的隊列的URL的格式。

對於許多類型的應用程序,有必要從不是SocketIO服務器的進程發出事件,例如一個Celery進程。 如果將SocketIO服務器或服務器配置為按照上一節所述在消息隊列中進行偵聽,則其他任何進程都可以創建自己的SocketIO實例,並使用該實例以與服務器相同的方式發出事件。

例如,對於在eventlet Web服務器上運行並使用Redis消息隊列的應用程序,以下Python腳本向所有客戶端廣播一個事件:

socketio = SocketIO(message_queue='redis://')
socketio.emit('my event', {'data': 'foo'}, namespace='/test')

以這種方式使用SocketIO實例時,Flask應用程序實例不會傳遞給構造函數。

SocketIO的通道參數可用於通過消息隊列選擇特定的通信通道。當有多個獨立的SocketIO服務共享相同的隊列時,使用自定義通道名是必要的。

當使用eventlet或gevent時,Flask-SocketIO不適用猴子補丁。但是,當使用消息隊列時,如果Python標准庫沒有被修補,與消息隊列服務對話的Python包很可能會掛起。

需要注意的是,要連接到SocketIO服務器的外部進程不需要像主服務器那樣使用eventlet或gevent。讓服務器使用協程框架,而外部進程不是問題。例如,芹菜工作人員不需要配置為僅僅因為主服務器而使用eventlet或gevent。但是如果你的外部進程不管用什么理由使用協程框架,那么可能需要猴子補丁,這樣消息隊列就可以訪問協程友好的函數和類。

 

地址:https://www.cnblogs.com/minsons/p/8251780.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM