Ubuntu 安裝Redis redis-5.0.3 服務端
python 安裝 pip install redis==2.10.6
在寫celery異步任務時,注意導入Django的配置環境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DRF_test.settings')
如果channels配置有Redis緩存將配置寫在settings.py中:
# 頻道層的緩存 # CHANNEL_LAYERS = { "default": { # "BACKEND": "channels.layers.InMemoryChannelLayer", "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": [("redis://:123@192.168.133.128:6380")], # channel layers緩存使用Redis }, # "channel_capacity": { # "http.request": 200, # "http.response!*": 10, # re.compile(r"^websocket.send\!.+"): 20, # }, }, }
然后編寫異步任務:
from celery import Celery import time, os from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from DRF_test import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DRF_test.settings') app = Celery('tasks', broker='redis://:123@192.168.133.128:6380/2', backend='redis://:123@192.168.133.128:6380/2') @app.task def hello(id, channel_name): channel_layer = get_channel_layer() with open(os.path.join( settings.BASE_DIR, 'static/log.txt'), 'r') as f: while True: line = f.readline() if line: print(line) async_to_sync(channel_layer.send)( channel_name, { 'type': 'websocket.celery', 'message': line } ) return 'hello world' @app.task def hello2(x,y): return x+y
編輯channels的websocket內容:
class ChatConsumer(WebsocketConsumer): result = None def websocket_connect(self, message): """客戶端請求建立鏈接時 自動觸發""" print("建立", message, self.scope["user"], self.scope['url_route']['kwargs']['id']) logId = self.scope['url_route']['kwargs']['id'] self.accept() # 建立鏈接 並且自動幫你維護每一個客戶端 # self.send('11331', self.channel_name) # channel_layer = get_channel_layer() self.result = hello.delay(logId, self.channel_name) # 將鏈接在列表中存儲一份 # consumer_object_list.append(self) # --------------- # self.group_name = self.scope['url_route']['kwargs']['group_name'] # self.channel_layer.group_add(self.group_name, self.channel_name) # # 將用戶添加至聊天組信息chats中 # try: # ChatConsumer.chats[self.group_name].add(self) # except: # ChatConsumer.chats[self.group_name] = set([self]) # print(ChatConsumer.chats) # 創建連接時調用 # self.accept() def websocket_receive(self, message): """客戶端發送數據過來 自動觸發""" print(message) # message = {'type': 'websocket.receive', 'text': 'hello world!'} text = message.get('message') # 真正的數據 # 給客戶端發送消息 單獨發送 self.send(text_data=text) # 給所有的鏈接對象發送數據 for obj in consumer_object_list: obj.send(text_data=text) def websocket_celery(self, message): """celery發送數據過來 自動觸發""" print(message, '----------') # message = {'type': 'websocket.receive', 'text': 'hello world!'} text = message.get('message') # 真正的數據 # 給客戶端發送消息 單獨發送 self.send(text_data=text) # 給所有的鏈接對象發送數據 for obj in consumer_object_list: obj.send(text_data=text) def websocket_disconnect(self, message): """客戶端斷開鏈接之后 自動觸發""" # 客戶端斷開鏈接之后 應該將當前對象移除 # consumer_object_list.remove(self) # raise StopConsumer() # 連接關閉時調用 # 將關閉的連接從群組中移除 # self.channel_layer.group_discard(self.group_name, self.channel_name) # 將該客戶端移除聊天組連接信息 # ChatConsumer.chats[self.group_name].remove(self) # self.close() # 清除celery任務 self.result.revoke() print('清除成功') self.close()
注意;
最后啟動celery:
celery -A app1.test worker -l info
一個異步執行任務完成,並且實時更新內容