
pip3 install django
pip3 install channels
創建channels庫根路由配置文件,根路由配置文件類似Django URLconf,它會告訴Channels當收到由Channes服務器發過來的Http請求時,應該執行什么代碼:
# wssite/routing.py from channels.routing import ProtocolTypeRouter application = ProtocolTypeRouter({ # (http->django views is added by default) })
將 Channels 庫添加到已安裝的應用程序列表中。編輯 wssite/settings.py 文件並將 'channels' 添加到 INSTALLED_APPS 設置:
# mysite/settings.py INSTALLED_APPS = [ 'channels', 'chat', 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', ]
配置根路由指向 Channels:
# wssite/settings.py # Channels ASGI_APPLICATION = 'wssite.routing.application'
現在已安裝的應用程序中有 Channels, 它將控制 runserver 命令, 用 Channels 開發服務器替換標准的 Django 開發服務器。
consumer配置
在app中創建新文件consumer,並添加如下代碼:
# app/consumers.py from channels.generic.websocket import WebsocketConsumer import json class ChatConsumer(WebsocketConsumer): def connect(self): self.accept() def disconnect(self, close_code): pass def receive(self, text_data=None, bytes_data=None): text_data_json = json.loads(text_data) message = text_data_json['message'] self.send(text_data=json.dumps({ 'message': message
這是一個同步 WebSocket consumer, 它接受所有連接, 接收來自其客戶端的消息, 並將這些消息回送到同一客戶端。現在, 它不向同一個房間的其他客戶端廣播消息。
Channels 還支持編寫異步 consumers 以提高性能。但是, 任何異步 consumers 都必須小心, 避免直接執行阻塞操作。
我們需要為 app 創建一個路由配置, 它有一個通往 consumer 的路由。創建新文件 app/routing.py。並寫入以下代碼:
# app/routing.py from django.conf.urls import url from . import consumers websocket_urlpatterns = [ url(r'^ws/chat/(?P<room_name>[^/]+)/$', consumers.ChatConsumer), ]
下一步是將根路由指向 chat.routing 模塊。在 mysite/routing.py 中, 導入 AuthMiddlewareStack、URLRouter 和 chat.routing ;並在 ProtocolTypeRouter 列表中插入一個 "websocket" 鍵, 格式如下:
# app/routing.py from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter import chat.routing application = ProtocolTypeRouter({ # (http->django views is added by default) 'websocket': AuthMiddlewareStack( URLRouter( chat.routing.websocket_urlpatterns ) ), })
這個根路由配置指定,當與 Channels 開發服務器建立連接的時候, ProtocolTypeRouter 將首先檢查連接的類型。如果是 WebSocket 連接 (ws://或 wss://), 則連接會交給 AuthMiddlewareStack。
AuthMiddlewareStack 將使用對當前經過身份驗證的用戶的引用來填充連接的 scope, 類似於 Django 的 AuthenticationMiddleware 用當前經過身份驗證的用戶填充視圖函數的請求對象。然后連接將被給到 URLRouter。
根據提供的 url 模式, URLRouter 將檢查連接的 HTTP 路徑, 以將其路由指定到到特定的 consumer。
channel layer
channel layer 是一種通信系統。它允許多個 consumer 實例互相交談, 以及與 Django 的其他部分進行通信。
channel layer 提供以下抽象:
channel 是可以發送消息的郵箱。每個 channel 都有一個名稱。任何有名稱的 channel 都可以向 channel 發送消息。
group 是一組相關的 channels。group 具有名稱。任何具有名字的 group 都可以按名稱向 group 中添加/刪除 channel, 也可以向 group 中的所有 channel 發送消息。無法列舉特定 group 中的 channel。
每個 consumer 實例都有一個自動生成的唯一的 channel 名稱, 因此可以通過 channel layer 進行通信。
使用redis作為channel layer的后備存儲。
安裝 channels_redis, 以便 Channels 知道如何調用 redis。運行以下命令:
pip3 install channels_redis
在使用 channel layer 之前, 必須對其進行配置。編輯 wssite/settings.py 文件並將 CHANNEL_LAYERS 設置添加到底部:
# wssite/settings.py # Channels ASGI_APPLICATION = 'mysite.routing.application' CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": [('127.0.0.1', 6379)], }, }, }
現在我們有了一個 channel layer, 讓我們在 ChatConsumer 中使用它。將以下代碼放在 chat/consumers.py 中, 替換舊代碼:
# app/consumers.py from asgiref.sync import async_to_sync from channels.generic.websocket import WebsocketConsumer import json class ChatConsumer(WebsocketConsumer): def connect(self): self.room_name = self.scope['url_route']['kwargs']['room_name'] # scope中包含有關其連接的信息且在app/routes.py中的url路由中獲取'room_name'參數 self.room_group_name = 'chat_%s' % self.room_name # 構建channels_group名稱 # Join room group async_to_sync(self.channel_layer.group_add)( self.room_group_name, self.channel_name ) self.accept() # 用於接受websocket連接,不調用則表示拒絕接收連接 def disconnect(self, close_code): # Leave room group async_to_sync(self.channel_layer.group_discard)( self.room_group_name, self.channel_name ) # Receive message from WebSocket def receive(self, text_data=None, bytes_data=None): text_data_json = json.loads(text_data) message = text_data_json['message'] # Send message to room group async_to_sync(self.channel_layer.group_send)( self.room_group_name, { 'type': 'chat_message', 'message': message } ) # Receive message from room group def chat_message(self, event): message = event['message'] # Send message to WebSocket self.send(text_data=json.dumps({ 'message': message }))
異步模式
重寫 ChatConsumer 使其變為異步的。在 app/consumers.py 中輸入以下代碼:
# app/consumers.py from channels.generic.websocket import AsyncWebsocketConsumer import json
class ChatConsumer(AsyncWebsocketConsumer): async def connect(self): self.room_name = self.scope['url_route']['kwargs']['room_name'] self.room_group_name = 'chat_%s' % self.room_name # Join room group await self.channel_layer.group_add( self.room_group_name, self.channel_name ) await self.accept() async def disconnect(self, close_code): # Leave room group await self.channel_layer.group_discard( self.room_group_name, self.channel_name ) # Receive message from WebSocket async def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] # Send message to room group await self.channel_layer.group_send( self.room_group_name, { 'type': 'chat_message', 'message': message } ) # Receive message from room group async def chat_message(self, event): message = event['message'] # Send message to WebSocket await self.send(text_data=json.dumps({ 'message': message }))
這些用於 ChatConsumer 的新代碼與原始代碼非常相似, 它們具有以下差異:
- 現在 ChatConsumer 繼承自 AsyncWebsocketConsumer 而不是 WebsocketConsumer。
- 所有方法都是 async def, 而不僅僅是 def。
- await 被用於調用執行 I/O 的異步函數。
- 在 channel layer 上調用方法時, 不再需要 async_to_sync。
數據庫訪問
Django ORM是一段同步代碼,因此如果您想從異步代碼訪問它,您需要進行特殊處理以確保其連接正確關閉。
如果你正在使用SyncConsumer
或者基於它的任何東西 - 比如 JsonWebsocketConsumer
- 你不需要做任何特別的事情,因為所有代碼都已經在同步模式下運行,並且Channels將作為SyncConsumer
代碼的一部分為你做清理工作。
但是,如果要編寫異步代碼,則需要使用安全的同步上下文調用數據庫方法database_sync_to_async
。
數據庫連接
如果您使用線程使用者(同步的),通道可能會比您可能使用的通道打開更多的數據庫連接 - 每個線程最多可以打開一個連接。
默認情況下,線程數設置為“CPU數* 5”,因此您可以看到最多這個線程數。如果要更改它,請將ASGI_THREADS
環境變量設置為您希望允許的最大數量。
為了避免在連接中有太多線程空閑,您可以改寫代碼以使用異步使用者,並且只在需要使用Django的ORM(使用database_sync_to_async
)時才進入線程。
database_sync_to_async
要使用它,請在單獨的函數或方法中編寫ORM查詢,然后database_sync_to_async
像這樣調用它:
from channels.db import database_sync_to_async async def connect(self): self.username = await database_sync_to_async(self.get_name)() def get_name(self): return User.objects.all()[0].name
您也可以將它用作裝飾者:
from channels.db import database_sync_to_async async def connect(self): self.username = await self.get_name() @database_sync_to_async def get_name(self): return User.objects.all()[0].name