Django + Channels + Celery 實時更新日志
1.准備工作:
-
系統為windows系統。技術實現:
python 3.6.8 django 2.2 celery 3.1.26 redis 2.10.6 django-celery 3.3.1 channels 2.4.0 channels-redis 2.4.2
-
settings.py配置。
- 注冊APP
INSTALLED_APPS = [ ... # 注冊django-celery "djcelery", # 注冊 channel "channels", ]
-
celery一些參數配置
-
一張圖簡單看一下celery
-
import djcelery # 加載djcelery djcelery.setup_loader() # 數據庫調度 BROKER_TRANSPORT='redis' #指定redis # CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler' # celey處理器,固定 CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Broker配置,使用Redis作為消息中間件 消息隊列 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # BACKEND配置,這里使用redis 存儲結果 # 指定任務路徑。為api應用下的tasks.py文件 CELERY_IMPORTS = ('api.tasks') # CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 #允許的內容類型, CELERY_ACCEPT_CONTENT=['pickle','json'] #任務的序列化方式 CELERY_TASK_SERIALIZER = 'json' #celery時區,定時任務使用 CELERY_TIMEZONE = 'Asia/Shanghai' # 每個worker最多執行100個任務被銷毀,可以防止內存泄漏 CELERYD_MAX_TASKS_PER_CHILD = 100 # 有些情況下可以防止死鎖 CELERYD_FORCE_EXECV = True # 設置並發的worker數量 CELERYD_CONCURRENCY = 4 # 允許重試 CELERY_ACKS_LATE = True # 單個任務的最大運行時間,超過就殺死 CELERYD_TASK_TIME_LEMIT = 12 * 30
- channels一些配置,注冊channels需要指定ASGI路由地址
# 指定ASGI的路由地址 # 指定api 應用下routing.py ASGI_APPLICATION = 'api.routing.application'# CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": [('127.0.0.1', 6379)], }, }, }
- 假數據
# 這里字典形式封裝路經代表日志文件路徑 TAILF = { 1: r'J:\djangoSerializers\api\log_recored\sheet1', 2: r'J:\djangoSerializers\api\log_recored\sheet2', }
1.5文件目錄如下:
djangoSerializers
|__ api
|___ __init__.py
|___ taksk.py # 為創建
|___ views.py
|___ ...
|__ djangoSerializers
|___ __init__.py
|___ settings.py
|___ urls.py
|___ wsgi.py
|___ celery.py #為創建
2.celery
-
在項目目錄下二級目錄下創建celery.py
from django.conf import setting from __future__ import absolute_import, unicode_literals from celery import Celery, platforms from django.conf import settings import os # 設置當前django環境 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoSerializers.settings") # 實例化Celery對象 app = Celery("djangoSerializers") # 加載配置文件,並使用CELERY前綴 app.config_from_object("django.conf:settings", namespace='CELERY') # celery不能root用戶啟動解決 platforms.C_FORCE_ROOT = True # 去尋找每個app下的tasks.py文件 app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
-
在該目錄
__init___.py
添加# 這是為了確保在django啟動時啟動 celery from __future__ import absolute_import from .celery import app as celery_app
-
在app應用下創建
tasks.py
,用於celery異步任務處理。from celery import shared_task from asgiref.sync import async_to_sync from channels.layers import get_channel_layer @shared_task def tailf(id,channel_name): # 暫時先空出來
3.channel
-
因我們之前在
settings.py
指定ASGI_APPLICATION
路徑,於是在api
下創建routing.py
:from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter from django.urls import re_path from tailf.consumers import TailfConsumer class TokenAuthMiddle: def __init__(self,inner): self.inner = inner def __call__(self,scope): return self.inner(scope) # 這里為了簡單驗證直接返回當前對象,也可以自定義或者使用內置 AuthMiddlewareStack TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddle(AuthMiddlewareStack(inner)) # 指向處理websocket的類視圖函數 TailfConsumer application = ProtocolTypeRouter({ "websocket": TokenAuthMiddlewareStack(URLRouter([ re_path(r'^ws/tailf/(?P<id>\d+)/$', TailfConsumer), ]) ) })
-
在當前項目目錄下新建
consumers.py
用於websocket的連接和斷開。import json from channels.generic.websocket import WebsocketConsumer # 這里tailf是要執行異步任務 from api.tasks import tailf class TailfConsumer(WebsocketConsumer): def connect(self): # 通過獲取id來執行異步任務 self.file_id = self.scope["url_route"]["kwargs"]["id"] self.result = tailf.delay(self.file_id, self.channel_name) self.accept() def disconnect(self, code): # 終止執行中task self.result.revoke(terminate=True) print("disconnect:",self.file_id,self.channel_name) def send_message(self,event): # 發送給客戶端消息 self.send(text_data=json.dumps({ "message":event["message"] }))
4.其他邏輯配置
-
url.py
urlpatterns = [ ... url(r'tailf', views.tailf_view, name='tailf-url'), ]
-
視圖函數
def tailf_view(request): logDict = settings.TAILF return render(request,"index.html",{"logDict": logDict})
-
templates
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <div class="col-sm-8"> <select class="form-control" id="file"> <option value="">選擇要監聽的日志</option> {% for k,v in logDict.items %} <option value="{{ k }}">{{ v }}</option> {% endfor %} </select> </div> <div class="col-sm-2"> <input class="btn btn-success btn-block" type="button" onclick="connect()" value="開始監聽"/><br/> </div> <div class="col-sm-2"> <input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="終止監聽"/><br/> </div> <div class="col-sm-12"> <textarea class="form-control" id="chat-log" disabled rows="20"></textarea> </div> <script src="https://cdn.bootcss.com/jquery/3.5.0/jquery.min.js"></script> </body> <script> function connect() { if ( $('#file').val() ) { var url = 'ws://' + window.location.host + '/ws/tailf/' + $('#file').val() + '/'; window.chatSocket = new WebSocket(url); // 當瀏覽器接收到websocket服務器發送過來的數據時,就會觸發onmessage消息,參數e包含了服務端發送過來的數據 chatSocket.onmessage = function(e) { var data = JSON.parse(e.data); var message = data['message']; document.querySelector('#chat-log').value += (message); // 跳轉到頁面底部 $('#chat-log').scrollTop($('#chat-log')[0].scrollHeight); }; // 如果連接失敗,或者發送、接收數據失敗,或者數據處理出錯都會觸發onerror消息 chatSocket.onerror = function(e) { console.error('服務端連接異常!') }; // 當瀏覽器接收到websocket服務器發送過來的關閉連接請求時,會觸發onclose消息 chatSocket.onclose = function(e) { console.error('websocket已關閉!') }; } else { console.log('請選擇要監聽的日志文件') } }; function goclose() { // 用於關閉連接 window.chatSocket.close(); window.chatSocket.onclose = function(e) { console.log('已終止日志監聽!') }; } </script> </html>
- 當然還有
onopen:
當瀏覽器和websocket服務端連接成功后會觸發onopen消息。
- 當然還有
5.啟動worker節點:
- 終端執行:
python3 manage.py celery worker -l INFO
- 啟動項目測試一下吧。
6.Django項目敏感信息保存
- django項目做完后,向生產環境部署時,為了避免一些敏感信息被其他人利用,我們需要進行一定保護,比如settings配置中的一些密碼等內容。
- 通過
os.environ
模塊實現,這里以SECRET_KEY
為例:
在linux系統中 /etc/profile 中寫入SECRET_KEY
e.g.:
export SECRET_KEY = "..."
settings.py
import os
SECRET_KEY = os.environ["SECRET_KEY"]
- 注意:更改完
/etc/profile
后執行source /etc/profile
,以使更新后的內容生效。