Django + Channels + Celery 實時更新日志


Django + Channels + Celery 實時更新日志

1.准備工作:

  • 系統為windows系統。技術實現:

    python 3.6.8
    django 2.2
    celery 3.1.26
    redis  2.10.6
    django-celery 3.3.1
    channels 2.4.0
    channels-redis 2.4.2
    
  • settings.py配置。

    • 注冊APP
    INSTALLED_APPS = [
        ...
        # 注冊django-celery
        "djcelery",
        # 注冊 channel
        "channels",
    ]
    
    • celery一些參數配置

      • 一張圖簡單看一下celery

    import djcelery
    # 加載djcelery
    djcelery.setup_loader() 
    # 數據庫調度
    BROKER_TRANSPORT='redis' #指定redis
    # CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler' # celey處理器,固定
    CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Broker配置,使用Redis作為消息中間件   消息隊列
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # BACKEND配置,這里使用redis    存儲結果
    # 指定任務路徑。為api應用下的tasks.py文件
    CELERY_IMPORTS = ('api.tasks')
    # CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案
    #允許的內容類型,
    CELERY_ACCEPT_CONTENT=['pickle','json']
    #任務的序列化方式
    CELERY_TASK_SERIALIZER = 'json'
    #celery時區,定時任務使用
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    #  每個worker最多執行100個任務被銷毀,可以防止內存泄漏
    CELERYD_MAX_TASKS_PER_CHILD = 100
    #  有些情況下可以防止死鎖
    CELERYD_FORCE_EXECV = True
    #  設置並發的worker數量
    CELERYD_CONCURRENCY = 4
    # 允許重試
    CELERY_ACKS_LATE = True
    #  單個任務的最大運行時間,超過就殺死
    CELERYD_TASK_TIME_LEMIT = 12 * 30
    
    • channels一些配置,注冊channels需要指定ASGI路由地址
    # 指定ASGI的路由地址
    # 指定api 應用下routing.py
    ASGI_APPLICATION = 'api.routing.application'#
    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                "hosts": [('127.0.0.1', 6379)],
            },
        },
    }
    
    • 假數據
    # 這里字典形式封裝路經代表日志文件路徑
    TAILF = {
        1: r'J:\djangoSerializers\api\log_recored\sheet1',
        2: r'J:\djangoSerializers\api\log_recored\sheet2',
    }
    

1.5文件目錄如下:

djangoSerializers
	|__ api
	    |___ __init__.py 
		|___ taksk.py # 為創建
		|___ views.py
		|___ ...
	|__ djangoSerializers
		|___ __init__.py
		|___ settings.py
         |___ urls.py
         |___ wsgi.py
         |___ celery.py   #為創建

2.celery

  • 在項目目錄下二級目錄下創建celery.py

    from django.conf import setting
    from __future__ import absolute_import, unicode_literals
    from celery import Celery, platforms
    from django.conf import settings
    import os
    # 設置當前django環境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoSerializers.settings")
    # 實例化Celery對象
    app = Celery("djangoSerializers")
    # 加載配置文件,並使用CELERY前綴
    app.config_from_object("django.conf:settings", namespace='CELERY')
    # celery不能root用戶啟動解決
    platforms.C_FORCE_ROOT = True
    # 去尋找每個app下的tasks.py文件
    app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
    
  • 在該目錄__init___.py 添加

    # 這是為了確保在django啟動時啟動 celery
    from __future__ import absolute_import
    from .celery import app as celery_app
    
  • 在app應用下創建tasks.py,用於celery異步任務處理。

    from celery import shared_task
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer
    @shared_task
    def tailf(id,channel_name):
    	# 暫時先空出來
    

3.channel

  • 因我們之前在settings.py指定ASGI_APPLICATION路徑,於是在api下創建routing.py

    
    from channels.auth import AuthMiddlewareStack
    from channels.routing import ProtocolTypeRouter, URLRouter
    from django.urls import re_path
    
    from tailf.consumers import TailfConsumer
    class TokenAuthMiddle:
        def __init__(self,inner):
            self.inner = inner
        def __call__(self,scope):
            return self.inner(scope)
    # 這里為了簡單驗證直接返回當前對象,也可以自定義或者使用內置 AuthMiddlewareStack
    TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddle(AuthMiddlewareStack(inner))
    # 指向處理websocket的類視圖函數 TailfConsumer
    application = ProtocolTypeRouter({
        "websocket": TokenAuthMiddlewareStack(URLRouter([
                re_path(r'^ws/tailf/(?P<id>\d+)/$', TailfConsumer),
            ])
        )
    })
    
    
  • 在當前項目目錄下新建consumers.py用於websocket的連接和斷開。

    import json
    
    from channels.generic.websocket import WebsocketConsumer
    # 這里tailf是要執行異步任務
    from api.tasks import tailf
    
    class TailfConsumer(WebsocketConsumer):
        def connect(self):
            # 通過獲取id來執行異步任務
            self.file_id = self.scope["url_route"]["kwargs"]["id"]
            self.result = tailf.delay(self.file_id, self.channel_name)
            self.accept()
        def disconnect(self, code):
            # 終止執行中task
            self.result.revoke(terminate=True)
            print("disconnect:",self.file_id,self.channel_name)
        def send_message(self,event):
            # 發送給客戶端消息
            self.send(text_data=json.dumps({
                "message":event["message"]
            }))
    
    

4.其他邏輯配置

  • url.py

    urlpatterns = [
    	...
    	url(r'tailf', views.tailf_view, name='tailf-url'),
    	]
    
  • 視圖函數

    def tailf_view(request):
        logDict = settings.TAILF
        return render(request,"index.html",{"logDict": logDict})
    
  • templates

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <div class="col-sm-8">
        <select class="form-control" id="file">
            <option value="">選擇要監聽的日志</option>
            {% for k,v in logDict.items %}
                <option value="{{ k }}">{{ v }}</option>
            {% endfor %}
        </select>
    </div>
    <div class="col-sm-2">
        <input class="btn btn-success btn-block" type="button" onclick="connect()" value="開始監聽"/><br/>
    </div>
    <div class="col-sm-2">
        <input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="終止監聽"/><br/>
    </div>
    <div class="col-sm-12">
        <textarea class="form-control" id="chat-log" disabled rows="20"></textarea>
    </div>
    <script src="https://cdn.bootcss.com/jquery/3.5.0/jquery.min.js"></script>
    </body>
    <script>
        function connect() {
            if ( $('#file').val() ) {
                var url = 'ws://' + window.location.host + '/ws/tailf/' + $('#file').val() + '/';
                window.chatSocket = new WebSocket(url);
                // 當瀏覽器接收到websocket服務器發送過來的數據時,就會觸發onmessage消息,參數e包含了服務端發送過來的數據
                chatSocket.onmessage = function(e) {
                    var data = JSON.parse(e.data);
                    var message = data['message'];
                    document.querySelector('#chat-log').value += (message);
                    // 跳轉到頁面底部
                    $('#chat-log').scrollTop($('#chat-log')[0].scrollHeight);
              };
    		 // 如果連接失敗,或者發送、接收數據失敗,或者數據處理出錯都會觸發onerror消息
              chatSocket.onerror = function(e) {
                console.error('服務端連接異常!')
              };
    		// 當瀏覽器接收到websocket服務器發送過來的關閉連接請求時,會觸發onclose消息
              chatSocket.onclose = function(e) {
                console.error('websocket已關閉!')
              };
            } else {
              console.log('請選擇要監聽的日志文件')
            }
          };
        function goclose() {
            // 用於關閉連接
            window.chatSocket.close();
            window.chatSocket.onclose = function(e) {
            console.log('已終止日志監聽!')
        };
      }
    </script>
    </html>
    
    
    • 當然還有onopen:當瀏覽器和websocket服務端連接成功后會觸發onopen消息。

5.啟動worker節點:

  • 終端執行:
python3 manage.py celery worker -l INFO
  • 啟動項目測試一下吧。

6.Django項目敏感信息保存

  • django項目做完后,向生產環境部署時,為了避免一些敏感信息被其他人利用,我們需要進行一定保護,比如settings配置中的一些密碼等內容。
  • 通過os.environ模塊實現,這里以SECRET_KEY為例:
在linux系統中 /etc/profile 中寫入SECRET_KEY
e.g.:
	export SECRET_KEY = "..."
settings.py
	import os
	SECRET_KEY = os.environ["SECRET_KEY"]
  • 注意:更改完/etc/profile后執行source /etc/profile,以使更新后的內容生效。

參考鏈接:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM