一篇搞定Python3.6+Django+channels实现WebSocket的实时聊天室


个人博客,欢迎来撩 fangzengye.com

1.第一部分:基础设置

Channels3.0支持Python3.6和Django2.2+

1.1项目结构

mysite/
    manage.py
    mysite/
        __init__.py
        asgi.py
        settings.py
        urls.py
        wsgi.py

1.2自建项目主要文件夹

chat/
    __init__.py
    admin.py
    apps.py
    migrations/
        __init__.py
    models.py
    tests.py
    views.py

1.3移除不必须文件,保留必须文件,像这样

chat/
    __init__.py
    views.py

1.4在INSTALLED_APPS加入项目名

# mysite/settings.py
INSTALLED_APPS = [
    'chat',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

1.5创建聊天首页view

1.5.1在templates文件夹创建一个html文件

chat/
    __init__.py
    templates/
        chat/
            index.html
    views.py

html代码

<!-- chat/templates/chat/index.html -->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Chat Rooms</title>
</head>
<body>
    What chat room would you like to enter?<br>
    <input id="room-name-input" type="text" size="100"><br>
    <input id="room-name-submit" type="button" value="Enter">
&lt;script&gt;
    document.querySelector('#room-name-input').focus();
    document.querySelector('#room-name-input').onkeyup = function(e) {
        if (e.keyCode === 13) {  // enter, return
            document.querySelector('#room-name-submit').click();
        }
    };

    document.querySelector('#room-name-submit').onclick = function(e) {
        var roomName = document.querySelector('#room-name-input').value;
        window.location.pathname = '/chat/' + roomName + '/';
    };
&lt;/script&gt;

</body>
</html>

1.6在view创建接受前端请求,返回html

# chat/views.py
from django.shortcuts import render

def index(request):
return render(request, 'chat/index.html')

1.7在路由端chat/urls.py添加路径

# chat/urls.py
from django.urls import path

from . import views

urlpatterns = [
path('', views.index, name='index'),
]

1.8在路由端mysite/urls.py添加路径

# mysite/urls.py
from django.conf.urls import include
from django.urls import path
from django.contrib import admin

urlpatterns = [
path('chat/', include('chat.urls')),
path('admin/', admin.site.urls),
]

1.9可以运行了

python3 manage.py runserver

这里有两个连接

http://127.0.0.1:8000/chat/

http://127.0.0.1:8000/chat/lobby/这个连接可能会404,因为还没有整合channels

1.10修改mysite/Saginaw.py

# mysite/asgi.py
import os

from channels.routing import ProtocolTypeRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

application = ProtocolTypeRouter({
"http": get_asgi_application(),
# Just HTTP for now. (We can add other protocols later.)
})

1.11在mysite/settings.py的INSTALLED_APP 添加channels

# mysite/settings.py
INSTALLED_APPS = [
    'channels',
    'chat',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

1.12在mysite/settings.py添加指向Channels

# mysite/settings.py
# Channels
ASGI_APPLICATION = 'mysite.asgi.application'

1.13再次运行

python3 manage.py runserver

 

 

2.聊天室服务器接口

2.1添加一个聊天页面视图view

新建一个空html页面文件chat/templates/chat/room.html

因此项目结构如下:

chat/
    __init__.py
    templates/
        chat/
            index.html
            room.html
    urls.py
    views.py

html代码如下

<!-- chat/templates/chat/room.html -->
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Chat Room</title>
</head>
<body>
    <textarea id="chat-log" cols="100" rows="20"></textarea><br>
    <input id="chat-message-input" type="text" size="100"><br>
    <input id="chat-message-submit" type="button" value="Send">
    {{ room_name|json_script:"room-name" }}
    <script>
        const roomName = JSON.parse(document.getElementById('room-name').textContent);
    const chatSocket = new WebSocket(
        'ws://'
        + window.location.host
        + '/ws/chat/'
        + roomName
        + '/'
    );

    chatSocket.onmessage = function(e) {
        const data = JSON.parse(e.data);
        document.querySelector('#chat-log').value += (data.message + '\n');
    };

    chatSocket.onclose = function(e) {
        console.error('Chat socket closed unexpectedly');
    };

    document.querySelector('#chat-message-input').focus();
    document.querySelector('#chat-message-input').onkeyup = function(e) {
        if (e.keyCode === 13) {  // enter, return
            document.querySelector('#chat-message-submit').click();
        }
    };

    document.querySelector('#chat-message-submit').onclick = function(e) {
        const messageInputDom = document.querySelector('#chat-message-input');
        const message = messageInputDom.value;
        chatSocket.send(JSON.stringify({
            'message': message
        }));
        messageInputDom.value = '';
    };
&lt;/script&gt;

</body>
</html>

这段前端Html挺重要的,实例化了WebSocket,添加onsend、onclosed、onmessage函数,与后端设置WebSocket连接起来了

 

2.2在chat/views.py添加romm函数

# chat/views.py
from django.shortcuts import render

def index(request):
return render(request, 'chat/index.html', {})

def room(request, room_name):
return render(request, 'chat/room.html', {
'room_name': room_name
})

2.3修改chat/urls.py

# chat/urls.py
from django.urls import path

from . import views

urlpatterns = [
path('', views.index, name='index'),
path('<str:room_name>/', views.room, name='room'),
]

2.4运行

python3 manage.py runserver

Go to http://127.0.0.1:8000/chat/我们可以看到首页

http://127.0.0.1:8000/chat/lobby/可以看到展示一个空聊天记录信息

2.5聊天室输入hello回🚗没有任何发生说明还没启动Websocket

到目前为止,只是在前端设置了WebSocket,还没在后端设置WebSocket,因此还没建立连接,接下来在后端设置WebSocket

2.6创建chat/consumer.py,像

chat/
    __init__.py
    consumers.py
    templates/
        chat/
            index.html
            room.html
    urls.py
    views.py

consumer.py代码

# chat/consumers.py
import json
from channels.generic.websocket import WebsocketConsumer

class ChatConsumer(WebsocketConsumer):
def connect(self):
self.accept()

def disconnect(self, close_code):
    pass

def receive(self, text_data):
    text_data_json = json.loads(text_data)
    message = text_data_json['message']

    self.send(text_data=json.dumps({
        'message': message
    }))</code></pre> 

2.7创建chat/routing.py,像

chat/
    __init__.py
    consumers.py
    routing.py
    templates/
        chat/
            index.html
            room.html
    urls.py
    views.py

routing.py代码

chat/routing.py
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

这里调用as_asgi()为了适配ASGI应用

2.8修改mysite/asgi.py

mysite/asgi.py
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import chat.routing

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
chat.routing.websocket_urlpatterns
)
),
})

2.9使用redis作为消息存储仓(backing store )

2.10安装channels_redis

Pip install channels_redis

2.11配置mysite/sttings.py,添加CHANNELS_LAYERS

mysite/settings.py
# Channels
ASGI_APPLICATION = 'mysite.asgi.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

2.12打开python shell

$ python3 manage.py shell
>>> import channels.layers
>>> channel_layer = channels.layers.get_channel_layer()
>>> from asgiref.sync import async_to_sync
>>> async_to_sync(channel_layer.send)('test_channel', {'type': 'hello'})
>>> async_to_sync(channel_layer.receive)('test_channel')
{'type': 'hello'}

control+D退出shell

2.13现在有了chanel层,使用ChatConsumer函数,在chat/consumers.py上添加函数

# chat/consumers.py
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

class ChatConsumer(WebsocketConsumer):
def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'chat_%s' % self.room_name

    # Join room group
    async_to_sync(self.channel_layer.group_add)(
        self.room_group_name,
        self.channel_name
    )

    self.accept()

def disconnect(self, close_code):
    # Leave room group
    async_to_sync(self.channel_layer.group_discard)(
        self.room_group_name,
        self.channel_name
    )

# Receive message from WebSocket
def receive(self, text_data):
    text_data_json = json.loads(text_data)
    message = text_data_json['message']

    # Send message to room group
    async_to_sync(self.channel_layer.group_send)(
        self.room_group_name,
        {
            'type': 'chat_message',
            'message': message
        }
    )

# Receive message from room group
def chat_message(self, event):
    message = event['message']

    # Send message to WebSocket
    self.send(text_data=json.dumps({
        'message': message
    }))</code></pre> 

更深层次对函数解释

self.scope['url_route']['kwargs']['room_name']

从已经建立好连接WebSocket的URL路由地址chat/routing.py获取room_name参数

每个用户都有一个保存自己连接的信息的scope

self.room_group_name = 'chat_%s' % self.room_name

建立一个Channels聊天组名

async_to_sync(self.channel_layer.group_add)(...)

加入一个group

任何一个聊天室都需要实时同步的async WebSocket方法

Group name 限制在ASCII字符

self.accept()

接收WebSocket连接

如果你没有在connect()中调用accept()可能会导致连接关闭或被拒绝

如果你想要使用accept(),建议你在connect()函数中最后使用(放在最后一行)

async_to_sync(self.channel_layer.group_discard)(...)

退出group群行为

async_to_sync(self.channel_layer.group_send)

发送一个事件到群group

如果返回的事件包含type键可能会引起用户接受到次事件信息

 

2.14运行runserver

使用两个浏览器同时访问http://127.0.0.1:8000/chat/lobby/,在上面发送消息,可以看到另外浏览器收到消息

[refers](https://channels.readthedocs.io/en/latest/tutorial/part_2.html)

 

3.用Asynchronous重写聊天室

3.1重写ChatConsumer类,修改chat/consumers.py

chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'chat_%s' % self.room_name

    # Join room group
    await self.channel_layer.group_add(
        self.room_group_name,
        self.channel_name
    )

    await self.accept()

async def disconnect(self, close_code):
    # Leave room group
    await self.channel_layer.group_discard(
        self.room_group_name,
        self.channel_name
    )

# Receive message from WebSocket
async def receive(self, text_data):
    text_data_json = json.loads(text_data)
    message = text_data_json['message']

    # Send message to room group
    await self.channel_layer.group_send(
        self.room_group_name,
        {
            'type': 'chat_message',
            'message': message
        }
    )

# Receive message from room group
async def chat_message(self, event):
    message = event['message']

    # Send message to WebSocket
    await self.send(text_data=json.dumps({
        'message': message
    }))

简化上面代码,写成如下框架

chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
async def connect():

    await self.channel_layer.group_add()

    await self.accept()

async def disconnect():

    await self.channel_layer.group_discard()

async def receive():

    await self.channel_layer.group_send()

async def chat_message(self, event):

    await self.send()

await 是调用同步asynshronous函数的I/O平台

 

[refers](https://channels.readthedocs.io/en/latest/tutorial/part_3.html)

 

4.自动化测试

4.1安装selenium

pip install selenium

4.2创建新文件chat/test.py,文件架构如下

chat/
    __init__.py
    consumers.py
    routing.py
    templates/
        chat/
            index.html
            room.html
    tests.py
    urls.py
    views.py

修改chat/test.py

# chat/tests.py
from channels.testing import ChannelsLiveServerTestCase
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait

class ChatTests(ChannelsLiveServerTestCase):
serve_static = True # emulate StaticLiveServerTestCase

@classmethod
def setUpClass(cls):
    super().setUpClass()
    try:
        # NOTE: Requires "chromedriver" binary to be installed in $PATH
        cls.driver = webdriver.Chrome()
    except:
        super().tearDownClass()
        raise

@classmethod
def tearDownClass(cls):
    cls.driver.quit()
    super().tearDownClass()

def test_when_chat_message_posted_then_seen_by_everyone_in_same_room(self):
    try:
        self._enter_chat_room('room_1')

        self._open_new_window()
        self._enter_chat_room('room_1')

        self._switch_to_window(0)
        self._post_message('hello')
        WebDriverWait(self.driver, 2).until(lambda _:
            'hello' in self._chat_log_value,
            'Message was not received by window 1 from window 1')
        self._switch_to_window(1)
        WebDriverWait(self.driver, 2).until(lambda _:
            'hello' in self._chat_log_value,
            'Message was not received by window 2 from window 1')
    finally:
        self._close_all_new_windows()

def test_when_chat_message_posted_then_not_seen_by_anyone_in_different_room(self):
    try:
        self._enter_chat_room('room_1')

        self._open_new_window()
        self._enter_chat_room('room_2')

        self._switch_to_window(0)
        self._post_message('hello')
        WebDriverWait(self.driver, 2).until(lambda _:
            'hello' in self._chat_log_value,
            'Message was not received by window 1 from window 1')

        self._switch_to_window(1)
        self._post_message('world')
        WebDriverWait(self.driver, 2).until(lambda _:
            'world' in self._chat_log_value,
            'Message was not received by window 2 from window 2')
        self.assertTrue('hello' not in self._chat_log_value,
            'Message was improperly received by window 2 from window 1')
    finally:
        self._close_all_new_windows()

# === Utility ===

def _enter_chat_room(self, room_name):
    self.driver.get(self.live_server_url + '/chat/')
    ActionChains(self.driver).send_keys(room_name + '\n').perform()
    WebDriverWait(self.driver, 2).until(lambda _:
        room_name in self.driver.current_url)

def _open_new_window(self):
    self.driver.execute_script('window.open("about:blank", "_blank");')
    self.driver.switch_to_window(self.driver.window_handles[-1])

def _close_all_new_windows(self):
    while len(self.driver.window_handles) &gt; 1:
        self.driver.switch_to_window(self.driver.window_handles[-1])
        self.driver.execute_script('window.close();')
    if len(self.driver.window_handles) == 1:
        self.driver.switch_to_window(self.driver.window_handles[0])

def _switch_to_window(self, window_index):
    self.driver.switch_to_window(self.driver.window_handles[window_index])

def _post_message(self, message):
    ActionChains(self.driver).send_keys(message + '\n').perform()

@property
def _chat_log_value(self):
    return self.driver.find_element_by_css_selector('#chat-log').get_property('value')</code></pre> 

4.3搭载数据库sqlite3,需要告诉Django要使用数据库,在mysite/settings.py修改

mysite/settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
        'TEST': {
            'NAME': os.path.join(BASE_DIR, 'db_test.sqlite3')
        }
    }
}

 

4.4运行

python3 manage.py test chat.tests

您可以看到

Creating test database for alias 'default'...
System check identified no issues (0 silenced).
..
----------------------------------------------------------------------
Ran 2 tests in 5.014s

OK
Destroying test database for alias 'default'...

4.5大功完成

[refers](https://channels.readthedocs.io/en/latest/tutorial/part_4.html)

4.6如您想要更深的了解可以访问https://channels.readthedocs.io/en/latest/index.html#topics

后记

一直在做Django后台开发,但是,一开始并不知道WebSocket是啥,更不知道python的Django还有第三方库Channels库实现网页全双工WebSocket,是Amanda小姐姐告诉还有WebSocket可以聊解一下,没想到,才意识到现在使用的微信、高德实时导航,QQ等这些实时通信工具都是基于这种原理实现。

https://channels.readthedocs.io/en/latest/tutorial/index.html

https://blog.csdn.net/weixin_43486863/article/details/83344368

 

 

 

 

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM