django + celery + channels.websocket 異步任務


Ubuntu 安裝Redis redis-5.0.3  服務端

python 安裝 pip install redis==2.10.6

在寫celery異步任務時,注意導入Django的配置環境

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DRF_test.settings')

如果channels配置有Redis緩存將配置寫在settings.py中:

# 頻道層的緩存    #
CHANNEL_LAYERS = {
    "default": {
        # "BACKEND": "channels.layers.InMemoryChannelLayer",
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis://:123@192.168.133.128:6380")],  # channel layers緩存使用Redis
        },
        # "channel_capacity": {
        #     "http.request": 200,
        #     "http.response!*": 10,
        #     re.compile(r"^websocket.send\!.+"): 20,
        # },
    },
}

然后編寫異步任務:

from celery import Celery
import time, os
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

from DRF_test import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DRF_test.settings')

app = Celery('tasks', broker='redis://:123@192.168.133.128:6380/2', backend='redis://:123@192.168.133.128:6380/2')


@app.task
def hello(id, channel_name):

    channel_layer = get_channel_layer()

    with open(os.path.join( settings.BASE_DIR, 'static/log.txt'), 'r') as f:
        while True:
            line = f.readline()
            if line:
                print(line)
                async_to_sync(channel_layer.send)(
                    channel_name,
                    {
                        'type': 'websocket.celery',
                        'message': line
                    }
                )
    return 'hello world'
@app.task
def hello2(x,y):
    return x+y

編輯channels的websocket內容:

class ChatConsumer(WebsocketConsumer):
    result = None

    def websocket_connect(self, message):
        """客戶端請求建立鏈接時 自動觸發"""
        print("建立", message, self.scope["user"], self.scope['url_route']['kwargs']['id'])
        logId = self.scope['url_route']['kwargs']['id']
        self.accept()  # 建立鏈接  並且自動幫你維護每一個客戶端
        # self.send('11331', self.channel_name)
        # channel_layer = get_channel_layer()
        self.result = hello.delay(logId, self.channel_name)

        # 將鏈接在列表中存儲一份
        # consumer_object_list.append(self)

    #      ---------------
    #     self.group_name = self.scope['url_route']['kwargs']['group_name']
    #     self.channel_layer.group_add(self.group_name, self.channel_name)
    #     # 將用戶添加至聊天組信息chats中
    #     try:
    #         ChatConsumer.chats[self.group_name].add(self)
    #     except:
    #         ChatConsumer.chats[self.group_name] = set([self])
        # print(ChatConsumer.chats)
        # 創建連接時調用
        # self.accept()


    def websocket_receive(self, message):
        """客戶端發送數據過來  自動觸發"""

        print(message)
        #  message = {'type': 'websocket.receive', 'text': 'hello world!'}
        text = message.get('message')  # 真正的數據
        # 給客戶端發送消息  單獨發送
        self.send(text_data=text)

        # 給所有的鏈接對象發送數據
        for obj in consumer_object_list:
            obj.send(text_data=text)

    def websocket_celery(self, message):
        """celery發送數據過來  自動觸發"""

        print(message, '----------')
        #  message = {'type': 'websocket.receive', 'text': 'hello world!'}
        text = message.get('message')  # 真正的數據
        # 給客戶端發送消息  單獨發送
        self.send(text_data=text)

        # 給所有的鏈接對象發送數據
        for obj in consumer_object_list:
            obj.send(text_data=text)



    def websocket_disconnect(self, message):
        """客戶端斷開鏈接之后  自動觸發"""
        # 客戶端斷開鏈接之后 應該將當前對象移除
        # consumer_object_list.remove(self)
        # raise StopConsumer()

        # 連接關閉時調用
        # 將關閉的連接從群組中移除
        # self.channel_layer.group_discard(self.group_name, self.channel_name)
        # 將該客戶端移除聊天組連接信息
        # ChatConsumer.chats[self.group_name].remove(self)
        # self.close()
        # 清除celery任務
        self.result.revoke()
        print('清除成功')
        self.close()

注意;

 

最后啟動celery:

  celery -A app1.test worker -l info

一個異步執行任務完成,並且實時更新內容

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM