Django項目中接受前端頁面點擊事件異步執行之celery+redis


問題場景:

當在Python Django項目中,創建多個APP應用,並且Django實現后端業務邏輯處理過程,屬於(后端代碼),既然后有后端代碼,那基本會與前端代碼(前端頁面)進行人機交互操作。在此情況下,基本操作前端輸入對應的參數和點擊對應的按鈕事件,將數據實時傳輸給后端代碼進行業務處理,然后並在最短時間內反饋到前端,實現一次完整的輸入輸出過程(該過程實時並高效快速),但往往該情況屬於實時處理,既然有實時,也存在異步操作,此處就以Python調用ansible執行自動化部署操作為例,在該例子中,前端提交部署請求,Python調用ansible進行部署處理,在處理過程中,一般簡單操作會快速執行完成並返回結果數據,但當部署復雜的ansible操作時其中等待的時間會更長,給前端會處於等待狀態,會因此帶來擁塞情況和用戶體驗較差。針對此情況,Django項目有他處理的成熟方案。

解決方案:

根據以上提出的問題,進行分析,既要讓用戶不處於一直等待狀態,又要讓該任務后端異步執行,同時用戶需要在任務執行完成后被動知道最終結果。綜上所述得出方案:Django項目中采用celery異步任務執行+redis任務隊列數據存儲+執行任務結束回調過程。

具體實現:mysql存儲執行結果日志,redis緩存,Django1.11.7,Python3.6.3(asstes項目,asstes_cd APP應用)

1、Django項目基礎依賴包:ansible,celery,Django,Django,django-celery-beat(django項目中會生成定時任務表),redis

(assets) root@python-dev:/application/assets# pip list
Package             Version    
------------------- ----------- ansible 2.3.2.0 celery 4.2.0 Django 1.11.7 django-celery-beat  1.1.1      
django-filter       1.1.0      
django-mysql        2.2.2      
django-rest-swagger 2.1.2      
djangorestframework 3.7.3    
pip                 10.0.1    
PyMySQL             0.8.0 redis 2.10.6     
requests            2.18.4     
requests-ntlm       1.1.0      
setuptools          28.8.0          
urllib3             1.22     
virtualenv          16.0.0     

2、Django項目asstes主項目配置:redis數據庫用於celery任務存放

CELERY_BROKER_URL = 'redis://localhost:6379/0'
#CELERY_RESULT_BACKEND = 'redis://'
#CELERY_RESULT_PERSISTENT = False
#CELERY_TASK_SERIALIZER = 'json'
#CELERY_RESULT_SERIALIZER = 'json'
#CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = TIME_ZONE
# CELERY_ENABLE_UTC = True

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_beat',
]

 

3、Django項目asstes主目錄下__init__.py配置啟用celery在項目中使用

import pymysql
pymysql.install_as_MySQLdb()
from .celerytask import app as celery_app

# include all celery task

__all__ = ['celery_app']

4、Django項目asstes主目錄下建立celerytask.py實現Django與celery任務對接

import os
from celery import Celery
from celery.schedules import crontab

# By default, celery searches for a submodule named celery.py
# http://docs.celeryproject.org/en/latest/getting-started/\
# next-steps.html#about-the-app-argument
#
# -- Refer
# http://docs.celeryproject.org/en/latest/
# https://github.com/celery/celery/tree/master/examples/django/
# http://docs.celeryproject.org/en/latest/userguide/application.html
# http://docs.celeryproject.org/en/latest/userguide/configuration.html


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'assets.settings')


#app = Celery('oms', backend='rpc://', broker='pyamqp://guest@localhost//')
app = Celery('assets')


# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')


# Load task modules from all registered Django app configs.
#from django.conf import settings
#app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks()

5、APP asstes_cd應用中建立view.py業務類(與前端交互API),tasks.py(celery任務處理類)

view類:由於內容過長,直接上重點

from assets_cd import tasks #導入task celery任務處理
.............省略.............
oper_log_id_list = [] ip_list = [] ssh_list = [] path_list = [] with transaction.atomic(): for deploy in deploy_log_list: deploy.save() oper_log_id_list.append(deploy.id) ip_list.append(deploy.host_ip) ssh_list.append(deploy.ssh_user) path_list.append(deploy.service_path) callback_id = ','.join([str(x) for x in oper_log_id_list]) ip_list = ','.join([str(ip) for ip in ip_list]) ssh_list = ','.join([str(ssh) for ssh in ssh_list]) path_list = ','.join([str(path) for path in path_list]) logger.info('callback_id, {}'.format(callback_id)) logger.info('add tasks_deploy, tasks_deploy.run_playbook_task_deploy.delay') for playbook in play_book_list: tasks.run_playbook_task.apply_async((playbook,), queue='priority.high') #此處任務丟入執行 logger.info('add tasks_deploy completed, congrats')
          #將以下日志反饋到前端 deployment_result_list.append({
'id': '{}'.format(callback_id), 'ip_list': ip_list, 'ssh_user': ssh_list, 'path': path_list, 'status':'任務提交celery成功,請在到日志列表中查詢相關記錄.' })
 .............省略.............

tasks.py類

# -*- coding: utf-8 -*-
import logging
import os
import time
try: import simplejson as json
except ImportError: import json
from util import ansibleutil
import logging
from util import deploymentutil
from celery import shared_task #導入celery任務執行裝飾器 import traceback
import datetime
import requests
import time
from django.db import transaction
try:
    import simplejson as json
except ImportError:
    import json
from assets_cd.models import DeployLog

logger = logging.getLogger(__name__)

@shared_task #裝飾器引用,執行celery異步開始 def run_playbook_task(playbook):
    begin_time = datetime.datetime.now()
    end_time = datetime.datetime.now()
    diff_time = end_time - begin_time
    elapsed_time = '{}.{}'.format(diff_time.seconds, diff_time.microseconds)
    serial_number = playbook['serial_number']
    callback_url = playbook['callback_url']

    try:
        ssh_user = playbook['ssh_user']
        sudo_user = playbook['sudo_user']
        playbook_path = playbook['playbook_path']
        playbook_extra_vars = playbook['playbook_extra_vars']
        os_type = playbook['os_type']
        ip = playbook['ip']
        result = 0
        DeployLog.objects.filter(
            serial_number=serial_number).update(
            begin_time=begin_time,
            status=-3,
            result=-3,
        )

#ansible任務執行過程,耗時就在此處,將整個任務丟入celery異步執行。不影響前端快速響應 ansible_result
= ansibleutil.run_playbook( serial_number=serial_number, ssh_user=ssh_user, sudo_user=sudo_user, playbook_path=playbook_path, playbook_extra_vars=playbook_extra_vars, os_type=os_type, ) deploymentutil.del_maint_ip_list(ip) logger.info('ansible_result {}'.format(ansible_result)) #執行ansible操作完成后結果狀態反饋。 result = ansible_result log_file = DeployLog.objects.get(serial_number=serial_number) if log_file: log_content = log_file.result_log # with open(log_file,'r') as f: # log_content = f.read() log_content = str(log_content).replace('\n', '<br>').replace('\t', '&nbsp;' * 8) logger.info(log_content) else: logger.info("reslut:[]") except: msg = traceback.format_exc() logger.error(msg) # log_dir = '/var/log/ansible' log_file = DeployLog.objects.get(serial_number=serial_number) if log_file: data = log_file.result_log +'\n'+ msg with transaction.atomic(): log_file.result_log = data log_file.save() else: logger.error(msg) result = -1 try: logger.info("callback_url:{}".format(callback_url)) logger.info("result:{}".format(result)) if result != 0: DeployLog.objects.filter( serial_number=serial_number).update( begin_time=begin_time, end_time=end_time, elapsed_time=elapsed_time, status=result, result=result, )
#執行結束前進行接口回調操作
if callback_url != "" or callback_url is not None: log_file = DeployLog.objects.get(serial_number=serial_number) log = -1 data_log = { "id": "{}".format(log_file.id), "result": int(log), } logger.info(json.dumps(data_log)) try: req = requests.post(callback_url,json.dumps(data_log),timeout=120) logger.info(req.status_code) logger.info(req.content) if req.status_code != 200: for i in range(3): time.sleep(5) req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break elif req.status_code == 200: logger.info("callback log url seccess.") except Exception as e: logger.info(e.args[0]) for i in range(3): time.sleep(5) try: req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break except Exception as e: logger.info(e.args[0]) pass else: DeployLog.objects.filter( serial_number=serial_number).update( end_time=end_time, elapsed_time=elapsed_time, status=result, result=result, ) if callback_url != "" or callback_url is not None: log_file = DeployLog.objects.get(serial_number=serial_number) log = log_file.result data_log = { "id": "{}".format(log_file.id), "result": int(log), } logger.info(json.dumps(data_log)) try: req = requests.post(callback_url,json.dumps(data_log),timeout=120) if req.status_code != 200: for i in range(3): time.sleep(5) req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break elif req.status_code == 200: logger.info("callback log url seccess.") except Exception as e: logger.info(e.args[0]) for i in range(3): time.sleep(5) try: req = requests.post(callback_url, json.dumps(data_log), timeout=120) if req.status_code == 200: break except Exception as e: logger.info(e.args[0]) pass
except: logger.error(traceback.format_exc()) logger.info('run_ansible_playbook, result {}'.format(result))

6、執行流程:

前端頁面事務提交--->后端view接收任務--->任務數據存入redis--->celery觸發任務執行(任務來源redis數據庫)--->結果反饋前端--->形成閉環


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM